`
xurichusheng
  • 浏览: 335802 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

多线程多批量插入大数据

阅读更多

 

 

参考  https://blog.csdn.net/xunwei0303/article/details/80241340?utm_source=blogxgwz1

 

创建多个线程,每个线程处理一批数据。

 

1. 创建表(mysql)

CREATE TABLE TEST_BATCH_INSERT
(
  TEST_ID bigint PRIMARY key,
  TEST_NAME VARCHAR(100),
  AGE INT(5),
  CREATE_TIME DATETIME DEFAULT current_timestamp,
  UPDATE_TIME DATETIME DEFAULT current_timestamp
) comment '测试批量插入';

 

2. java bean

public class TestBatchInsertInfo {
    private Long testId;

    private String testName;

    private Integer age;

    private Date createTime;

    private Date updateTime;

    // 省略getter/setter
}

 

3. dao

public interface ITestBatchInsertMapper {

	void batchInsert(List<TestBatchInsertInfo> list);
}

 

4. mapper.xml

<insert id="batchInsert" parameterType="java.util.List">
  	INSERT INTO TEST_BATCH_INSERT 
  	(
  	 TEST_ID, TEST_NAME, AGE, CREATE_TIME, UPDATE_TIME
  	)
  	VALUES
  	<foreach collection="list" item="log" index= "index" separator =",">
  		(
  		#{log.testId, jdbcType=NUMERIC}, #{log.testName, jdbcType=VARCHAR}, #{log.age, jdbcType=NUMERIC}, 
  		sysdate(), sysdate()
  		)
  	</foreach>
  </insert>

 

5. 多线程

public class TestBatchInsertThread implements Runnable {

	private ITestBatchInsertMapper testBatchInsertMapper;

	/** 数据集合 */
	private List<TestBatchInsertInfo> list;
	/** 每个线程处理的起始数据 */
	private CountDownLatch begin;
	/** 每个线程处理的结束数据 */
	private CountDownLatch end;

	public TestBatchInsertThread() {
	}

	public TestBatchInsertThread(List<TestBatchInsertInfo> list, CountDownLatch begin, CountDownLatch end,
			ITestBatchInsertMapper testBatchInsertMapper) {
		this.list = list;
		this.begin = begin;
		this.end = end;
		this.testBatchInsertMapper = testBatchInsertMapper;
	}

	@Override
	public void run() {
		try {
			if (list != null && !list.isEmpty()) {
				testBatchInsertMapper.batchInsert(list);
			}
			// 执行完让线程直接进入等待
			begin.await();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 当一个线程执行完 了计数要减一不然这个线程会被一直挂起
			end.countDown();
		}
	}
}

 

6. service

多线程处理的方法是 batchInsertByThread;

 

普通批量处理的方法是 batchInsert

 

 

@Service(value = "testBatchInsertService")
public class TestBatchInsertServiceImpl implements ITestBatchInsertService {

    @Autowired
    private ITestBatchInsertMapper testBatchInsertMapper;

    @Override
    @Transactional
    public void batchInsertByThread(List<TestBatchInsertInfo> list) throws Exception {

        if (list == null || list.isEmpty()) {
            return;
        }
        // 一个线程处理300条数据
        int count = 1000;
        // 数据集合大小
        int listSize = list.size();
        // 开启的线程数
        int runSize = (listSize / count) + 1;
        // 存放每个线程的执行数据
        List<TestBatchInsertInfo> newList = null;
        // 创建一个线程池,数量和开启线程的数量一样
        ExecutorService executor = Executors.newFixedThreadPool(runSize);
        // 创建两个个计数器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);

        for (int i = 0; i < runSize; i++) {
            /* 计算每个线程执行的数据 */
            if ((i + 1) == runSize) {
                int startIdx = (i * count);
                int endIdx = list.size();

                newList = list.subList(startIdx, endIdx);
            } else {
                int startIdx = (i * count);
                int endIdx = (i + 1) * count;

                newList = list.subList(startIdx, endIdx);
            }
            TestBatchInsertThread thread = new TestBatchInsertThread(newList, begin, end, testBatchInsertMapper);

            executor.execute(thread);
        }
        begin.countDown();
        end.await();

        executor.shutdown();
    }

    @Override
    public void batchInsert(List<TestBatchInsertInfo> list) {

        if (list == null || list.isEmpty()) {
            return;
        }

        List<TestBatchInsertInfo> tempList = new LinkedList<>();

        for (int i = 0; i < list.size(); i++) {
           
            tempList.add(list.get(i));
           
            if (i % 1000 == 0) {
                testBatchInsertMapper.batchInsert(tempList);
                tempList.clear();
            }
        }
        testBatchInsertMapper.batchInsert(tempList);
    }
}

 

7. junit4 测试方法

import java.util.LinkedList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.jieshun.springboot.mybatis.MybatisApplication;
import com.jieshun.springboot.mybatis.bean.po.TestBatchInsertInfo;
import com.jieshun.springboot.mybatis.service.ITestBatchInsertService;


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MybatisApplication.class/*, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT*/)
public class TestBatchInsertService {

	@Autowired
	private ITestBatchInsertService testBatchInsertService;

	@Test
	public void batchInsertByThread() {

		long startTime = System.currentTimeMillis();

		try {
			List<TestBatchInsertInfo> list = new LinkedList<>();

			TestBatchInsertInfo info = null;

			for (int i = 0; i < 100301; i++) {

				Integer ig = i;

				info = new TestBatchInsertInfo();
				info.setTestId(ig.longValue());
				info.setTestName("test名称_" + i);
				info.setAge(i);

				list.add(info);
			}

			testBatchInsertService.batchInsertByThread(list);

			System.out.println("------Batch Insert Success------");

		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));
	}

	@Test
	public void batchInsert() {

		long startTime = System.currentTimeMillis();

		try {
			List<TestBatchInsertInfo> list = new LinkedList<>();

			TestBatchInsertInfo info = null;

			for (int i = 0; i < 100301; i++) {

				Integer ig = i;

				info = new TestBatchInsertInfo();
				info.setTestId(ig.longValue());
				info.setTestName("test名称_" + i);
				info.setAge(i);

				list.add(info);
			}

			testBatchInsertService.batchInsert(list);

			System.out.println("------Batch Insert Success------");

		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));

	}
}

 

8. springboot 启动类

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * 应用启动类
 * 
 * @author 
 * @date 2018年10月17日
 * @since JDK 1.8
 */
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan(basePackages = { "com.jieshun.springboot.mybatis" })
@MapperScan(basePackages = { "com.jieshun.springboot.mybatis.dao" })
public class MybatisApplication {

	public static void main(String[] args) {
		SpringApplication.run(MybatisApplication.class, args);
	}

}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics