背景:
日终处理分户账记录的转逾期,数据量越来越大,单机处理时间已经不能忍受,考虑重构批处理逻辑。
场景特性:分户账转逾期处理,每条记录和别的记录互不相干
因此可以考虑把分户账信息分类处理,该方案可以方便的让执行程序随着分户账数据的不断增加,任意扩展到多个虚拟机,或者在同一个JVM内使用多线程处理。
待完善部分:某个任务处理失败,需要在回调函数增加处理,记录失败的Id号,因为是跑批另外最后两个类里面还可以加上工作日信息,在处理逻辑中加一层校验;在后台修复数据后,增加添加任务接口,让剩余的这个ID的创建新的节点,并重新执行。
第一步;
数据预处理,基于分户账记录的主键,hash后对128(数值可以取大点)取模,把数据分成128份,在此字段建索引
第二步:
利用zookeeper,建立阻塞消息队列
第三部:
任务分发系统MissionMaker定时执行,从数据库取出128这个值和任务名称,在/Queue/operation_yuqi节点下,以此创建128个持久化排序队列,把i值放到节点取值里面
第四步:
任务执行系统TaskExecuter监听/Queue/operation_yuqi节点的队列变化情况,发生变更,就在里面取出一个节点,并读出节点的数据(0-127),然后执行自己管理的部分分户账数据
阻塞队列
public class DistributedBlockingQueue<T> {
protected final ZkClient zkClient;
protected final String root;
protected static final String Node_NAME = "n_";
public DistributedBlockingQueue(ZkClient zkClient, String root,String taskName) {
this.zkClient = zkClient;
this.root = root.concat("/").concat(taskName);
}
public boolean offer(T element) throws Exception{
String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
try {
zkClient.createPersistentSequential(nodeFullPath , element);
}catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
offer(element);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}
public T poll(TaskCallBack back) throws Exception {
while (true){
final CountDownLatch latch = new CountDownLatch(1);
final IZkChildListener childListener = new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
System.out.println(Thread.currentThread().getName()+",发现任务队列长度发生变化!");
latch.countDown();
}
};
zkClient.subscribeChildChanges(root, childListener);
try{
List<String> list = zkClient.getChildren(root);
T node = null;
if (list.size() == 0) {
}else{
Collections.sort(list, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
}
String nodeN = "";
for ( String nodeName : list ){
String nodeFullPath = root.concat("/").concat(nodeName);
try {
node = (T) zkClient.readData(nodeFullPath);
Boolean bb = zkClient.delete(nodeFullPath);
if(bb){
nodeN = nodeName;
}
} catch (ZkNoNodeException e) {
node=null;
// ignore
}
break;
}
if (node != null && null!=nodeN && !"".equals(nodeN)){
back.doTask(node);
}else{
latch.await();
}
}finally{
zkClient.unsubscribeChildChanges(root, childListener);
}
}
}
public T getPoll() throws Exception {
try {
List<String> list = zkClient.getChildren(root);
if (list.size() == 0) {
return null;
}
Collections.sort(list, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
for ( String nodeName : list ){
String nodeFullPath = root.concat("/").concat(nodeName);
try {
T node = (T) zkClient.readData(nodeFullPath);
zkClient.delete(nodeFullPath);
return node;
} catch (ZkNoNodeException e) {
// ignore
}
}
return null;
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
private String getNodeNumber(String str, String nodeName) {
int index = str.lastIndexOf(nodeName);
if (index >= 0) {
index += Node_NAME.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
//创建任务
public class MissionMaker implements Runnable{
private String root;
private ZkClient zkClient;
public MissionMaker(String url,String root){
this.zkClient = new ZkClient(url, 5000, 5000, new SerializableSerializer());
this.root = root;
}
public TaskBean getTaskBean(){
TaskBean taskBean = new TaskBean();
taskBean.setTaskName("operation_yuqi");//逾期
taskBean.setHashNum(16);
return taskBean;
}
//创建任务
public void createTasks() throws Exception {
TaskBean taskBean = getTaskBean();
DistributedBlockingQueue queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskBean.getTaskName());
System.out.println("create mission:"+taskBean.getHashNum());
for(int i=0;i<taskBean.getHashNum();i++){
OprBean oprBean = new OprBean();
oprBean.setHashId(i+"");
queueMaker.offer(oprBean);
}
}
public void run(){
try {
while (true) {
createTasks();
Thread.sleep(30000);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//执行任务
public class TaskExecuter implements Runnable{
private String root;
private ZkClient zkClient;
private String taskName;
private DistributedBlockingQueue queueMaker;
public TaskExecuter(String url,String root,String taskName){
this.zkClient = new ZkClient(url, 5000, 5000, new SerializableSerializer());
this.root = root;
this.taskName = taskName;
this.queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskName);
}
public void run(){
try {
queueMaker.poll(new TaskCallBack<OprBean>() {
@Override
public void doTask(OprBean oprBean) {
// TODO Auto-generated method stub
try {
System.out.println("线程"+Thread.currentThread().getName()+"执行计算任务,taskName="+taskName+",hashId = "+oprBean.getHashId());
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void afterTask(String hashId) {
// TODO Auto-generated method stub
System.out.println("更新数据库记录,taskName="+taskName+",hashId = "+hashId);
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//回调接口
public interface TaskCallBack<T> {
void doTask(T oprBean);
void afterTask(String hashId);
}
序列化到zookeeper的类
public class OprBean implements Serializable{
/**
*
*/
private static final long serialVersionUID = 2840329402832770757L;
private String hashId;
private String oprResult;
public String getHashId() {
return hashId;
}
public void setHashId(String hashId) {
this.hashId = hashId;
}
public String getOprResult() {
return oprResult;
}
public void setOprResult(String oprResult) {
this.oprResult = oprResult;
}
}
获取数据库信息的类
public class TaskBean {
private String taskName;
private Integer hashNum;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Integer getHashNum() {
return hashNum;
}
public void setHashNum(Integer hashNum) {
this.hashNum = hashNum;
}
}
分享到:
相关推荐
基于 DUBBO + ZOOKEEPER 的计量服务平台是互联网与传统计量检测服务业相结合的产物,平台系统设计充分考虑了计量行业特点,在此基础上 确定系统的建设目标,重点解决分布式系统集群中数据共享、数据一致性与服务...
cdc技术分析文章,基于flink-cdc做监控数据源的实时变更数据捕获。该文从基础定义、使用场景、分析、实际使用流程、源码分析、常见问题及解决方式等几个方向讲解了这个技术栈。如果公司有要使用搜索方案,涉及到业务...
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...
它提供了一些简单的操作,使得分布式应用可以基于这些接口实现诸如同步、配置维护和分集群或者命名的服务。Zookper很容易编程接入,它使用了一个和文件树结构相似的数据模型。 虽然ZooKeeper是一个Java应用程序,但C...
基于Zookeeper实现分布式服务器动态上下线感知 深入分析Zookeeper Zab协议及选举机制源码解读 Dubbo 使用Dubbo对单一应用服务化改造 Dubbo管理中心及及监控平台安装部署 Dubbo分布式服务模块划分(领域驱动) ...
图形处理和机器学习 Storm 分布式实时⼤数据处理系统 毫秒级别的实时数据处理能⼒ 实时分析的领导者 8 数据查询分析 Hive 基于hadoop的数据仓库,结构化 SparkSQL 处理结构化数据的spark组件 分布式的SQL查询引擎 ...
swall是一个基于zookeeper实现的分布式基础信息管理系统(Infrastructure Management)可以用于管理特别是架构比较灵活的服务,比如游戏。用swall, 你不用登陆到具体的服务器去操作,你指需要在一台机器上面就可以...
加了 NoSQL 支持,以及基于 Zookeeper 的分布式扩展功能 SolrCloud。SolrCloud 的说明可 以参看:SolrCloud 分布式部署。它的主要特性包括:高效、灵活的缓存功能,垂直搜索功 能,Solr 是一个高性能,采用 Java5 ...
基于Zookeeper的服务注册和发现完美集成Spring / SpringBoot项目HTTP传输协议多种负载均衡策略进行下的容错处理(FailOver / FailFast)拦截器处理,插件式扩展客户端自动恢复动态注册/卸载服务管理员后台动态修改...
基于netty和zookeeper实现的简单远程调用框架 项目结构 1.客户端组件 RpcClient 使用Netty向服务提供方发起服务调用连接。 RpcProxy 创建远程服务调用代理。 ServiceDiscovery 连接Zookeeper,获取服务地址。 2....
Hadoop/HDFS/Zookeeper/HBase/MapReduce/Yarn/Hive/Flink/Flume/Kafka/Loader/ElasticSearch/R edis) 数据预处理 特征选择与降维 有监督学习 无监督学习 模型评估与优化 数据挖掘综合应用 Spark MLlib 数据挖掘 ...
答:①ArrayList和LinkedList可想从名字分析,它们一个是Array(动态数组)的数据结构,一个是Link(链表)的数据结构,此外,它们两个都是对List接口的实现。 前者是数组队列,相当于动态数组;后者为双向链表结构,也...
Mycat关键特性 关键特性 支持SQL92标准 支持MySQL、Oracle、DB2、SQL Server、PostgreSQL等DB的常见SQL语法 遵守Mysql原生协议,跨语言...集群基于ZooKeeper管理,在线升级,扩容,智能优化,大数据处理(2.0开发版)。
「⼤数据⼲货」基于 「⼤数据⼲货」基于Hadoop的⼤数据平台实施 的⼤数据平台实施——整体架构设 整体架构设 计 计 ⼤数据的热度在持续的升温,继云计算之后⼤数据成为⼜⼀⼤众所追捧的新星。我们暂不去讨论⼤数据...
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop...
Handling spikes and congestion:Heron 具有一个背压机制,即在执行时的一个topology中动态地调整数据流,从而不影响数据的准确性。这在流量峰值和管道堵塞时非常有用。 图3:Heron UI,显示逻辑计划、物理计划...
3 传统数据仓库在⾯对更⼤规模数据时显得⼒不从⼼,在寄希望于⼤数据平台时,MapReduce 编程门槛让很多数据分析师望⽽却步,⽽Hive 是基于Hadoop的⼀个数据仓库⼯具,可以将结构化的数据⽂件映射为⼀张数据库表,并...
修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 zookeeper工具类优化 增加工具类 1.0.16 页面html标志修改 httpclient中文支持 工具类增强(zip,reflect,thread) 1.0.17 ...