`
tyny
  • 浏览: 74610 次
  • 性别: Icon_minigender_1
  • 来自: 黄冈
社区版块
存档分类
最新评论

Rhino.Queues随笔4 发送和接收

 
阅读更多

    讨论完网络发送接收的流程后,再来看看具体的消息队列中消息的发送接收的流程。这里需要提到的是,为了从全局看待这些东西,省略了很多细节,这并不表示他们不重要,只是他们很多东西在纵向方面很难讨论,自己觉得在横向的系统的方面,可能稍微清楚一点,他们将在后续的文章会再进一步讨论他们。同时这里面有个棘手的问题,自己以前没有了解两段式事务提交的原理,这里用到的很多都是相关知识,如发现有错误,敬请指教。以后有时间补一补相关知识。

    先来看看消息队列的发送过程,通常的理解就是通过消息队列管理器发送消息,底层不用多说,一般人都能理解,即事务性的存储消息。成功发送一般指消息的存储成功,包括相应的一些相应的日志或者计数器等的改变。发送成功后等待底层发送服务的发送。

    但是这里需要注意的是,尽管这里底层的存储使用esent数据库,但是消息发送的事务控制相对复杂。这里有个疑问就是为什么不直接把发送消息以已准备的状态插入发送消息表,而是分作两端事务提交。两端提交第一段就是把消息以未准备好状态插入发送消息表,第二段是根据第一段事务的处理结果来决定是执行提交或者回滚:提交即把上一段提交的消息的状态位置为准备好,回滚则直接从消息发送表删除上一段插入的消息。具体的原因不是很清楚,猜测是作者把esent本身的事务仅仅作为原子操作的保证,而不是借助他的事务管理来管理自己的事务。

    相对于发送的简洁性,接收略显复杂。

    先谈谈他们的区别,发送的简单源于他的事务控制简单和操作是同步的,而且发送消息只有两个状态:未准备和已准备。同时也不需要消息队列自身管理数据源的事务完整性,这个在后面会提到。于此相对,接收却没那么简单。

    首先,较于发送的同步操作,接收可以同步接收,同样可以限期阻塞接收,还有可能就是异步回调接收。当然目前还没发现有支持异步接收的方法。仅仅是限期接收就需要使用事件通知机制,从QueueManager的接收方法可以很很明显的看到:

    首先同步获取,如果有接收的消息立即返回;

     如果没有消息则加锁newMessageArrivedLock(需要这个锁的激活来通知当前线程有新消息到来),重新同步获取,如果有消息立即返回;

     如果没有则在newMessageArrivedLock上等待接收消息事件发生,如果等待超时则抛出异常,否则如果新消息到来事件发生,则重新执行获取流程。这里之所以在等到接收消息事件时,没有立即直接取出消息而是再次执行获取流程,原因在于因为是消息队列支持多用户操作,他可以同时通知多个接收者线程,但是消息只有一个接收者线程能接收到,所以收到通知不一定能接收到消息。

    最后,相对于前面的,这个最为复杂,而且还牵扯到前面两个的一些处理步骤。这个就是保持事务的可恢复性。

    接收的流程一般是指是从接收消息表获取一条消息,同时重置相应的标志位为正在处理(Processing),如果接收过程正常,提交事务时,会把相应的消息存到历史接收消息表,然后从当前的接收消息表删除。如果接收过程出错,则回滚,把消息的状态为重置为可以处理(ReadyToDeliver)。这是正常流程。但是如果接收处理的过程出现异常,在提交或者回滚时,系统崩掉,重启消息队列时,这些信息需要重新恢复,即这些中断的事务都需要回滚。熟悉数据库的都会马上想到一个东西:日志。消息队列也是依靠事务日志来恢复。所以在接收事务的处理过程中,都是有严格日志记录的,但是跟数据库的差别在于,正常处理完毕后,一般都是删除这些事务日志的。所有这些事务日志都是记录在一个事务日志表里面的。

    这里有个细节需要稍微指出一下,可能跟此消息队列的作者的架构有关系,整个消息队列的所有的事务全部依赖于一个事务控制类。而不是每个操作事务对应一个事务处理类,所以需要根据当前操作才能从事务控制类判断究竟做了什么操作。所以在接收消息操作里面,在获取消息之后,提交之前都会记录日志 RegisterUpdateToReverse,然后提交或者回滚时都会从事务日志表删除相应的事务日志。

    说到这里,回过头来看发送会发现这里却没有操作日志的。首先简单明确一下,所有的操作必须都在事务里面进行的,否则消息队列直接抛出异常的。自己简单的理解是数据源不在消息队列的控制范围之内,同时数据源一般都会有自己的事务控制,如果出现异常,事务不会提交,数据源一般都会有相应的恢复措施,类似于接收流程的恢复。所以这个发送过程是可以重现的。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics