`
sunnylocus
  • 浏览: 869630 次
  • 性别: Icon_minigender_1
  • 来自: 美国图森
社区版块
存档分类
最新评论

经验总结:高性能的数据同步

    博客分类:
  • Java
阅读更多

     最近在做一个银行的生产数据脱敏系统,今天写代码时遇到了一个“瓶颈”,脱敏系统需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路如图

    首先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多线程处理,多个人干活总比一个人干活速度要快。

    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

 

在查询的时候我用了如下语句

String queryStr = "SELECT * FROM xx";

ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet,那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整Fetch的数量。另外性能也与网线的带宽有直接的关系。

相关代码

package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

/**
 *<p>title: 数据同步类 </p>  
 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>  
 *@author Tank Zhang 
 */
public class CoreDataSyncImpl implements CoreDataSync {
	
	private List<String> coreTBNames; //要同步的核心库表名
	private ConnectionFactory connectionFactory;
	private Logger log = Logger.getLogger(getClass());
	
	private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数
	
	private int syncThreadNum;  //同步的线程数

	@Override
	public void syncData(int businessType) throws Exception {
		
		for (String tmpTBName : coreTBNames) {
			log.info("开始同步核心库" + tmpTBName + "表数据");
			// 获得核心库连接
			Connection coreConnection = connectionFactory.getDMSConnection(4);
			Statement coreStmt = coreConnection.createStatement();
			//为每个线程分配结果集
			ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);
			coreRs.next();
			//总共处理的数量
			long totalNum = coreRs.getLong(1);
			//每个线程处理的数量
			long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum)); 
			log.info("共需要同步的数据量:"+totalNum);
			log.info("同步线程数量:"+syncThreadNum);
			log.info("每个线程可处理的数量:"+ownerRecordNum);
			// 开启五个线程向目标库同步数据
			for(int i=0; i < syncThreadNum; i ++){
				StringBuilder sqlBuilder = new StringBuilder();
				//拼装后SQL示例
				//Select * From dms_core_ds Where id between 1 And 657398
				//Select * From dms_core_ds Where id between 657399 And 1314796
				//Select * From dms_core_ds Where id between 1314797 And 1972194
				//Select * From dms_core_ds Where id between 1972195 And 2629592
				//Select * From dms_core_ds Where id between 2629593 And 3286990
				//..
				sqlBuilder.append("Select * From ").append(tmpTBName)
						.append(" Where id between " ).append(i * ownerRecordNum +1)
						.append( " And ")
						.append((i * ownerRecordNum + ownerRecordNum));
				Thread workThread = new Thread(
						new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));
				workThread.setName("SyncThread-"+i);
				workThread.start();
			}
			while (currentSynCount.get() < totalNum);
			//休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);
			//Thread.sleep(1000 * 3);
			log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");
		}
	}// end for loop
	
	public void setCoreTBNames(List<String> coreTBNames) {
		this.coreTBNames = coreTBNames;
	}

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}
	
	public void setSyncThreadNum(int syncThreadNum) {
		this.syncThreadNum = syncThreadNum;
	}
	
	//数据同步线程
	final class WorkerHandler implements Runnable {
		ResultSet coreRs;
		String queryStr;
		int businessType;
		String targetTBName;
		public WorkerHandler(String queryStr,int businessType,String targetTBName) {
			this.queryStr = queryStr;
			this.businessType = businessType;
			this.targetTBName = targetTBName;
		}
		@Override
		public void run() {
			try {
				//开始同步
				launchSyncData();
			} catch(Exception e){
				log.error(e);
				e.printStackTrace();
			}
		}
		//同步数据方法
		void launchSyncData() throws Exception{
			// 获得核心库连接
			Connection coreConnection = connectionFactory.getDMSConnection(4);
			Statement coreStmt = coreConnection.createStatement();
			// 获得目标库连接
			Connection targetConn = connectionFactory.getDMSConnection(businessType);
			targetConn.setAutoCommit(false);// 设置手动提交
			PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");
			ResultSet coreRs = coreStmt.executeQuery(queryStr);
			log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);
			int batchCounter = 0; //累加的批处理数量
			while (coreRs.next()) {
				targetPstmt.setString(1, coreRs.getString(2));
				targetPstmt.setString(2, coreRs.getString(3));
				targetPstmt.setString(3, coreRs.getString(4));
				targetPstmt.setString(4, coreRs.getString(5));
				targetPstmt.setString(5, coreRs.getString(6));
				targetPstmt.addBatch();
				batchCounter++;
				currentSynCount.incrementAndGet();//递增
				if (batchCounter % 10000 == 0) { //1万条数据一提交
					targetPstmt.executeBatch();
					targetPstmt.clearBatch();
					targetConn.commit();
				}
			}
			//提交剩余的批处理
			targetPstmt.executeBatch();
			targetPstmt.clearBatch();
			targetConn.commit();
			//释放连接 
			connectionFactory.release(targetConn, targetPstmt,coreRs);
		}
	}
}

 

6
1
分享到:
评论
6 楼 cfying 2016-09-18  
不会有锁表的现象吗?楼主
5 楼 xiaokang1582830 2012-06-27  
这代码难道没问题,CoreDataSync是哪来的接口
4 楼 sunnylocus 2011-10-13  
cczakai 写道
感觉楼主的多线程理解有点偏差。



如果一条线程下来,1秒copy1000条数据,1分钟6000条,确实很难完成上亿条数据的任务量。



像楼主处理,应该是超线程CPU模式。


这里有篇文章:

随着计算机技术的发展,编程模型也越来越复杂多样化。但多线程编程模型是目前计算机系统架构的最终模型。随着CPU主频的不断攀升,X86架构的硬件已经成为瓶,在这种架构的CPU主频最高为4G。事实上目前3.6G主频的CPU已经接近了顶峰。
  如果不能从根本上更新当前CPU的架构(在很长一段时间内还不太可能),那么继续提高CPU性能的方法就是超线程CPU模式。那么,作业系统、应用程序要发挥CPU的最大性能,就是要改变到以多线程编程模型为主的并行处理系统和并发式应用程序。

http://hi.baidu.com/baixuejiyi1111/blog/item/1eee6ff50bd8d042342acc02.html



哇,好东西呀,多谢!
3 楼 cczakai 2011-10-13  
感觉楼主的多线程理解有点偏差。



如果一条线程下来,1秒copy1000条数据,1分钟6000条,确实很难完成上亿条数据的任务量。



像楼主处理,应该是超线程CPU模式。


这里有篇文章:

随着计算机技术的发展,编程模型也越来越复杂多样化。但多线程编程模型是目前计算机系统架构的最终模型。随着CPU主频的不断攀升,X86架构的硬件已经成为瓶,在这种架构的CPU主频最高为4G。事实上目前3.6G主频的CPU已经接近了顶峰。
  如果不能从根本上更新当前CPU的架构(在很长一段时间内还不太可能),那么继续提高CPU性能的方法就是超线程CPU模式。那么,作业系统、应用程序要发挥CPU的最大性能,就是要改变到以多线程编程模型为主的并行处理系统和并发式应用程序。

http://hi.baidu.com/baixuejiyi1111/blog/item/1eee6ff50bd8d042342acc02.html


2 楼 sunnylocus 2010-11-26  
3
yuhui0531 写道
顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成 long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了

这个是我在论坛里的回复
1 楼 yuhui0531 2010-11-26  
顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成 long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了

相关推荐

    Linux高性能服务器编程

    内容简介《Linux高性能服务器编程》是Linux服务器编程领域的经典著作,由资深Linux软件开发工程师撰写,从网络协议、服务器编程核心要素、原理机制、工具框架等多角度全面阐释了编写高性能Linux服务器应用的方法、...

    集群好书《高性能Linux服务器构建实战》 试读章节下载

    由国内著名技术社区联合推荐的2012年IT技术力作:《高性能Linux服务器构建实战:运维监控、性能调优与集群应用》,即将上架发行,此书从Web应用、数据备份与恢复、网络存储应用、运维监控与性能优化、集群高级应用等...

    数据仓库数据仓库和我们经常提到的数据库有哪些区别

    随着人们对大型数据系统研究、管理、维护等方面的深刻识认和不断完善,在总结、丰富、集中多行企业信息的经验之后,为数据仓库给出了更为精确的定义,即“数据仓库是在企业管理和决策中面向主题的、集成的、与时间...

    MySQL管理之道 性能调优、高可用与监控.part2.rar

    书中内容以实战为导向,所有内容均来自于笔者多年实践经验的总结和对新知识的拓展,同时也针对运维人员、dba等相关工作者会遇到的有代表性的疑难问题给出了实用的情景模拟,并给出了解决方案。不论你目前有没有遇到...

    数据仓库的概念及特点

    随着人们对大型数据系统研究、管理、维护等方面的深刻识认和不断完善,在总结、丰富、集中多行企业信息的经验之后,为数据仓库给出了更为精确的定义,即“数据仓库是在企业管理和决策中面向主题的、集成的、与时间...

    2021 云和恩墨大讲堂PPT汇总(50份).zip

    2021 云和恩墨大讲堂PPT汇总,共50份。...Oracle技术加油站:快速处理紧急性能问题的工具与经验 Oracle诊断性能问题时常用脚本工具 PostgreSQL日常工作分享 PostgreSQL实践分享 wls、was中间件故障基本分析介绍

    sqlserver数据库大型应用解决方案经验总结

    随着互联网应用的广泛普及,海量数据的存储和访问成为了系统设计的瓶颈问题。对于一个大型的互联网应用,每天百万级甚至上亿的PV无疑对数据库造成了相当高的负载。对于系统的稳定性和扩展性造成了极大的问题。 一、...

    (重要)AIX command 使用总结.txt

    AIX常用命令://查看机器序列号,IBM的基本信息都可以通过该命令查询得到 #prtconf #oslevel -r == uname -a //操作系统版本 #oslevel //查看操作系统版本ex :5.1.0.0 #oslevel -r //ex:5100-04 == oslevel -q ...

    2017最新老男孩MySQL高级专业DBA实战课程全套【清晰不加密】,看完教程月入40万没毛病

    04-主从同步的应用场景及切换从库不丢数据多方案介绍.avi 05-mysql数据库读写分离介绍及企业生产实现方案.avi 06-根据企业业务拆分业务应用到不同的从库思想.avi 07-mysql主从复制原理画图深入浅出讲解.avi 08-mysql...

    开涛高可用高并发-亿级流量核心技术

    16.3 我们的性能数据 327 16.4 单品页流量特点 327 16.5 单品页技术架构发展 327 16.5.1 架构1.0 328 16.5.2 架构2.0 328 16.5.3 架构3.0 330 16.6 详情页架构设计原则 332 16.6.1 数据闭环 332 16.6.2 数据维度化 ...

    从程序员到CTO大牛企业内部PDF与PPT合集.zip

    Go在大数据开发中的经验总结 基于Go构建滴滴核心业务平台的实践 京东分布式K-V存储设计与挑战 去哪网数据库架构发展历程 58速运数据库降压优化实践 云时代的数据库演变之路 阿里万亿级数据洪峰下的消息引擎 小米生态...

    深入理解Android:卷I--详细书签版

    ,以及高性能网络服务器和多核并行开发等也有一定的研究。 目录 封面 -17 封底 489 扉页 -16 版权 -15 推荐序 -14 前言 -12 致谢 -9 目录 -7 第1章 阅读前的准备工作 1 1.1 系统架构 2 1.1.1 Android系统架构 2 ...

    SuperMap Deskpro 2008 5.35 安装文件包

    广征需求,从中筛选,确定产品目标,之后经过了需求设计、接口及界面设计、开发、验证等过程,一次次精益求精,让我们在塑造更完美产品的同时,规范化了产品的整个过程,也让项目组团队总结了经验,得到了成长。...

    asp.net知识库

    通过作业,定时同步两个数据库 SQLSERVER高级注入技巧 利用反射实现ASP.NET控件和数据实体之间的双向绑定,并且在客户端自动验证输入的内容是否合法 asp.net报表解决方法 SQLDMO类的使用 SQL过程自动C#封装,支持从表到...

    云端CRM客户关系管理系统免费版v5.28.rar

    【性能稳定】5年行业经验、帮助3万家企业用户实施信息化办公管理,品牌值得信赖。 【适用范围】集团企业、中型企业、小型企业、微型企业、创业公司、工作室等。 【性价比高】一次购买、终身使用、永久升级、用户不限...

    财务软件调研报告.doc

    ---丰富的中小企业信息化实践经验 ·金蝶KIS独特优势之三 ----高性能价格比 ·金蝶拥有20多万客户,其中接近15万客户是中小企业 ·金蝶和其客户经历了从小企业成长为中型企业和继续成长的过程 ·金蝶的解决方案也...

    测试培训教材

     -测试总结和报告。 一个好的测试管理工具应该能把以上几个阶段都管理起来。 测试人员每时每刻都在度量别人的工作成果,而测试人员的工作成果又由谁来度量呢?度量的标准和依据是什么呢?软件测试的度量是测试...

    C#微软培训资料

    第十八章 高 级 话 题 .235 18.1 注册表编程 .235 18.2 在 C #代码中调用 C++和 VB 编写的组件 .240 18.3 版 本 控 制 .249 18.4 代 码 优 化 .252 18.5 小 结 .254 第五部分 附 录 .255 附录 A 关 键 ...

    Visual C++ 2005入门经典--源代码及课后练习答案

    Horton拥有丰富的教学经验(教学内容包括C、C++、Fortran、PL/1、APL等),同时还是机械、加工和电子CAD系统、机械CAM系统和DNC/CNC系统方面的专家。Ivor Horton还著有Beginning Visual C++ 6、Beginning C ...

    Visual C++ 2010入门经典(第5版)--源代码及课后练习答案

    大家一致认为,他的著作独具风格,无论是编程新手,还是经验丰富的编程人员,都很容易理解其内容。在个人实践中,Ivor Horton也是一名系统顾问。他从事程序设计教学工作已经超过了25年。  苏正泉,1995年毕业于解放...

Global site tag (gtag.js) - Google Analytics