LargeDataBatch
Introduction
LargeDataBatch是一个轻量级大数据处理工具,提供简单易用的API。
Overview
在项目开发过程中,经常会遇到大数据问题处理问题,比如上G的数据需要入库,同时对数据的处理 通常会有内存限制和处理时间要求。基于此,开发此代码,可通过调整参数达到要求。
Features
1、通过调整线程数和批次数,可调节数据库CPU的高低,充分压榨数据库
2、默认提供对Mybatis的支持
3、提供定时读取和处理功能,适合比如读取MQ等不确定信息,但需要轮训并及时处理的情况
4、自动对数据进行分片
5、充分利用线程池,避免空闲线程
6、数据处理完毕后,可继续执行同步交易
Getting Started
Maven dependency
<dependency>
<groupId>cn.ymotel</groupId>
<artifactId>largedatabtach</artifactId>
<version>1.0.2</version>
</dependency>
Gradle dependency
compile group: 'cn.ymotel', name: 'largedatabtach', version:'1.0.2'
MybatisBatchDataConsumer mybatisbatch=new MybatisBatchDataConsumer();
mybatisbatch.setSqlSessionFactory(sqlSession);
LargeDataBatch batchhelp=new ThreadSafeLargeDataBatchHelp();
batchhelp.init(100,10,mybatisbatch);
MybatisResultHandler result=new MybatisResultHandler();
result.setSql("xx");
result.setDatabatch(batchhelp)
sqlSession.select("xxx",result);
batchhelp.end();
代码调用逻辑:
- 1、 batchhelp.init(100,10,mybatisbatch) 初始化批次数,线程数和数据处理类.
- 2、使用sqlSession.select()方法,调用MybatisResultHandler,将取得的数据放入batchhelp的队列中,
- 3、在batchhelp中的数据达到一个批次后,将数据和数据处理类推送给消费者(MybatisBatchDataConsumer)进行处理
- 4、 最后调用end方法,将队列中不够一个批次的数据推送给消费者类(MybatisBatchDataConsumer)进行处理
## Principle
在数据提供者提供数据后,将数据放入队列,如果队列达到批次数,将数据提交给线程池。
线程池有空闲队列,消费数据。线程池无空闲队列,阻塞主线程,防止过量获取数据。
线程池有空闲线程后,将队列数据再此提交给线程,同时唤醒主线程,使得主线程可继续提供数据
## Usage
数据提供者可以是数据库,也可以是文件或者其他类型,多种多样,只要能调用addData或者addSql方法即可。
数据提供者和消费者可自由组合
BatchDataConsumer接口介绍
消费者可通过spring配置或者通过new方法实例化进行定义。
程序得到消费者对象后,会生成多个副本,以提高性能
消费者类需要实现BatchDataConsumer接口,程序中因为将会生成多个副本,程序在调用end方法后
,会销毁程序中的副本,为了避免内存泄漏,请注意在close方法中关闭相应对象
程序会调用Runnable的run方法进行数据处理
ThreadSafeLargeDataBatchHelp 方法介绍:
LargeDataBatch 目前有LargeDataBatchHelp和ThreadSafeLargeDataBatchHelp两个实现类
,推荐使用ThreadSafeLargeDataBatchHelp
ThreadSafeLargeDataBatchHelp可在spring 中进行配置,供其他类引用。
/**
* 设置在一个jvm内的总的最高可并行的线程数,
* 在一个jvm中可能会多个地方调用ThreadSafeLargeDataBatchHelp类。
* 如果不加控制,可能导致应用服务器在同一时间线程数过大,应用服务器处理异常情况发生,
* 如果消费者同时超过数据库也可能导致数据库超过阈值情况发生
*不设置,则不会对线程池中的线程数进行总体控制
* @param totalThreadCount
*/
public void setTotalThread(int totalThreadCount) {
init方法中参数介绍:
beanName是spring中配置的beanName,需配置为singleton="false",需要实现BatchDataInterface 接口
使用此参数需要ThreadSafeLargeDataBatchHelp可得到ApplicationContext对象,
推荐在spring中进行自动注入,
batchsize在达到批次数后,会将一批数据整体提交到线程池进行处理,
在数据库中为了提高性能,对大数据的处理一般都是批次提交,
通过开启此batchsize后,可将数据自动分组
threadsize线程数,开启多少个线程同时处理此数据,
如果是数据处理结果是数据库入库操作,通过调整此大小,可提高或者降低数据库的CPU
timeout 超过时间,线程会将未达阀值数据自动提交,如果不设置超时时间
,则默认不开启。
需要定时轮训读取流信息,无法调用end方法的场景可通过增加此参数,实现自动提交
BatchDataInterface t 是通过new 方法实例化的bean
/**
* @param batchsize 在thread中的每次执行条数
* @param threadsize 同时并发的线程数
* @param beanName 在配置文件中改Aciton 需配置为singleton="false"
*/
public void init(int batchsize, int threadsize, String beanName)
/**
* @param batchsize 在thread中的每次执行条数
* @param threadsize 同时并发的线程数
* @param beanName 需配置为singleton="false"
* @param timeout 超过时间,线程会将未达阀值数据,自动提交
*/
public void init(int batchsize, int threadsize, String beanName, long timeout)
/**
* @param batchsize 在thread中的每次执行条数
* @param threadsize 同时并发的线程数
* @param t 实现 BatchDataInterface 的对象
*/
public void init(int batchsize, int threadsize, BatchDataInterface t)
/**
* @param batchsize 在thread中的每次执行条数
* @param threadsize 同时并发的线程数
* @param t 实现 BatchDataInterface 的对象
* @param timeout 超过时间,线程会将未达阀值数据,自动提交
*/
public void init(int batchsize, int threadsize, BatchDataInterface t, long timeout) {
addSql方法介绍:
程序会将sql和obj数据组成一个数组,放入队列中,在达到阈值将数据放入List中供消费者调用
/**
* 程序会将sql和数据组成一个数组,放入队列中,供消费者调用
* @param sql
* @param obj
*/
public void addSql(String sql, Object obj) {
/**
* 程序会将obj,放入队列中,在达到阈值将数据放入List中供消费者调用
* @param obj
*/
public void addData(Object obj){
/**
* 数据处理结尾一般会剩余一些未达到batchsize的数据未处理,通过调用此方法
*,可将未达到阈值的数据提交到线程池中进行处理
*/
public void end() {
网址:https://github.com/allon2/lightLargeDataBatch
分享到:
相关推荐
#资源达人分享计划#
一个轻量级的、灵活的命令行JSON处理器,类似于JSON数据处理工具、awk、grep等。它用可移植的C语言编写,没有运行时依赖,允许您轻松地切片、过滤、映射和转换结构化数据。jq 是一个强大的工具,用于在命令行环境中...
一款轻量级的自然语言处理(NLP)工具包,目标是减少用户项目中的工程型代码,例如数据处理循环、训练循环、多卡运行等。
fastNLP:面向自然语言处理NLP的轻量级框架 fastNLP具有如下的特性: 统一的Tabular式数据容器,简化数据预处理过程; 内置多种数据集的Loader和Pipe,省去预处理代码; 各种方便的NLP工具,例如Embedding加载(包括...
fastNLP是一款轻量级的自然语言处理(NLP)工具包,目标是减少用户项目中的工程型代码,例如数据处理循环、训练循环、多卡运行等。 fastNLP具有如下的特性: 便捷。在数据处理中可以通过apply函数避免循环、使用多...
fastNLP是一款面向自然语言处理(NLP)的轻量级框架,目标是快速实现NLP任务以及构建复杂模型。 fastNLP具有如下的特性: 统一的Tabular式数据容器,简化数据预处理过程; 内置多种数据集的Loader和Pipe,省去...
jq 是一个轻量级而且灵活的命令行 JSON 处理器。类似用于 JSON 数据的 sed 工具。
sandbox是一个基于django框架开发的轻量级办公平台,主要模块有:权限控制、资产(库存)管理、设备管理、客户信息管理和工单流程管理,其目的在于建立一套规范化、统一化和清晰化的标准服务流程,能够清晰的处理、...
特点: 1、程序简单,由PHP编写,用Excell做迁移配置。 2、目前支持mysql, sqlite,Excell三种数据源 3、操作方便,在php的环境中执行,由dos shell启动 4、配置灵活,适合处理10万以下的小批量数据
一款轻量级的自然语言处理(NLP)工具包自然语言处理(NLP)工具包是一种用于处理和分析人类语言的软件。它包括许多不同的工具和技术,可以帮助计算机理解和处理语言。 NLP工具包通常包括以下功能: - 分词:将...
传统DOM处理需要编写繁杂的选择器,逐级操作还原服务器需要的JSON数据格式,不但操作繁琐且易出现致命错误。而通过Vue.js的响应式双向绑定数据,实时反映数据的真实变化并映射到数据源上,避免前端页面开发中DOM选择器...
Rope是一款轻量级别的ETL(Extract-Transform-Load)工具。主要用于从不同源获取/接受数据,然后统一处理数据后,写入到各种目标源;系统采用多级缓冲和数据缓存,每秒可处理上万级别的数据;而且系统采用插件扩展系统...
基于VB6自主研发的轻量级数据库查询客户端工具,启动速度较快,所占资源较小。 附源码,对初学数据库编程的童鞋有所裨益! 1、支持SQLServer、Oracle、Sybase、DB2数据库平台 2、单句执行:可以提交单一的SQL语句...
PIKA:基于Pytorch和(Py)Kaldi的轻量级语音处理工具包PIKA是基于Pytorch和(Py)Kaldi的轻量级语音处理工具包。 第一个版本侧重于端到端语音识别。 我们使用作为深度学习引擎,使用进行数据格式化和特征提取。主要...
基于C++实现的轻量级Web服务器源码+项目说明.zip 开发部署环境 操作系统: Ubuntu 16.04 编译器: g++ 5.4 版本控制: git 自动化构建: cmake 集成开发工具: CLion 编辑器: Vim 压测工具:WebBench 核心功能及...
数据库客户端工具用于连接、查询、更新数据库,ORM(对象关系映射)工具简化了数据操作和持久化层的开发工作。 总之,开发工具极大地提升了软件工程师的工作效率,保证了开发过程中的准确性与一致性,同时也促进了...
memorious是一个轻量级的网页抓取工具包。 它支持收集结构化或非结构化数据的抓取工具。 这包括以下用例: 使爬虫模块化和可重复使用的简单任务 提供实用功能来执行常见任务,例如数据存储、HTTP 会话管理 将爬虫...
MinQuery是一款针对WeChat-Min-App的一套轻量级代码工具库。旨在简化小程序开发管理流程,降低开发门槛,提高开发效率。 API文档 中文API文档 主要特性 针对已有的小程序原生API进行简单二次封装,支持更多对原生...
当前数据类型支持 弦乐 清单 套装 散列 排序集 位图和HyperLogLogs 待办事项列表位于。 软件特点 用Python2.7编写(Python3.x在JSON序列化上失败-正在处理)。 所有依赖项都包含在源中(不需要pip或...
数据库客户端工具用于连接、查询、更新数据库,ORM(对象关系映射)工具简化了数据操作和持久化层的开发工作。 总之,开发工具极大地提升了软件工程师的工作效率,保证了开发过程中的准确性与一致性,同时也促进了...