`
sheungxin
  • 浏览: 103449 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Oracle数据同步项目yugong

阅读更多
  • 概述
之前尝试了基于物化视图+java source的oracle数据同步方案,为了把物化视图的变化信息传递给java source发送给外部程序,需要使用触发器、存储过程/函数。触发器用于监控物化视图的数据变化,调用存储过程从而间接调用java source(存储过程可以指向一个java source)。该方案的缺点如下:
1、给每个同步表建立物化视图,会消耗存储资源
2、java source部分代码可能需要依赖第三方包,需要在数据库服务器上加载大量外部jar包
3、每个物化视图上都需要建立触发器,监控数据变化(看了yugong后,在想是否直接在物化视图日志上建立触发器,但相对就会繁琐,需要根据日志表记录从主表拿数据,然后传递给java source,最后删除日志)

  • yugong
最近看了阿里基于oracle的数据库迁移项目yugong,其也是基于物化视图进行的实现。差异在于:
1、虽然都是基于物化视图,但yugong项目仅使用物化视图日志,且创建物化视图日志时使用了参数PRIMARY KEY、SEQUENCE,这样日志中将包含主键列、操作序号。extractor会按照SEQUENCE$$顺序抽取变化数据,并把已抽取的变化数据从日志表中删除。而原来的方案使用的物化视图的快速刷新,commit后自动刷新到物化视图,日志被清空
2、yugong中数据抽取使用的是jdbc,即extractor部分(从源库抽取数据),根据日志表中的主键列从源表获取数据;而原方案使用触发器获取数据变化。
3、yugong的applier部分(更新到目标库),同样使用jdbc,直接把转化后的数据更新入目标库;而原方案利用java source把变化数据发送给外部程序处理。
4、yugong引入了Translator,用于异构数据转化;而原方案使用外部的consumer处理

yugong详细介绍,可以参考:
https://github.com/alibaba/yugong/wiki/AdminGuide
http://blog.csdn.net/sunnylinner/article/details/52064637
物化视图详细介绍,可以参考:
http://www.cnblogs.com/linjiqin/archive/2012/05/23/2514795.html

下面主要介绍下使用时遇到的问题:
1、SEQUENCE$$标识符无效,原因在于查询增量记录时以SEQUENCE$$进行排序,获取顺序操作记录。用于测试的表之前创建过物化视图日志,但是创建时未使用SEQUENCE
2、yugong.extractor.noupdate.thresold,需要注意该值的设置,小于等于0将一直处于增量状态。若大于0,处于追赶状态,执行增量次数超过该值将结束增量,释放资源给下一个表。需要与yugong.table.concurrent.size配合使用。比如:thresold=0、concurrent.size=1,同步两张表,只有一张表处于同步状态,另一张表处于等待状态。因为只有一个处理线程,而thresold=0线程一直得不到释放。同样,thresold=3、concurrent.size=5,还是同步两张表,同步线程不会因为并发线程数多而不释放。因此,如果需要持续同步大量表,就需要设置thresold=0、concurrent.size=n,n大于等于同步表数。n可能很大,也可以想办法让同步过的表再次加入同步队列。
3、大量表持续同步时开启并行模式,抽取、入目标库都使用多线程,因此应该独立部署。但前提与两个数据库都可直连
4、yugong每个表对应一个instance,负责表的迁移,包含extractor、translator、applier。extractor与translator、applier无法分离,独立部署,不适用目标库无法直连的情况

Clob字段类型无法正常同步的问题:Clob类型字段在数据抽取时被转换为String类型的值,但ColumnMeta.type没有改变,值与类型不匹配。在数据更新或插入目标库中时,ps.setObject(index,cv.getValue(),cv.getColumn().getType()),String类型无法转换为Clob类型,sql执行失败。解决方案:重新设置字段类型,与值保持一致,col.setType(Types.VARCHAR);
public abstract class AbstractOracleRecordExtractor extends AbstractRecordExtractor {
	
	protected ColumnValue getColumnValue(ResultSet rs, String encoding, ColumnMeta col) throws SQLException {
		if(){
			...
		}else if (YuGongUtils.isClobType(col.getType())) {
			value = rs.getString(col.getName());
			col.setType(Types.VARCHAR);
		}
		...
	}

}

Blob字段类型同样无法正常同步:和Clob同样的问题,获取数据时被转换为byte[],ps.setObject时值与类型不一致。

注意事项:最初考虑取值时不转换,直接取Blob,这样值与类型就一致了,但同样失败:表或视图不存在。原因在于Blob使用LOCATOR(定位器)实现,指向数据库中SQL BLOB,不能把A库中的BLOB作为值直接作用于B库中的BLOB。BLOB可参考:http://blog.csdn.net/terryzero/article/details/3939014

解决方案:在Applier中调用ps.setObject(index,cv.getValue(),cv.getColumn().getType())时,进行数据类型判断,当类型为Types.BLOB时,执行ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue()));
if(cv.getColumn().getType()==Types.BLOB){
	ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue()));
}else{
	ps.setObject(index, cv.getValue(), cv.getColumn().getType());
}

同样CLOB也可以用同样的解决方案:取值时仍然转换为String类型,占位符赋值时处理
if(cv.getColumn().getType()==Types.CLOB){
	ps.setCharacterStream(index, new StringReader((String)cv.getValue()));
}else{
	ps.setObject(index, cv.getValue(), cv.getColumn().getType());
}

java.lang.AbstractMethodError:oracle.jdbc.driver.T4CPreparedStatement.setBlob(ILjava/io/InputStream:Oracle驱动版本的问题,数据库驱动改ojdbc6.jar  即可

全局schema问题
1、源库:不指定表时,默认取当前连接schema下的所有表;指定表时,若不指定schema,有可能取到多个schema(一个表存在多个schema中),这种情况需要指定schema。源码详见:TableMetaGenerator.getTableMetasWithoutColumn
2、目标库:默认使用源库的schema,当源库和目标库不一致时,需要增加全局schema转换器,实现如下:
  • 增加配置项:yugong.applier.table.schema
  • 增加全局schema转换类:SchemaDataTranslator,代码如下:
public class SchemaDataTranslator extends AbstractDataTranslator implements DataTranslator {
	
	private String tableSchema;
	
	public SchemaDataTranslator(String tableSchema){
		this.tableSchema=tableSchema;
	}
	
	public String translatorSchema() {
        return tableSchema;
    }

	 public List<Record> translator(List<Record> records) {
		 for (Record record : records) {
			 String schema = translatorSchema();
            if (schema != null) {
                record.setSchemaName(schema);
            }
        }
        return records;
	}
}
  • 初始化instance时引入Translator:yugong.applier.table.schema,修改YuGongController.buildTranslator代码如下:
private DataTranslator buildTranslator(String name) throws Exception {
        String tableName = YuGongUtils.toPascalCase(name);
        String translatorName = tableName + "DataTranslator";
        String packageName = DataTranslator.class.getPackage().getName();
        Class clazz = null;
        try {
            clazz = Class.forName(packageName + "." + translatorName);
        } catch (ClassNotFoundException e) {
            File file = new File(translatorDir, translatorName + ".java");
            if (!file.exists()) {
                // 兼容下表名
                file = new File(translatorDir, tableName + ".java");
                if (!file.exists()) {
                	String targetSchema=config.getString("yugong.applier.table.schema", null);
                	if(StringUtils.isNotBlank(targetSchema)){
                		return new SchemaDataTranslator(targetSchema);
                	}
                    return null;
                }
            }

            String javaSource = StringUtils.join(IOUtils.readLines(new FileInputStream(file)), "\n");
            clazz = compiler.compile(javaSource);
        }

        return (DataTranslator) clazz.newInstance();
    }
1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics