`
aswang
  • 浏览: 838316 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

基于Oracle Streams + Oracle AQ 捕获变更,发布变更(一)

阅读更多
要求:使用Oracle Streams捕获某个用户下部分表的DML操作变更,并通过Oracle的AQ(高级队列)对外发布,然后Java端通过JMS来获取变更,并执行后续同步操作。
数据库部分:
1、使用Streams要求Oracle以归档模式运行,归档日志默认存放在DB_RECOVERY_FILE_DEST指定的位置,由于该区域有大小限制,
所以,为了避免空间不足导致的后续问题,首先需要修改Oracle归档日志目录
 
alter system set log_archive_dest='/home/dev/app/dev/oradata/archivelog';
--其中archivelog目录为手动创建
--如果该语句执行失败,可以先尝试将DB_RECOVERY_FILE_DEST置空,再执行上述语句。
alter system set DB_RECOVERY_FILE_DEST='';
 
2、修改数据库为归档模式运行:
shutdown immediate;
startup nomount;
alter database mount;
alter database archivelog;
archive log list;
3、修改系统参数:
alter system set aq_tm_processes=2 scope=both;
alter system set global_names=true scope=both;
alter system set job_queue_processes=10 scope=both;
alter system set parallel_max_servers=20 scope=both;
alter system set undo_retention=3600 scope=both;
alter system set streams_pool_size=25M scope=spfile; 
 
4、更改数据设置,增加补充日志信息
alter database add supplemental log data;
--该语句是在数据库级别添加的补充日志,也可以单独为表设置
alter table user add supplemental log group g_user (id) always;
 
5、创建用于Oracle Streams的表空间和用户
create tablespace streams_tbs datafile '/home/dev/app/dev/oradata/stream_tbs.dbf'
  size 25M Reuse autoextend on maxsize unlimited;
 
create user stradmin
identified by stradmin
default tablespace streams_tbs quota unlimited on streams_tbs;  
6、授权
grant dba,select_catalog_role to strmadmin; 
 
--执行系统存储过程,分配权限
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
7、创建用于捕获进程的队列:
exec DBMS_STREAMS_ADM.SET_UP_QUEUE(
  queue_table => 'streams_queue_table', 
  queue_name => 'streams_queue');
 
8、创建用于捕获数据变更的捕获进程:
begin
   dbms_streams_adm.add_schema_rules(
     schema_name => 'scott',
     streams_type => 'capture',
     streams_name => 'capture_jms',
     queue_name => 'strmadmin.streams_queue',
     include_dml => true,
     include_ddl => false,
  source_database => 'orcl',
     inclusion_rule => true,
  and_condition => ':lcr.get_compatible()<dbms_streams.max_compatible()'
  );
 end;
/
说明:其中include_dml和include_ddl用于指明是否希望捕获DML操作和DDL操作。
and_condition用于添加捕获规则。
该捕获进程会捕获scott用户下所有表的dml变更
 
9、创建用于Oracle AQ的消息队列
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table => 'jms_queue_table',
        Queue_payload_type => 'SYS.AQ$_jms_bytes_message',
        multiple_consumers => true);
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name => 'jms_queue',
      Queue_table => 'jms_queue_table');
   DBMS_AQADM.START_QUEUE(queue_name=> 'jms_queue');
END;
/
说明:
    应用进程会将streams_queue队列中的消息出队,然后经过转换插入到该队列中。
    queue_payload_type用于指定该队列存放的用户数据的类型。
 
10、初始化SCN
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN(
 source_schema_name => 'scott',
    source_database_name => 'orcl',
    instantiation_scn => iscn,
    recursive => true);
END;
/
11、为AQ消息队列创建代理和订阅者
--6、为消息队列创建代理
exec DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'jms_agent');
exec DBMS_AQADM.ENABLE_DB_ACCESS(agent_name => 'jms_agent',
  db_username => 'strmadmin');
 
DECLARE
 subscriber SYS.AQ$_AGENT;
BEGIN
 subscriber := SYS.AQ$_AGENT('jms_agent', NULL, NULL);
 SYS.DBMS_AQADM.ADD_SUBSCRIBER(
  queue_name => 'strmadmin.jms_queue',
  subscriber => subscriber,
  rule => NULL,
  transformation => NULL);
END;
/
备注:这里的agent_name和jms_queue会在java的JMS中使用到。
 
12、创建应用进程
BEGIN
   dbms_streams_adm.add_schema_rules(
      schema_name => 'scott',
      streams_type => 'apply',
      streams_name => 'apply_jms',
      queue_name => 'strmadmin.streams_queue',
      include_dml => true,
      include_ddl => false,
      source_database => 'orcl',
      inclusion_rule => true);
END;
/
说明:默认情况下,应用进程会将streams_queue中的消息(LCR)出队,然后将消息对应的变更应用到指定的数据库中。
但在这里,我们希望通过应用进程进行消息转换,并插入到AQ的消息队列中,所以后面还需要针对目标表设置处理器。
 
13、创建用于应用进程的消息处理存储过程,对LCR进行转换,并插入到jms_queue队列
CREATE OR REPLACE PROCEDURE enq_jms_lcr(in_any IN SYS.ANYDATA) IS
  message SYS.AQ$_jms_bytes_message;
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  msgid raw(16);
  lcr SYS.LCR$_ROW_RECORD;
  rc PLS_INTEGER;
  agent sys.aq$_agent := sys.aq$_agent('jms_agent', null, NULL);
 
  ID number;
  tablename varchar2(100);
  action varchar2(100);
  updatetime date;
BEGIN
 rc := in_any.GETOBJECT(lcr);
 tablename := lcr.get_object_name();
 action := lcr.get_command_type();
 updatetime := lcr.get_source_time();
 
 if action = 'UPDATE' or action = 'DELETE' then
  ID := lcr.get_value('OLD', 'ID').ACCESSnumber();
 end if;
 
 if action = 'INSERT' then
  ID := lcr.get_value('NEW', 'ID').ACCESSnumber();
 end if;
 
 message := SYS.AQ$_jms_bytes_message.construct;
    message.set_type('map');
    message.set_userid('strmadmin');
    message.set_appid('jms_sync');
    message.set_groupid('jms');
    message.set_groupseq(1);
 
 message.set_long_property('id',ID);
 message.set_long_property('areacode',areacode);
 message.set_string_property('tablename', tablename);
 message.set_string_property('action', action);
 
 dbms_aq.enqueue(queue_name => 'strmadmin.jms_queue',
                  enqueue_options => enqueue_options,
                  message_properties => message_properties,
                  payload => message,
                  msgid => msgid);
END;
/
说明:在这个存储过程中,我们仅仅抓取了数据变更的关键信息:主键ID,表名,DML动作以及时间。
然后将信息封装在SYS.AQ$_jms_bytes_message中,并放入队列jms_queue中。
 
14、创建一个空的消息处理存储过程
CREATE OR REPLACE PROCEDURE empty_jms_lcr(in_any IN SYS.ANYDATA) IS
i number;
BEGIN
i:= 1;
END;
/
说明:由于前面的捕获进程会捕获用户下所有受支持的表的dml变更,但有少数表,我们不希望对其处理,
这个时候我们就可以通过一个什么都不做的存储过程,来覆盖应用进程的默认行为。否则,应用进程会默认将
这些表的变更进行处理,这时就会报错ORA-25215 user_data type and queue type do not match.
 
15、在上面建立了两个消息处理存储过程以后,我们需要将其设置为对应表的处理器。
--工具函数
create or replace PROCEDURE set_dml_handler(tablename varchar2, action varchar2, handler varchar2) is
begin
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => tablename ,
    object_type => 'TABLE',
    operation_name => action,
    error_handler => false,
    user_procedure => handler,
    apply_database_link => NULL);
end;
/
 
--为目标表配置处理器stradmin.enq_jms_lcr
declare
  here number;
  type array_type is table of varchar2(100);
  --定义需要同步的表
  tables array_type := array_type('scott.USER');
 
  cursor c_table is
    select T.OWNER||'.'||T.TABLE_NAME tablename
      from dba_tables t
     where t.owner = 'scott'
       and instr(t.table_name,'EST') = 1 ;
 
begin
  for t in c_table loop
  here := 0;
    for i IN 1..tables.count loop
      if tables(i) = t.tablename then
        here := 1;
      end if;
    end loop;
 
 if here = 1 then
  dbms_output.put_line('enq_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.enq_jms_lcr');
 end if;
 
 if here = 0 then
  dbms_output.put_line('empty_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.empty_jms_lcr');
 end if;
  end loop;
end;
/
说明:上述存储过程,对应希望同步的表会设置enq_jms_lcr,而对于其它表则设置empty_jms_lcr。
 
16、启动应用进程以及捕获进程
exec DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'apply_jms',parameter => 'disable_on_error',value => 'n');
exec DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_jms');
exec DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_jms');
17、在完成上述操作以后,我们可以尝试对USER表进行INSERT、UPDATE和DELTE操作,然后查看jms_queue_table表中是否包含对应的转换后的数据
select t.rowid, t.* from jms_queue_table t;
如果有对应的记录,则说明配置成功,后面可以编写java代码来接受消息并处理了。
 
过程中的坑:
        在为应用进程创建消息处理存储过程以后,应用进程运行一次或几次之后,总会报错,状态为ABORT,错误信息为:ORA-25215 user_data type and queue type do not match.
错误原因描述的很清楚了,但是在检查了前后的语句,并没有发现数据类型与队列的数据类型不一致,
存储过程中放入队列的数据类型与创建队列时的类型均为SYS.AQ$_jms_bytes_message。
在google了N多信息,大体的解释都一样,说是保证进入队列的数据类型与队列表中的数据类型一致。
      于是在这个地方卡了很久,最后突然意识到,上面的捕获进程默认捕获用户下所有的表的变更,
而应用进程会自动的从捕获进程的队列strems_queue中获取所有变更,并将其应用到jms_queue队列。
但在刚开始时,默认只给需要同步的表设置了消息处理器,那么其它的表的变更消息在应用进程获取以后,
就没有转换为SYS.AQ$_jms_bytes_message,而是直接尝试进入队列,于是就报错类型不匹配了。
      在知道这个原因后,就为其它不需要同步的表,建立了一个空的消息处理器,来覆盖应用进程默认的行为,
从而确保应用进程不再报错终止运行。
 
 
错误排查:
查看捕获进程状态:
SELECT t.capture_name, t.capture_user, t.capture_type, t.status FROM DBA_CAPTURE t Where t.capture_name = 'CAPTURE_JMS';
 
查看应用进程状态:
select apply_name, queue_name, status, t.error_number, t.error_message from dba_apply t;
 
查看应用进程错误信息:
select t.rowid, t.* from dba_apply_error t where t.apply_name = 'APPLY_JMS';
查看消息队列:
SELECT t.rowid, t.* from JMS_QUEUE_TABLE t;
查看相关规则:
select * from dba_streams_rules;
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics