- 浏览: 83989 次
- 性别:
- 来自: 郑州
文章分类
- 全部博客 (69)
- java (28)
- linux (6)
- redis (4)
- C# (3)
- 架构 (10)
- java ee (1)
- web (1)
- 操作系统 (7)
- sqlserver (1)
- android (2)
- Hadoop (12)
- 大数据 (21)
- 分布式 事务 消息 (10)
- linux mysql (1)
- 数据库 (3)
- 关于hadoop之bootshell使用 (2)
- 关于hbase---HTableInterfaceFactory (1)
- Spring (3)
- Hbase (5)
- jstorm (10)
- nginx (1)
- 分布式 (1)
- 区块链 (3)
- dubbo (1)
- nacos (1)
- 阿里 (1)
- go (3)
- 缓存 (1)
- memcached (1)
- ssdb (1)
- 源码 (1)
最新评论
-
想个可以用的名字:
楼主,能不能给发一份源代码,1300246542@qqq.co ...
spring+websocket的使用 -
wahahachuang5:
web实时推送技术使用越来越广泛,但是自己开发又太麻烦了,我觉 ...
websocket -
dalan_123:
前提是你用的是spring mvc 才需要加的1、在web.x ...
spring+websocket的使用 -
string2020:
CharacterEncodingFilter这个filter ...
spring+websocket的使用
一、作用
构建一个Rotationg transaction的state类 用于完成partition的state管理及操作
二、源码分析
package storm.trident.topology.state;
import backtype.storm.utils.Utils;
import org.apache.zookeeper.KeeperException;
import java.util.HashSet;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
public class RotatingTransactionalState {
// state intitle接口
public static interface StateInitializer {
Object init(long txid, Object lastState);
}
// 事务状态 通过zookeeper来进行管理
private TransactionalState _state;
// 子目录
private String _subdir;
// 记录transactional id及其内容
private TreeMap<Long, Object> _curr = new TreeMap<Long, Object>();
public RotatingTransactionalState(TransactionalState state, String subdir) {
_state = state;
_subdir = subdir;
state.mkdir(subdir);
sync();
}
// 获取最近的transaction state 因为使用的TreeMap 最后一个元素即为最近的transaction state
public Object getLastState() {
if(_curr.isEmpty()) return null;
else return _curr.lastEntry().getValue();
}
// 若是transaction state发生改变时需要更新对应的zookeeper对应节点的内容
public void overrideState(long txid, Object state) {
_state.setData(txPath(txid), state);
_curr.put(txid, state);
}
// 根据指定的txid删除对应的transaction state内容
public void removeState(long txid) {
if(_curr.containsKey(txid)) {
_curr.remove(txid);
_state.delete(txPath(txid));
}
}
// 根据执行txId获取本地对应的transaction state
public Object getState(long txid) {
return _curr.get(txid);
}
// 根据指定的txId 和 对应的初始化器 获取对应的transaction state
public Object getState(long txid, StateInitializer init) {
if(!_curr.containsKey(txid)) {
// 获取小于指定txId的内容
SortedMap<Long, Object> prevMap = _curr.headMap(txid);
// 获取大于等于指定的txId的内容
SortedMap<Long, Object> afterMap = _curr.tailMap(txid);
// 判断preMap是否为空 不为空获取最后一个key即为prev state
Long prev = null;
if(!prevMap.isEmpty()) prev = prevMap.lastKey();
//
Object data;
if(afterMap.isEmpty()) {
Object prevData;
if(prev!=null) {
prevData = _curr.get(prev);
} else {
prevData = null;
}
data = init.init(txid, prevData);
} else {
// ??????????
data = null;
}
// 添加到本地
_curr.put(txid, data);
// 在zookeeper上创建对应的节点
_state.setData(txPath(txid), data);
}
// 将对应的transaction state内容返回
return _curr.get(txid);
}
public Object getPreviousState(long txid) {
SortedMap<Long, Object> prevMap = _curr.headMap(txid);
if(prevMap.isEmpty()) return null;
else return prevMap.get(prevMap.lastKey());
}
// 判断本地cache是否存在
public boolean hasCache(long txid) {
return _curr.containsKey(txid);
}
/**
* Returns null if it was created, the value otherwise.
*/
public Object getStateOrCreate(long txid, StateInitializer init) {
if(_curr.containsKey(txid)) {
return _curr.get(txid);
} else {
getState(txid, init);
return null;
}
}
// 删除指定txId对应的node内容 包括两个部分:本地 和 zookeeper
public void cleanupBefore(long txid) {
SortedMap<Long, Object> toDelete = _curr.headMap(txid);
for(long tx: new HashSet<Long>(toDelete.keySet())) {
_curr.remove(tx);
try {
_state.delete(txPath(tx));
} catch(RuntimeException e) {
// Ignore NoNodeExists exceptions because when sync() it may populate _curr with stale data since
// zookeeper reads are eventually consistent.
if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
throw e;
}
}
}
}
// 同步获取指定subdir的所有子节点 并获取对应的内容 同时完成在本地进行保存
private void sync() {
List<String> txids = _state.list(_subdir);
for(String txid_s: txids) {
Object data = _state.getData(txPath(txid_s));
_curr.put(Long.parseLong(txid_s), data);
}
}
private String txPath(long tx) {
return txPath("" + tx);
}
private String txPath(String tx) {
return _subdir + "/" + tx;
}
}
构建一个Rotationg transaction的state类 用于完成partition的state管理及操作
二、源码分析
package storm.trident.topology.state;
import backtype.storm.utils.Utils;
import org.apache.zookeeper.KeeperException;
import java.util.HashSet;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
public class RotatingTransactionalState {
// state intitle接口
public static interface StateInitializer {
Object init(long txid, Object lastState);
}
// 事务状态 通过zookeeper来进行管理
private TransactionalState _state;
// 子目录
private String _subdir;
// 记录transactional id及其内容
private TreeMap<Long, Object> _curr = new TreeMap<Long, Object>();
public RotatingTransactionalState(TransactionalState state, String subdir) {
_state = state;
_subdir = subdir;
state.mkdir(subdir);
sync();
}
// 获取最近的transaction state 因为使用的TreeMap 最后一个元素即为最近的transaction state
public Object getLastState() {
if(_curr.isEmpty()) return null;
else return _curr.lastEntry().getValue();
}
// 若是transaction state发生改变时需要更新对应的zookeeper对应节点的内容
public void overrideState(long txid, Object state) {
_state.setData(txPath(txid), state);
_curr.put(txid, state);
}
// 根据指定的txid删除对应的transaction state内容
public void removeState(long txid) {
if(_curr.containsKey(txid)) {
_curr.remove(txid);
_state.delete(txPath(txid));
}
}
// 根据执行txId获取本地对应的transaction state
public Object getState(long txid) {
return _curr.get(txid);
}
// 根据指定的txId 和 对应的初始化器 获取对应的transaction state
public Object getState(long txid, StateInitializer init) {
if(!_curr.containsKey(txid)) {
// 获取小于指定txId的内容
SortedMap<Long, Object> prevMap = _curr.headMap(txid);
// 获取大于等于指定的txId的内容
SortedMap<Long, Object> afterMap = _curr.tailMap(txid);
// 判断preMap是否为空 不为空获取最后一个key即为prev state
Long prev = null;
if(!prevMap.isEmpty()) prev = prevMap.lastKey();
//
Object data;
if(afterMap.isEmpty()) {
Object prevData;
if(prev!=null) {
prevData = _curr.get(prev);
} else {
prevData = null;
}
data = init.init(txid, prevData);
} else {
// ??????????
data = null;
}
// 添加到本地
_curr.put(txid, data);
// 在zookeeper上创建对应的节点
_state.setData(txPath(txid), data);
}
// 将对应的transaction state内容返回
return _curr.get(txid);
}
public Object getPreviousState(long txid) {
SortedMap<Long, Object> prevMap = _curr.headMap(txid);
if(prevMap.isEmpty()) return null;
else return prevMap.get(prevMap.lastKey());
}
// 判断本地cache是否存在
public boolean hasCache(long txid) {
return _curr.containsKey(txid);
}
/**
* Returns null if it was created, the value otherwise.
*/
public Object getStateOrCreate(long txid, StateInitializer init) {
if(_curr.containsKey(txid)) {
return _curr.get(txid);
} else {
getState(txid, init);
return null;
}
}
// 删除指定txId对应的node内容 包括两个部分:本地 和 zookeeper
public void cleanupBefore(long txid) {
SortedMap<Long, Object> toDelete = _curr.headMap(txid);
for(long tx: new HashSet<Long>(toDelete.keySet())) {
_curr.remove(tx);
try {
_state.delete(txPath(tx));
} catch(RuntimeException e) {
// Ignore NoNodeExists exceptions because when sync() it may populate _curr with stale data since
// zookeeper reads are eventually consistent.
if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
throw e;
}
}
}
}
// 同步获取指定subdir的所有子节点 并获取对应的内容 同时完成在本地进行保存
private void sync() {
List<String> txids = _state.list(_subdir);
for(String txid_s: txids) {
Object data = _state.getData(txPath(txid_s));
_curr.put(Long.parseLong(txid_s), data);
}
}
private String txPath(long tx) {
return txPath("" + tx);
}
private String txPath(String tx) {
return _subdir + "/" + tx;
}
}
发表评论
-
nacos单机源码调试
2018-12-17 11:35 1173首先从github上获取对应的源码Nacos源码git cl ... -
jstorm源码之TransactionalState
2016-03-21 19:31 844一、作用 主要是通过结合zookeeper,在zookee ... -
jstorm源码之PartitionedTridentSpoutExecutor
2016-03-21 19:28 846一、作用 Partition Spout对应的exec ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:28 0一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之RotatingMap
2016-03-21 19:27 836一、作用 基于LinkedList + HashM ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:24 566一、作用 RichSpoutBatchExecutor是IRi ... -
jstorm源码之TridentTopology
2016-03-16 18:12 2317在jstorm中对应TridentTopology的源码如下, ... -
jstorm操作命令
2016-03-15 18:04 2686启动ZOOPKEEPER zkServer.sh start ... -
JStorm之Supervisor简介
2016-03-15 18:02 1185一、简介Supervisor是JStorm中的工作节点,类似 ... -
JStorm介绍
2016-03-15 17:56 886一、简介Storm是开源的 ... -
mycat的使用---sqlserver和mysql
2016-01-11 14:33 8549数据库中间件mycat的使 ... -
jstorm安装
2015-12-03 19:43 1687关于jstorm单机安装可以 ... -
HBase系列一
2015-11-30 16:17 675关于hbase 一、客户端类 HTable 和 HTabl ... -
spring hadoop系列(六)---HbaseSystemException
2015-11-30 09:13 414一、源码 /** * HBase Data Access e ... -
spring hadoop系列(五)---spring hadoop hbase之HbaseSynchronizationManager
2015-11-27 18:16 843一、源码如下 /** * Synchronization m ... -
spring hadoop 系列(二)
2015-11-27 15:26 560一、源码分析 /** * * HbaseAccesso ... -
spring hadoop之batch处理(二)
2015-11-24 18:10 1470一、测试 public class MrBatchApp { ... -
spring hadoop之mapreduce batch
2015-11-24 15:51 583一、测试 // 定义hadoop configuration ... -
centos6.7 64位 伪分布 安装 cdh5.4.8 + jdk 8
2015-11-09 00:37 2269一、安装JAVA # 创建JAVA的目录 mkdir -p / ... -
spring hadoop系列二(MapReduce and Distributed cache)
2015-11-06 15:22 946关于MapReduce and Distributed Ca ...
相关推荐
jstorm源码解析之bolt异常处理方法.docx
Storm 源码分析 - 李明,王晓鹏
下面小编就为大家带来一篇jstorm源码解析之bolt异常处理方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
JStorm原始码学习:主要包含Storm重新启动,Nimbus启动,Supervisor启动,Executor创建和启动 风暴编程模型 Nimbus:负责资源分配和任务调度。 主管:负责接受nimbus分配的任务,启动和停止属于自己管理的worker...
阿里中间件性能挑战赛初赛-RocketMQ+JStorm+Tair实时统计双11交易金额源码+学习说明.zip阿里中间件性能挑战赛初赛-RocketMQ+JStorm+Tair实时统计双11交易金额源码+学习说明.zip阿里中间件性能挑战赛初赛-RocketMQ+...
jstorm 2.1.1 javadoc
【大纲】 现状 Jstorm概叙 & 流式计算 为什么开发Jstorm 特性 Question and Answer.
jstorm集成kafka插件demo
storm-core-1.0.3-sources.jar 源码文件,1.0.3版本
jstorm框架介绍,包含架构图、jstorm安装部署以及配置、如何在jstorm框架里写业务代码。
jstorm storm 入门demo,包含本地模式 和 集群模式。小小的demo,仅包含4个class。
jstorm-2.2.1
jstorm简单example,仅供本人网络不通,大家无需下载,https://github.com/alibaba/jstorm有免费资源
JStorm 2.2.1 百度网盘下载,Alibaba JStorm 是一个强大的企业级流式计算引擎
java实现jstorm+kafka集成实例,代码可运行。内有readme说明文档
jstorm简单示例,十分适合入门,希望能帮到大家
JStorm是参考storm的实时流式计算框架,在网络IO、线程模型、资源调度、可用性及稳定性上做了持续改进,已被...经过4年发展,阿里巴巴JStorm集群已经成为世界上最大的集群之一,基于JStorm的应用数量超过1000个。 6198}
jstorm框架default.yaml参数配置项列表详解
alibaba-jstorm
Jstorm 、zookeeper集群部署详细操作步骤 Centos7系统