`
xiangxingchina
  • 浏览: 507790 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

多线程往Oracle数据库里插入数据的优化

阅读更多
昨 天做了一个东西,要实现解析txt文件,然后入库的功能。开始试验了一下单线程插入,速度实在太慢了,半个小时才插入了2W多条数据,后来改用Java的 线程池启用了30个线程,并发的执行,插入100W条数据用了一个多小时。后来又对业务层的事务做了一些调整,每1000条insert之后才提交一次, 一共开了20个线程,最后100W条数据入库一共用了14分钟不到,平均一分钟7.1W条左右。 代码如下:
  1. /**
  2. * 分析Apache日志的定时任务.每天运行数次.
  3. *
  4. * @author <a href="mailto:HL_Qu@hotmail.com">Along</a>
  5. *
  6. * @version $Revision$
  7. *
  8. * @since 2009-2-9
  9. */
  10. public class ApacheLogAnalysisTask {
  11.         /**
  12.          * Logger for this class
  13.          */
  14.         private static final Log logger = LogFactory.getLog(ApacheLogAnalysisTask.class);
  15.        
  16.         //总线程数
  17.         private static final int THREAD_COUNT = 20;
  18.        
  19.         //每个线程插入的日志数
  20.         private static final long LOG_COUNT_PER_THREAD = 1000;
  21.        
  22.         //日志文件的位置
  23.         private static final String LOG_FILE = Property.LOG_FILE_PATH + "formatLog.txt";
  24.        
  25.         private IObjectActionDetailService objectActionDetailService;
  26.        

  27.         public void setObjectActionDetailService(IObjectActionDetailService objectActionDetailService) {
  28.                 this.objectActionDetailService = objectActionDetailService;
  29.         }

  30.         public void execute() {
  31.                 this.multiAnalysisLog();
  32.         }
  33.        
  34.         private void multiAnalysisLog() {
  35.                 ExecutorService exePool = Executors.newFixedThreadPool(THREAD_COUNT);
  36.                
  37.                 FileReader fr = null;
  38.                 BufferedReader br = null;
  39.                 long beginLine = 1;
  40.                 long endLine = 0;
  41.                 String logFileBack = Property.LOG_FILE_PATH + "/formatLog_" + DateUtil.getSystemCurrentDate() + "_" + System.currentTimeMillis() + ".txt";
  42.                
  43.                 try {
  44.                         //文件拷贝出来一个新的,并将原先的删除.
  45.                         FileUtil.copyfile(LOG_FILE, logFileBack, true);
  46.                         FileUtil.deleteFile(LOG_FILE);
  47.                         System.out.println(logFileBack);
  48.                        
  49.                         fr = new FileReader(logFileBack);
  50.                         br = new BufferedReader(fr);
  51.                        
  52.                         while ((br.readLine()) != null){
  53.                                 endLine++;
  54.                                
  55.                                 //每个线程分配固定的行数
  56.                                 if((endLine - beginLine + 1) == LOG_COUNT_PER_THREAD) {
  57.                                         exePool.execute(new AnalysisLogTask(logFileBack, beginLine, endLine));
  58.                                         beginLine = endLine + 1;
  59.                                 }
  60.                         }
  61.                        
  62.                         //最后一个线程
  63.                         if (endLine > beginLine) {
  64.                                 exePool.execute(new AnalysisLogTask(logFileBack, beginLine, endLine));
  65.                         }

  66.                 } catch (FileNotFoundException e) {
  67.                         e.printStackTrace();
  68.                 } catch (IOException e) {
  69.                         e.printStackTrace();
  70.                 } finally {
  71.                         if (br != null) {
  72.                                 try {
  73.                                         br.close();
  74.                                         br = null;
  75.                                 } catch (IOException e) {
  76.                                         if (logger.isErrorEnabled()) {
  77.                                                 logger.error("run()", e);
  78.                                         }

  79.                                         e.printStackTrace();
  80.                                 }
  81.                         }
  82.                        
  83.                         if (fr != null) {
  84.                                 try {
  85.                                         fr.close();
  86.                                         fr = null;
  87.                                 } catch (IOException e) {
  88.                                         if (logger.isErrorEnabled()) {
  89.                                                 logger.error("run()", e);
  90.                                         }

  91.                                         e.printStackTrace();
  92.                                 }
  93.                         }
  94.                        
  95.                         exePool.shutdown();
  96.                         while (true) {
  97.                                 if (exePool.isTerminated()) {
  98.                                         System.out.println("ShutDown");
  99.                                         FileUtil.deleteFile(logFileBack);
  100.                                         break;
  101.                                 }
  102.                         }
  103.                        
  104.                 }
  105.         }
  106.        
  107.         private class AnalysisLogTask implements Runnable {
  108.                 //起始行
  109.                 private long beginLine;
  110.                
  111.                 //结束行
  112.                 private long endLine;
  113.                
  114.                 private String logFilePath;

  115.                 public AnalysisLogTask(String logFilePath, long beginLine, long endLine) {
  116.                         super();
  117.                         this.logFilePath = logFilePath;
  118.                         this.beginLine = beginLine;
  119.                         this.endLine = endLine;
  120.                 }

  121.                 @Override
  122.                 public void run() {
  123.                         FileReader fr = null;
  124.                         BufferedReader br = null;
  125.                         String tempStr = null;
  126.                         String[] tempArray = null;
  127.                         long currentLine = 0;
  128.                         List <ObjectActionDetail> resultList = new ArrayList<ObjectActionDetail>();
  129.                        
  130.                         ObjectActionDetail tempObjectActionDetailVO = null;
  131.                         try {
  132.                                 fr = new FileReader(logFilePath);
  133.                                 br = new BufferedReader(fr);
  134.                                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  135.                                
  136.                                 //跳过前置的行数
  137.                                 if (beginLine != 1) {
  138.                                         while ((currentLine < (beginLine-1)) && br.readLine() != null) {
  139.                                                 ++currentLine;
  140.                                         }
  141.                                 }
  142.                                
  143.                                 while ((tempStr = br.readLine()) != null && currentLine++ < endLine) {
  144.                                         tempArray = tempStr.split("\t");
  145.                                         tempObjectActionDetailVO = new ObjectActionDetail();

  146.                                         tempObjectActionDetailVO.setIp(tempArray[0]);
  147.                                         tempObjectActionDetailVO.setActionTime(sdf.parse(tempArray[1]));
  148.                                         tempObjectActionDetailVO.setSrcObjTypeId(Integer.parseInt(tempArray[2]));
  149.                                         tempObjectActionDetailVO.setSrcObjId(Integer.parseInt(tempArray[3]));
  150.                                         tempObjectActionDetailVO.setTarObjTypeId(Integer.parseInt(tempArray[4]));
  151.                                         tempObjectActionDetailVO.setTarObjId(Integer.parseInt(tempArray[5]));
  152.                                         tempObjectActionDetailVO.setActionId(Integer.parseInt(tempArray[6]));
  153.                                        
  154.                                         tempObjectActionDetailVO.setScore(0);
  155.                                         tempObjectActionDetailVO.setStatus(1);
  156.                                        
  157.                                         resultList.add(tempObjectActionDetailVO);
  158.                                 }
  159.                                
  160.                                 logger.info("Thread:" + Thread.currentThread().getName() + "   beginLine=" + beginLine + "   endLine=" + endLine);
  161.                                
  162.                                 objectActionDetailService.insertObjectActionDetailBatch(resultList);
  163.                         } catch (FileNotFoundException e) {
  164.                                 if (logger.isErrorEnabled()) {
  165.                                         logger.error("run()", e);
  166.                                 }

  167.                                 e.printStackTrace();
  168.                         } catch (IOException e) {
  169.                                 if (logger.isErrorEnabled()) {
  170.                                         logger.error("run()", e);
  171.                                 }

  172.                                 e.printStackTrace();
  173.                         } catch (ParseException e) {
  174.                                 if (logger.isErrorEnabled()) {
  175.                                         logger.error("run()", e);
  176.                                 }

  177.                                 e.printStackTrace();
  178.                         } finally {
  179.                                 if (br != null) {
  180.                                         try {
  181.                                                 br.close();
  182.                                                 br = null;
  183.                                         } catch (IOException e) {
  184.                                                 if (logger.isErrorEnabled()) {
  185.                                                         logger.error("run()", e);
  186.                                                 }

  187.                                                 e.printStackTrace();
  188.                                         }
  189.                                 }
  190.                                
  191.                                 if (fr != null) {
  192.                                         try {
  193.                                                 fr.close();
  194.                                                 fr = null;
  195.                                         } catch (IOException e) {
  196.                                                 if (logger.isErrorEnabled()) {
  197.                                                         logger.error("run()", e);
  198.                                                 }

  199.                                                 e.printStackTrace();
  200.                                         }
  201.                                 }
  202.                         }
  203.                 }
  204.         }
  205. }

今天又试验了一下先将整个文件拆分成小的单个文件,然后每个线程再解析自己的文件。测试后感觉这样的效果不好,明显不如多线程从一个文章读数据好,27分钟插入了100W条数据,平均一分钟3.7W条左右。怀疑是多线程读取文件,本地磁盘的I/O受限导致性能低下。 代码如下:

  1. /**
  2. * 分析Apache日志的定时任务.每天运行数次.
  3. *
  4. * @author <a href="mailto:HL_Qu@hotmail.com">Along</a>
  5. *
  6. * @version $Revision$
  7. *
  8. * @since 2009-2-9
  9. */
  10. public class ApacheLogAnalysisTask {
  11.         /**
  12.          * Logger for this class
  13.          */
  14.         private static final Log logger = LogFactory.getLog(ApacheLogAnalysisTask.class);
  15.        
  16.         //总线程数
  17.         private static final int THREAD_COUNT = 10;
  18.        
  19.         //每个线程插入的日志数
  20.         private static final long LOG_COUNT_PER_THREAD = 3000;
  21.        
  22.         //日志文件的位置
  23.         private static final String LOG_FILE = Property.LOG_FILE_PATH + "/formatLog.txt";
  24.        
  25.         private IObjectActionDetailService objectActionDetailService;
  26.        

  27.         public void setObjectActionDetailService(IObjectActionDetailService objectActionDetailService) {
  28.                 this.objectActionDetailService = objectActionDetailService;
  29.         }

  30.         public void execute() {
  31.                 this.multiAnalysisLog();
  32.         }
  33.        
  34.         private void multiAnalysisLog() {
  35.                 ExecutorService exePool = Executors.newFixedThreadPool(THREAD_COUNT);
  36.                
  37.                 FileReader fr = null;
  38.                 FileWriter fw = null;
  39.                 BufferedReader br = null;
  40.                 int threadCount = 0;
  41.                 long tempLineCount = 0;
  42.                 String tempReadLineStr = null;
  43.                 long now = System.currentTimeMillis();
  44.                 String logFileBackFile = Property.LOG_FILE_PATH + "/old/formatLog_" + DateUtil.getSystemCurrentDate() + "_" + now + ".txt";
  45.                 String logFilePerThreadName = Property.LOG_FILE_PATH + "/old/formatLog_" + DateUtil.getSystemCurrentDate() + "_" + now;
  46.                
  47.                 try {
  48.                         //文件拷贝出来一个新的,并将原先的删除.
  49.                         FileUtil.copyfile(LOG_FILE, logFileBackFile, true);
  50.                         //FileUtil.deleteFile(LOG_FILE);
  51.                        
  52.                         fr = new FileReader(logFileBackFile);
  53.                         br = new BufferedReader(fr);
  54.                         fw = new FileWriter(logFilePerThreadName + "_" + ++threadCount + ".txt");
  55.                        
  56.                         while ((tempReadLineStr = br.readLine()) != null){
  57.                                 tempLineCount++;
  58.                                 fw.append(tempReadLineStr).append("\r\n");
  59.                                
  60.                                 //每个线程分配固定的行数
  61.                                 if(tempLineCount == LOG_COUNT_PER_THREAD) {
  62.                                         fw.flush();
  63.                                         fw.close();
  64.                                         exePool.execute(new AnalysisLogTask(logFilePerThreadName + "_" + threadCount + ".txt"));

  65.                                         //创建新的文件,临时变量清零.
  66.                                         fw = new FileWriter(logFilePerThreadName + "_" + ++threadCount + ".txt");
  67.                                         tempLineCount = 0;
  68.                                 }
  69.                         }
  70.                        
  71.                         //最后一个线程有文件则写入执行,没有,则删除最后一个建立的文件.
  72.                         if (tempLineCount != 0) {
  73.                                 fw.flush();
  74.                                 fw.close();
  75.                                 exePool.execute(new AnalysisLogTask(logFilePerThreadName + "_" + threadCount + ".txt"));
  76.                         } else {
  77.                                 fw.flush();
  78.                                 fw.close();
  79.                                
  80.                                 FileUtil.deleteFile(logFilePerThreadName + "_" + threadCount + ".txt");
  81.                         }

  82.                 } catch (FileNotFoundException e) {
  83.                         e.printStackTrace();
  84.                 } catch (IOException e) {
  85.                         e.printStackTrace();
  86.                 } finally {
  87.                         if (br != null) {
  88.                                 try {
  89.                                         br.close();
  90.                                         br = null;
  91.                                 } catch (IOException e) {
  92.                                         if (logger.isErrorEnabled()) {
  93.                                                 logger.error("run()", e);
  94.                                         }

  95.                                         e.printStackTrace();
  96.                                 }
  97.                         }
  98.                        
  99.                         if (fr != null) {
  100.                                 try {
  101.                                         fr.close();
  102.                                         fr = null;
  103.                                 } catch (IOException e) {
  104.                                         if (logger.isErrorEnabled()) {
  105.                                                 logger.error("run()", e);
  106.                                         }

  107.                                         e.printStackTrace();
  108.                                 }
  109.                         }
  110.                        
  111.                         FileUtil.deleteFile(logFileBackFile);
  112.                         logger.info("File has deleted:" + logFileBackFile);
  113.                        
  114.                         exePool.shutdown();
  115.                         //判断是不是所有的任务都执行完毕,执行完删除日志文件.
  116.                         while (true) {
  117.                                 if (exePool.isTerminated()) {
  118.                                         logger.info("All task has shutdown.");
  119.                                         break;
  120.                                 }
  121.                         }
  122.                        
  123.                 }
  124.         }
  125.        
  126.         private class AnalysisLogTask implements Runnable {

  127.                 //每个线程要处理的日志文件
  128.                 private String logFilePath;

  129.                 public AnalysisLogTask(String logFilePath) {
  130.                         super();
  131.                         this.logFilePath = logFilePath;
  132.                 }

  133.                 @Override
  134.                 public void run() {
  135.                         logger.info("Thread:" + Thread.currentThread().getName() + " running.");
  136.                        
  137.                         FileReader fr = null;
  138.                         BufferedReader br = null;
  139.                         String tempStr = null;
  140.                         String[] tempArray = null;
  141.                         List <ObjectActionDetail> resultList = new ArrayList<ObjectActionDetail>();
  142.                        
  143.                         ObjectActionDetail tempObjectActionDetailVO = null;
  144.                         try {
  145.                                 fr = new FileReader(logFilePath);
  146.                                 br = new BufferedReader(fr);
  147.                                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  148.                                
  149.                                 while ((tempStr = br.readLine()) != null) {
  150.                                         tempArray = tempStr.split("\t");
  151.                                         tempObjectActionDetailVO = new ObjectActionDetail();

  152.                                         tempObjectActionDetailVO.setIp(tempArray[0]);
  153.                                         tempObjectActionDetailVO.setActionTime(sdf.parse(tempArray[1]));
  154.                                         tempObjectActionDetailVO.setSrcObjTypeId(Integer.parseInt(tempArray[2]));
  155.                                         tempObjectActionDetailVO.setSrcObjId(Integer.parseInt(tempArray[3]));
  156.                                         tempObjectActionDetailVO.setTarObjTypeId(Integer.parseInt(tempArray[4]));
  157.                                         tempObjectActionDetailVO.setTarObjId(Integer.parseInt(tempArray[5]));
  158.                                         tempObjectActionDetailVO.setActionId(Integer.parseInt(tempArray[6]));
  159.                                        
  160.                                         tempObjectActionDetailVO.setScore(0);
  161.                                         tempObjectActionDetailVO.setStatus(1);
  162.                                        
  163.                                         resultList.add(tempObjectActionDetailVO);
  164.                                 }
  165.                                 objectActionDetailService.insertObjectActionDetailBatch(resultList);
  166.                         } catch (FileNotFoundException e) {
  167.                                 if (logger.isErrorEnabled()) {
  168.                                         logger.error("run()", e);
  169.                                 }

  170.                                 e.printStackTrace();
  171.                         } catch (IOException e) {
  172.                                 if (logger.isErrorEnabled()) {
  173.                                         logger.error("run()", e);
  174.                                 }

  175.                                 e.printStackTrace();
  176.                         } catch (ParseException e) {
  177.                                 if (logger.isErrorEnabled()) {
  178.                                         logger.error("run()", e);
  179.                                 }

  180.                                 e.printStackTrace();
  181.                         } finally {
  182.                                 if (br != null) {
  183.                                         try {
  184.                                                 br.close();
  185.                                                 br = null;
  186.                                         } catch (IOException e) {
  187.                                                 if (logger.isErrorEnabled()) {
  188.                                                         logger.error("run()", e);
  189.                                                 }

  190.                                                 e.printStackTrace();
  191.                                         }
  192.                                 }
  193.                                
  194.                                 if (fr != null) {
  195.                                         try {
  196.                                                 fr.close();
  197.                                                 fr = null;
  198.                                         } catch (IOException e) {
  199.                                                 if (logger.isErrorEnabled()) {
  200.                                                         logger.error("run()", e);
  201.                                                 }

  202.                                                 e.printStackTrace();
  203.                                         }
  204.                                 }
  205.                                
  206.                                 //删除本线程负责的日志文件
  207.                                 FileUtil.deleteFile(logFilePath);
  208.                                 logger.info("Thread:" + Thread.currentThread().getName() + "   logFilePath has deleted:" + logFilePath);
  209.                                 logger.info("Thread:" + Thread.currentThread().getName() + " shutdown.");
  210.                         }
  211.                 }
  212.         }
  213. }

后来又找系统管理员优化了一下网络,现在数据入库的速度是100W条7分钟 。相信应用和数据库移动到生产环境中,性能会进一步提升。

分享到:
评论

相关推荐

    Springboot Druid多数据源 多线程

    一个基于Springboot的小项目,采用Druid多数据源的设计,可以同时操作Mysql与Oracle数据库,配置了多线程处理任务,为刚接触springboot的朋友提供参考。

    oracle数据库dba管理手册

    4.4.3 具有历史数据的产品OLTP数据库 设计 71 4.4.4 数据仓库设计 72 4.5 文件位置 75 4.6 数据库空间使用概述 76 4.6.1 storage子句的意义 77 4.6.2 表段 78 4.6.3 索引段 79 4.6.4 回滚段 79 4.6.5 临时段 79 ...

    java快速插入千万级数据

    java快速插入千万级数据,亲测91秒插入1700万数据!!!

    oracle数据库经典题目

    在Oracle数据库中,数据库的操作模式分为专用服务器(DELICATED SERVER)模式和多线程服务器(MULTITHREADED SERVER)模式两种。其中,在专用服务器模式中为每个用户进程创建一个服务器进程,用户进程与服务器进程之间...

    ORACLE9i_优化设计与系统调整

    第8章ORACLE数据库系统优化安装 91 §7.1 应用系统环境规划和Oracle系统安装考虑 91 §7.1.1 操作系统安装考虑 91 §7.1.2 Oracle系统安装考虑 92 §7.2 关于创建多个Oracle实例问题 93 §7.3 Oracle系统安装后的...

    一个oracle客户端(oracle sql handler)

    多线程、多连接、智能/彩色SQL编辑器、中英文双语界面并能切换、支持 PL/SQL、批量SQL运行、高效的块操作、方便的表格操作,不需要安装 Oracle 客户端,能运行于所有的主流平台包括 Windows、Linux 、Unix 及 Mac ...

    jdbc连接程序实现不同数据库数据的迁移

    采用jdbc连接数据库,将两种不同类型的数据(SqlServer、Oracle)进行转换,来完成数据库数据的迁移。 实现方式,查询SqlServer数据,批处理插入到Oracle中。

    数据库系统的核心.doc

    常见的数据库系统有以下几种: MySQL MySQL是一个快速的、多线程、多用户和健壮的SQL数据库服务器。MySQL服务器支持 关键任务、重负载生产系统的使用,也可以将它嵌入到一个大配置(mass- deployed)的软件中去。 SQL...

    实例讲解Java批量插入、更新数据

    片文章介绍了一个Java批量添加数据,多个字段同时添加多条数据具体实例,面向的是Oracle数据库,需要的朋友可以参考下

    BaiduBaikeSpider:百度百科多线程爬虫Java源码,数据存储采用了Oracle11g

    百度百科多线程爬虫Java源码,数据存储采用了Oracle11g 简介 采用了MyEclipes作为集成开发环境,应该是兼容的eclips 使用方法 下载此原始代码后使用(导入或导入)操作导入此项目 各个类介绍 HtmlDAO.java 主要是...

    Oracle SQL Handler (Oracle客户端工具) V3.1

    本工具是用 Java 开发的、专门用于 Oracle 数据库操作的一种图形界面工具: 多线程、多连接、支持 PL/SQL、功能实用、操作简便,能运行于所有平台包括 Windows、Linux 及 Unix,勿需安装 Oracle 客户端,仅 2.8 MB ...

    数据库系统的核心是什么(2).docx

    常见数据库系统: MySQL MySQL是一个快速的、多线程、多用户和健壮的SQL数据库服务器。MySQL服务器支持关键任务、重负载生产系统的使用,也可以将它嵌入到一个大配置(mass- deployed)的软件中去。 SQL Server SQL ...

    震撼推出超方便实用的Oracle开发工具 - Oracle SQL Handler,双语界面,智能SQL编辑器,免装Oracle客户端,能运行于Windows, 双语界面

    将 SELECT 语句的查询结果显示在工作表,可以直接对查询结果进行再操作,如 修改、插入行、删除行、提 交(将表格中的数据改变写入相应的数据库表中)、多功能拷贝、将选择的单元格数据导出为 XLS /CSV /INSERT SQL ...

    Oracle开发工具 - Oracle SQL Handler(功能强大,超方便好用, 免装客户端, Windows / Linux)

    将 SELECT 语句的查询结果显示在工作表,可以直接对查询结果进行再操作,如 修改、 插入行、删除行、提交(将表格中的数据改变写入相应的数据库表中)、多功能拷贝、将选 择的单元格数据导出为 XLS /CSV /INSERT SQL...

    orcale常用命令

    Oracle数据库有哪几种启动方式 说明: 有以下几种启动方式: 1、startup nomount 非安装启动,这种方式启动下可执行:重建控制文件、重建数据库 读取init.ora文件,启动instance,即启动SGA和后台进程,这种启动...

    Oracle2Mysql:可以将oracle sql dump转换为mysql兼容sql的软件

    所有文件都是通过多线程过程创建和转换的,从而使其速度更快将所有sql存储在Folder AllSql中(在每次转换时生成并清理)必需的MariaDb 10x和更多NetCore3.1或更高导航(比插入sql更好的phpmyadmin)怎么做首先对您的...

    mysql数据库my.cnf配置文件

    # 如果线程重新被请求,那么请求将从缓存中读取,如果缓存中是空的或者是新的请求,那么这个线程将被重新创建,如果有很多新的线程, # 增加这个值可以改善系统性能.通过比较Connections和Threads_created状态的变量,...

    PLSQL Developer 10.0.4.1708〖附中文补丁和注册机〗

    PL/SQL Developer是一个集成开发环境,专门面向Oracle数据库存储程序单元的开发。如今,有越来越多的商业逻辑和应用逻辑转向了Oracle Server,因此,PL/SQL编程也成了整个开发过程的一个重要组成部分。PL/SQL ...

    pl/sql developer 9 + 注册机

    PL/SQL Developer是一个集成开发环境,专门面向Oracle数据库存储程序单元的开发。如今,有越来越多的商业逻辑和应用逻辑转向了Oracle Server,因此,PL/SQL编程也成了整个开发过程的一个重要组成部分。PL/SQL ...

Global site tag (gtag.js) - Google Analytics