`
m635674608
  • 浏览: 4929447 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ原理解析-producer 4.发送分布式事物消息

    博客分类:
  • MQ
 
阅读更多

为什么消息要具备事务能力


还是比较清晰的。简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息。
业务逻辑过程 假设是这样的:
逻辑部分a-->发消息给MQ-->逻辑部分b
假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息。这是我们不希望见到的。

分布式事务基础概念

  1. 关于分布式事务、两阶段提交协议、三阶提交协议
  2. 理解分布式事务的两阶段提交2pc
  3. 分布式事务(一)两阶段提交及JTA
  4. 分布式系统常用思想和技术总结
  5. 【整理】脑裂问题
  6. 分布式系统的事务处理
  7. 多版本并发控制(MVCC)在分布式系统中的应用
  8. 戏说PAXOS
  9. 阿里云消息队列 MQ关于事务消息的文档

rocketmq具备事务能力的demo

参见TransactionProducerDemo.java

向producer注册的TransactionCheckListener实现并没有用,因为返回LocalTransactionState.UNKNOW状态时,在3.2.6版本中,是不支持此状态下回调TransactionCheckListener的,具体参见以下两个issue。

事务消息 LocalTransactionState.UNKNOW 状态下不回查 #221
开源版本支持事务消息吗 #364
测试过程中发现返回UNKNOW状态是不能正确达到期望的,但是返回ROLLBACK_MESSAGE状态还是能达到期望的。

实现分析入口

这个实现的入口还是比较容易找的,只要搜寻ROLLBACK_MESSAGE这个变量的引用即可发现。顺着搜索查看,其实很容易发现,客户端在收到业务逻辑返回的事务状态时会发送一条结束事务的指令给broker。

// com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行  
RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

按broker对外部指令的常规做法,一般会有一个Processor与之对应。是EndTransactionProcessor,看BrokerController374行其注册的地方,没错。

EndTransactionProcessor分析(broker侧)

如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE时,EndTransactionProcessor会清空message的body的置成null,queueOffset也不会更新,那么consumer就收不到消息了。

//--EndTransactionProcessor.processRequest  200行--
if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {
    msgInner.setBody(null);
}

如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor则会照常put message。

事务消息分为两个阶段,prepare阶段与commit阶段。prepare阶段的消息会写入store,只是写完之后的queueOffset(逻辑位置)为0(commit阶段写完消息后的queueOffset就不是0了。);

 

 

复制代码
// -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 --
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queue
case MessageSysFlag.TransactionPreparedType:
case MessageSysFlag.TransactionRollbackType:
    queueOffset = 0L;
    break;
case MessageSysFlag.TransactionNotType:
case MessageSysFlag.TransactionCommitType:
default:
    break;
复制代码

待分析问题列表:
1. prepare阶段已经将消息发了过去,commit的时候是否还会再发送一次消息?
2. rollback的时候是否会将prepare的消息删除?

 

http://www.cnblogs.com/simoncook/p/6478196.html

 

分布式事物是基于二阶段提交的

1)      一阶段,向broker发送一条prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared状态消息不被消费

发送消息ok,执行本地事物分支, 本地事物方法需要实现rocketmq的回调接口2)2)2) LocalTransactionExecuter,处理本地事物逻辑返回处理的事物状态LocalTransactionState

3)  二阶段,处理完本地事物中业务得到事物状态, 根据offset查找到commitLog中的prepared消息,设置消息状态commitType或者rollbackType, 让后将信息添加到commitLog中, 其实二阶段生成了两条消息

 

 

 

事物消息发送



 
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142389/

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics