`

多线程多批量插入大数据

阅读更多

 

参考  https://blog.csdn.net/xunwei0303/article/details/80241340?utm_source=blogxgwz1

 

创建多个线程,每个线程处理一批数据。

 

1. 创建表(mysql)

Sql代码  收藏代码
  1. CREATE TABLE TEST_BATCH_INSERT  
  2. (  
  3.   TEST_ID bigint PRIMARY key,  
  4.   TEST_NAME VARCHAR(100),  
  5.   AGE INT(5),  
  6.   CREATE_TIME DATETIME DEFAULT current_timestamp,  
  7.   UPDATE_TIME DATETIME DEFAULT current_timestamp  
  8. ) comment '测试批量插入';  

 

2. java bean

Java代码  收藏代码
  1. public class TestBatchInsertInfo {  
  2.     private Long testId;  
  3.   
  4.     private String testName;  
  5.   
  6.     private Integer age;  
  7.   
  8.     private Date createTime;  
  9.   
  10.     private Date updateTime;  
  11.   
  12.     // 省略getter/setter  
  13. }  

 

3. dao

Java代码  收藏代码
  1. public interface ITestBatchInsertMapper {  
  2.   
  3.     void batchInsert(List<TestBatchInsertInfo> list);  
  4. }  

 

4. mapper.xml

Xml代码  收藏代码
  1. <insert id="batchInsert" parameterType="java.util.List">  
  2.     INSERT INTO TEST_BATCH_INSERT   
  3.     (  
  4.      TEST_ID, TEST_NAME, AGE, CREATE_TIME, UPDATE_TIME  
  5.     )  
  6.     VALUES  
  7.     <foreach collection="list" item="log" index"index" separator =",">  
  8.         (  
  9.         #{log.testId, jdbcType=NUMERIC}, #{log.testName, jdbcType=VARCHAR}, #{log.age, jdbcType=NUMERIC},   
  10.         sysdate(), sysdate()  
  11.         )  
  12.     </foreach>  
  13.   </insert>  

 

5. 多线程

Java代码  收藏代码
  1. public class TestBatchInsertThread implements Runnable {  
  2.   
  3.     private ITestBatchInsertMapper testBatchInsertMapper;  
  4.   
  5.     /** 数据集合 */  
  6.     private List<TestBatchInsertInfo> list;  
  7.     /** 每个线程处理的起始数据 */  
  8.     private CountDownLatch begin;  
  9.     /** 每个线程处理的结束数据 */  
  10.     private CountDownLatch end;  
  11.   
  12.     public TestBatchInsertThread() {  
  13.     }  
  14.   
  15.     public TestBatchInsertThread(List<TestBatchInsertInfo> list, CountDownLatch begin, CountDownLatch end,  
  16.             ITestBatchInsertMapper testBatchInsertMapper) {  
  17.         this.list = list;  
  18.         this.begin = begin;  
  19.         this.end = end;  
  20.         this.testBatchInsertMapper = testBatchInsertMapper;  
  21.     }  
  22.   
  23.     @Override  
  24.     public void run() {  
  25.         try {  
  26.             if (list != null && !list.isEmpty()) {  
  27.                 testBatchInsertMapper.batchInsert(list);  
  28.             }  
  29.             // 执行完让线程直接进入等待  
  30.             begin.await();  
  31.         } catch (Exception e) {  
  32.             e.printStackTrace();  
  33.         } finally {  
  34.             // 当一个线程执行完 了计数要减一不然这个线程会被一直挂起  
  35.             end.countDown();  
  36.         }  
  37.     }  
  38. }  

 

6. service

多线程处理的方法是 batchInsertByThread;

 

普通批量处理的方法是 batchInsert

 

 

@Service(value = "testBatchInsertService")
public class TestBatchInsertServiceImpl implements ITestBatchInsertService {

    @Autowired
    private ITestBatchInsertMapper testBatchInsertMapper;

    @Override
    @Transactional
    public void batchInsertByThread(List<TestBatchInsertInfo> list) throws Exception {

        if (list == null || list.isEmpty()) {
            return;
        }
        // 一个线程处理300条数据
        int count = 1000;
        // 数据集合大小
        int listSize = list.size();
        // 开启的线程数
        int runSize = (listSize / count) + 1;
        // 存放每个线程的执行数据
        List<TestBatchInsertInfo> newList = null;
        // 创建一个线程池,数量和开启线程的数量一样
        ExecutorService executor = Executors.newFixedThreadPool(runSize);
        // 创建两个个计数器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);

        for (int i = 0; i < runSize; i++) {
            /* 计算每个线程执行的数据 */
            if ((i + 1) == runSize) {
                int startIdx = (i * count);
                int endIdx = list.size();

                newList = list.subList(startIdx, endIdx);
            } else {
                int startIdx = (i * count);
                int endIdx = (i + 1) * count;

                newList = list.subList(startIdx, endIdx);
            }
            TestBatchInsertThread thread = new TestBatchInsertThread(newList, begin, end, testBatchInsertMapper);

            executor.execute(thread);
        }
        begin.countDown();
        end.await();

        executor.shutdown();
    }

    @Override
    public void batchInsert(List<TestBatchInsertInfo> list) {

        if (list == null || list.isEmpty()) {
            return;
        }

        List<TestBatchInsertInfo> tempList = new LinkedList<>();

        for (int i = 0; i < list.size(); i++) {
            
            tempList.add(list.get(i));
            
            if (i % 1000 == 0) {
                testBatchInsertMapper.batchInsert(tempList);
                tempList.clear();
            }
        }
        testBatchInsertMapper.batchInsert(tempList);
    }
}

 

7. junit4 测试方法

Java代码  收藏代码
  1. import java.util.LinkedList;  
  2. import java.util.List;  
  3.   
  4. import org.junit.Test;  
  5. import org.junit.runner.RunWith;  
  6. import org.springframework.beans.factory.annotation.Autowired;  
  7. import org.springframework.boot.test.context.SpringBootTest;  
  8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  9.   
  10. import com.jieshun.springboot.mybatis.MybatisApplication;  
  11. import com.jieshun.springboot.mybatis.bean.po.TestBatchInsertInfo;  
  12. import com.jieshun.springboot.mybatis.service.ITestBatchInsertService;  
  13.   
  14.   
  15. @RunWith(SpringJUnit4ClassRunner.class)  
  16. @SpringBootTest(classes = MybatisApplication.class/*, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT*/)  
  17. public class TestBatchInsertService {  
  18.   
  19.     @Autowired  
  20.     private ITestBatchInsertService testBatchInsertService;  
  21.   
  22.     @Test  
  23.     public void batchInsertByThread() {  
  24.   
  25.         long startTime = System.currentTimeMillis();  
  26.   
  27.         try {  
  28.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  29.   
  30.             TestBatchInsertInfo info = null;  
  31.   
  32.             for (int i = 0; i < 100301; i++) {  
  33.   
  34.                 Integer ig = i;  
  35.   
  36.                 info = new TestBatchInsertInfo();  
  37.                 info.setTestId(ig.longValue());  
  38.                 info.setTestName("test名称_" + i);  
  39.                 info.setAge(i);  
  40.   
  41.                 list.add(info);  
  42.             }  
  43.   
  44.             testBatchInsertService.batchInsertByThread(list);  
  45.   
  46.             System.out.println("------Batch Insert Success------");  
  47.   
  48.         } catch (Exception e) {  
  49.             e.printStackTrace();  
  50.         }  
  51.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  52.     }  
  53.   
  54.     @Test  
  55.     public void batchInsert() {  
  56.   
  57.         long startTime = System.currentTimeMillis();  
  58.   
  59.         try {  
  60.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  61.   
  62.             TestBatchInsertInfo info = null;  
  63.   
  64.             for (int i = 0; i < 100301; i++) {  
  65.   
  66.                 Integer ig = i;  
  67.   
  68.                 info = new TestBatchInsertInfo();  
  69.                 info.setTestId(ig.longValue());  
  70.                 info.setTestName("test名称_" + i);  
  71.                 info.setAge(i);  
  72.   
  73.                 list.add(info);  
  74.             }  
  75.   
  76.             testBatchInsertService.batchInsert(list);  
  77.   
  78.             System.out.println("------Batch Insert Success------");  
  79.   
  80.         } catch (Exception e) {  
  81.             e.printStackTrace();  
  82.         }  
  83.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  84.   
  85.     }  
  86. }  

 

8. springboot 启动类

Java代码  收藏代码
  1. import org.mybatis.spring.annotation.MapperScan;  
  2. import org.springframework.boot.SpringApplication;  
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;  
  4. import org.springframework.context.annotation.ComponentScan;  
  5. import org.springframework.transaction.annotation.EnableTransactionManagement;  
  6.   
  7. /** 
  8.  * 应用启动类 
  9.  *  
  10.  * @author  
  11.  * @date 2018年10月17日 
  12.  * @since JDK 1.8 
  13.  */  
  14. @SpringBootApplication  
  15. @EnableTransactionManagement  
  16. @ComponentScan(basePackages = { "com.jieshun.springboot.mybatis" })  
  17. @MapperScan(basePackages = { "com.jieshun.springboot.mybatis.dao" })  
  18. public class MybatisApplication {  
  19.   
  20.     public static void main(String[] args) {  
  21.         SpringApplication.run(MybatisApplication.class, args);  
  22.     }  
  23.   
  24. }  

 

 

本文转自:http://xurichusheng.iteye.com/blog/2433024

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics