`
jiayj198609
  • 浏览: 147430 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

java实现高性能的数据同步

    博客分类:
  • Java
 
阅读更多
     最近在做一个银行的生产数据脱敏系统,今天写代码时遇到了一个“瓶颈”,脱敏系统需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路 如图:



    首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

在查询的时候我用了如下语句
String queryStr = "SELECT * FROM xx";
ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
相关代码

package com.dlbank.domain;  
  
import java.sql.Connection;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.Statement;  
import java.util.List;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.apache.log4j.Logger;  
  
/** 
 *<p>title: 数据同步类 </p>   
 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>   
 *@author Tank Zhang  
 */  
public class CoreDataSyncImpl implements CoreDataSync {  
      
    private List<String> coreTBNames; //要同步的核心库表名  
    private ConnectionFactory connectionFactory;  
    private Logger log = Logger.getLogger(getClass());  
      
    private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数  
      
    private int syncThreadNum;  //同步的线程数  
  
    @Override  
    public void syncData(int businessType) throws Exception {  
          
        for (String tmpTBName : coreTBNames) {  
            log.info("开始同步核心库" + tmpTBName + "表数据");  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            //为每个线程分配结果集  
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
            coreRs.next();  
            //总共处理的数量  
            long totalNum = coreRs.getLong(1);  
            //每个线程处理的数量  
            long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
            log.info("共需要同步的数据量:"+totalNum);  
            log.info("同步线程数量:"+syncThreadNum);  
            log.info("每个线程可处理的数量:"+ownerRecordNum);  
            // 开启五个线程向目标库同步数据  
            for(int i=0; i < syncThreadNum; i ++){  
                StringBuilder sqlBuilder = new StringBuilder();  
                //拼装后SQL示例  
                //Select * From dms_core_ds Where id between 1 And 657398  
                //Select * From dms_core_ds Where id between 657399 And 1314796  
                //Select * From dms_core_ds Where id between 1314797 And 1972194  
                //Select * From dms_core_ds Where id between 1972195 And 2629592  
                //Select * From dms_core_ds Where id between 2629593 And 3286990  
                //..  
                sqlBuilder.append("Select * From ").append(tmpTBName)  
                        .append(" Where id between " ).append(i * ownerRecordNum +1)  
                        .append( " And ")  
                        .append((i * ownerRecordNum + ownerRecordNum));  
                Thread workThread = new Thread(  
                        new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                workThread.setName("SyncThread-"+i);  
                workThread.start();  
            }  
            while (currentSynCount.get() < totalNum);  
            //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);  
            //Thread.sleep(1000 * 3);  
            log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");  
        }  
    }// end for loop  
      
    public void setCoreTBNames(List<String> coreTBNames) {  
        this.coreTBNames = coreTBNames;  
    }  
  
    public void setConnectionFactory(ConnectionFactory connectionFactory) {  
        this.connectionFactory = connectionFactory;  
    }  
      
    public void setSyncThreadNum(int syncThreadNum) {  
        this.syncThreadNum = syncThreadNum;  
    }  
      
    //数据同步线程  
    final class WorkerHandler implements Runnable {  
        ResultSet coreRs;  
        String queryStr;  
        int businessType;  
        String targetTBName;  
        public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
            this.queryStr = queryStr;  
            this.businessType = businessType;  
            this.targetTBName = targetTBName;  
        }  
        @Override  
        public void run() {  
            try {  
                //开始同步  
                launchSyncData();  
            } catch(Exception e){  
                log.error(e);  
                e.printStackTrace();  
            }  
        }  
        //同步数据方法  
        void launchSyncData() throws Exception{  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            // 获得目标库连接  
            Connection targetConn = connectionFactory.getDMSConnection(businessType);  
            targetConn.setAutoCommit(false);// 设置手动提交  
            PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
            ResultSet coreRs = coreStmt.executeQuery(queryStr);  
            log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
            int batchCounter = 0; //累加的批处理数量  
            while (coreRs.next()) {  
                targetPstmt.setString(1, coreRs.getString(2));  
                targetPstmt.setString(2, coreRs.getString(3));  
                targetPstmt.setString(3, coreRs.getString(4));  
                targetPstmt.setString(4, coreRs.getString(5));  
                targetPstmt.setString(5, coreRs.getString(6));  
                targetPstmt.addBatch();  
                batchCounter++;  
                currentSynCount.incrementAndGet();//递增  
                if (batchCounter % 10000 == 0) { //1万条数据一提交  
                    targetPstmt.executeBatch();  
                    targetPstmt.clearBatch();  
                    targetConn.commit();  
                }  
            }  
            //提交剩余的批处理  
            targetPstmt.executeBatch();  
            targetPstmt.clearBatch();  
            targetConn.commit();  
            //释放连接   
            connectionFactory.release(targetConn, targetPstmt,coreRs);  
        }  
    }  
}  


  • 大小: 14.5 KB
分享到:
评论
15 楼 superyang 2010-11-25  
用天河一号,应该超快...
14 楼 月落码农 2010-11-25  
sunnylocus 写道
jiayj198609 写道
imacback 写道
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了!

老兄,你怎么随便拿我的文章到处贴,您也在大连银行工作?不过我好象没有见过你



呵呵,既然乱转文章。
13 楼 jiayj198609 2010-11-25  
yjwxfpl 写道
PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");

你同步的表结构都只有五个字段  ,targetPstmt.setString()  类型全都是String?



那具体的表结构当然是看你具体的操作了;我的表只有五个字段;当然就set五个了;难道还要set6个?
12 楼 vivid_gxp 2010-11-25  
这种操作用JAVA?Oracle不光是存储数据用的……
话不多说了,多用用数据库吧。搞开发的也要了解点数据库知识的。
11 楼 jpacm 2010-11-25  
ORACLE中有支持infomix的透明网关吧,通过透明网关建立dblink,然后select一把就可以了。
10 楼 yjwxfpl 2010-11-25  
PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");

你同步的表结构都只有五个字段  ,targetPstmt.setString()  类型全都是String?
9 楼 Mr.China 2010-11-25  
如果没有大字段,可导出为文本格式,再导入。
将informix导出的文本数据导入oracle数据库。
关于数据的导入,使用ORACLE的SQL Loader。

其它工具:
ORACLE Migration bench(支持informix,db2的迁移)。

PS:原来这是转载的?
8 楼 sunnylocus 2010-11-25  
有人转了我的文章,我就顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成
long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了
7 楼 sunnylocus 2010-11-25  
jiayj198609 写道
imacback 写道
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了!

老兄,你怎么随便拿我的文章到处贴,您也在大连银行工作?不过我好象没有见过你
6 楼 limengchengg 2010-11-25  
用GoldenGate
最优选择
就是比较贵
5 楼 smithfox 2010-11-25  
Sorry, Informix -> Oracle, DBlink不行
也不会有1) 和 2)的问题,
但是类似问题还是有的
4 楼 smithfox 2010-11-25  
我有几个问题:
1) 有没有考虑CLOB/BLOB字段?
2) 没有考虑过一次性查询过多数据, 很可能会有 "ORA-1013", "ORA-1555"?
3) 如果你所处理的情况是target database肯定是没有数据的(也就是说insert时不会产生主键冲突), 你为什么不尝试用dblink?
3 楼 leemny 2010-11-25  
...没有DBA吧。。。说实话用java辅助导入是程序员最不爱干的,不妨试试语句处理
2 楼 jiayj198609 2010-11-25  
imacback 写道
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了!
1 楼 imacback 2010-11-25  
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

相关推荐

    NIO框架Netty实现高性能高并发

    Java异步NIO框架Netty实现高性能高并发无标题笔记 1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨 节点...

    Java_BitSail是一个分布式高性能数据集成引擎,支持批量流和增量场景。BitSail被广泛应用于每天同步数以万.zip

    Java_BitSail是一个分布式高性能数据集成引擎,支持批量流和增量场景。BitSail被广泛应用于每天同步数以万

    从根上理解高性能、高并发(四):深入操作系统,彻底理解同步与异步-其它分享_专项技术区 - 即时通讯开发者社区!.pdf

    文章首先介绍了同步和异步的基本概念,然后深入操作系统,探讨了同步和异步在高并发、高性能服务器中的实现。 知识点1:同步和异步的概念 同步和异步是高并发、高性能服务器中的两个重要概念。同步指的是程序执行...

    java 多线程同步

    不客气地说,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发构建块,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性...

    JAVA上百实例源码以及开源项目源代码

     Java实现的FTP连接与数据浏览程序,实现实例化可操作的窗口。  部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器地址  ...

    java开源包4

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    Java整合springboot2.3+modbusTcp协议+netty高性能物联网服务源码

    Java整合springboot2.3+modbusTcp协议+netty高性能物联网服务源码 1、Netty NIO high performance高性能. 2、Modbus Function sync/aync 同步/异步非阻塞。 3、Modbus IoT Data Connector Supports工业物联网平台IoT...

    SeaTunnel 是一个非常易用的支持海量数据实时同步的超高性能分布式数据集成平台

    简单易用,灵活配置,无需开发,实时流式处理,离线多源数据分析,高性能、海量数据处理能力,模块化和插件化,易于扩展,支持利用 SQL 做数据处理和聚合,支持 Spark Structured Streaming,支持 Spark 2.x

    java开源包3

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包11

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包6

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包9

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    MyDataHarbor是一个致力于解决异构数据源之间的分布式、高扩展性、高性能、微事务(至少一次保证)、准实时的数据同步中间件

    MyDataHarbor是一个致力于解决异构数据源之间的分布式、高扩展性、高性能、微事务(至少一次保证)、准实时的数据同步中间件。它可以帮助用户可靠、快速、稳定的对海量数据进行准实时增量同步或者定时全量同步,主要...

    java开源包101

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包5

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    JAVA上百实例源码以及开源项目

    百度云盘分享 ... Java实现的FTP连接与数据浏览程序,实现实例化可操作的窗口。  部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText();...

    java开源包8

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包10

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

    java开源包1

    WARTS是一个纯Java数据库工具,可以执行字符编码识别的数据同步。开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的...

Global site tag (gtag.js) - Google Analytics