`
uule
  • 浏览: 6308783 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

RabbitMQ消息确认机制

阅读更多

应用RabbitMQ可靠性传输机制实现Redis缓存的实时更新

 

 

消息中间件集群崩溃,如何保证百万生产数据不丢失?

RabbitMQ暂时放在了自己的内存中,还没来得及投递给下游的仓储服务呢,此时RabbitMQ突然宕机了,会怎么样?

答案其实很简单,默认情况下,按照我们目前的代码和配置,这个数据就会丢失了。

 

持久化

队列持久化:

//queuechannel.queue_declare(queue='hello2',durable=True)

channel.queueDeclare(
 "warehouse_schedule_delivery",
 true, 
 false,
 false,
 null);

 

核心在于第二个参数,第二个参数是true。意思就是说,这个创建的queue是durable的,也就是支持持久化的。

这样,RabbitMQ会把这queue的相关信息持久化的存储到磁盘上去,即使RabbitMQ宕机后重启,也会恢复之前创建好的这个queue。

 

消息持久化:

 

现在你的queue的信息可以持久化了,RabbitMQ宕机重启后会自动恢复queue。但是,你的queue里的message数据呢?

queue里都是订单服务发送过去的订单消息数据,如果RabbitMQ还没来得及投递queue里的订单消息到仓储服务,结果RabbitMQ就宕机了。

 

那此时RabbitMQ重启之后,他可以恢复queue的信息,但是queue的message数据是没法恢复了。

 

所以就需要在你的订单服务发送消息到RabbitMQ的时候,【定义这条消息也是durable】,即持久化的。

#channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))


/* 参数:

 * 向server发布一条消息
 * 参数1:exchange名字,若为空则使用默认的exchange
 * 参数2:routing key
 * 参数3:其他的属性
 * 参数4:消息体

 * 【RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,

 * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃】
 */ 

   

channel.basicPublish("", "warehouse_schedule_delivery",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

 通过上面的方式来发送消息,就可以让发送出去的消息是持久化的。

 

一旦标记了消息是持久化之后,就会让RabbitMQ把消息持久化写入到磁盘上去,此时如果RabbitMQ还没投递数据到仓储服务,结果就突然宕机了。那么再次重启的时候,就会把磁盘上持久化的消息给加载出来。

 

但是这里要注意一点,RabbitMQ的消息持久化,是不承诺100%的消息不丢失的。

 

因为有可能【RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,他自己就宕机了】,【这个时候消息还是会丢失的】。

 

如果要完全100%保证写入RabbitMQ的数据必须落地磁盘,不会丢失,需要依靠其他的机制。

 

 

 

RabbitMQ消息确认机制

 

Message acknowledgment

消费者消息确认机制

 

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。

 

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。

 

【默认情况下,RabbitMQ会顺序的分发每个Message。当每个【收到ack后,会将该Message删除】,然后将下一个Message分发到下一个Consumer】这种分发方式叫做round-robin。这种分发还有问题。

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,【每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了】。

 

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。【为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack】。而应该是在【处理完数据后发送ack】。(在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了,【如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer】。这样就保证了在Consumer异常退出的情况下数据也不会丢失)

 

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;【如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理】。【这里不存在timeout概念】,【一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开】。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

 

RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

 

【RabbitMQ开启手动ack机制保证消费端数据不丢失的时候,prefetch机制对消费者的吞吐量以及内存消耗的影响。

通过分析,我们知道了prefetch过大容易导致内存溢出,prefetch过小又会导致消费吞吐量过低,所以在实际项目中需要慎重测试和设置。

------------------------------------------------------------------------------------------------------

 

生产者消息确认机制

 

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了

 

为解决这个问题,主要有如下两种方案:

 

【通过事务】机制实现

【通过生产者消息确认机制】(publisher confirm)实现

 

但是使用【事务机制实现会严重降低RabbitMQ的消息吞吐量】,我们采用一种轻量级的方案——生产者消息确认机制

  

什么是生产者消息确认机制?

 

简而言之,就是:生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地。

 

如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。

 

再补充一个Mandatory参数:【当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。】

 

 

分享到:
评论

相关推荐

    SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材

    SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741

    RabbitMQ消息模式之Confirm确认消息

    理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心...

    RabbitMQ消息确认(ACK)机制实战

    对RabbitMQ消息确认(ACK)原理的理解及实践

    springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    主要介绍了springboot + rabbitmq 如何实现消息确认机制,本文给大家分享小编实际开发中的一点踩坑经验,内容简单易懂,需要的朋友可以参考下

    基于Springboot 版本 2.3.2.RELEASE版本开发的Example

    Springboot 2.3.2.RELEASE的 Example共计有9个,可...RabbitMQ的五种消息发送模式,RabbitMQ消息确认机制,RabbitMQ死信队列的定义,RabbitMQ消息100%可靠性方案实践,定时任务3种方式:@Scheduled,Quartz,XXL-JOB。

    RabbitMq+springboot

    demo中介绍了rabbitmq的三种模式,分别为Direct,topic,Fanout并且集成了消息确认机制,消息重发机制集以及集群, 需要用到的同学可以下载看看,少走弯路。

    SpringBoot整合RabbitMQ.zip

    SpringBoot整合RabbitMQ的详细过程 **1.该篇博文首先讲述了交换机和队列之间的绑定关系** ①direct、②fanout、③topic ...消费者接收到消息的消息确认机制** ①自动确认、②手动确认、③根据情况确认

    springboot-rabbitmq.rar

    于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下...1、rabbitmq的消息确认机制; 2、rabbitmq的延时队列,也称作为死信队列的一些研究心得分享

    Java思维导图xmind文件+导出图片

    RabbitMQ消息确认机制 Redis redis数据结构分析 Redis主从复制原理及无磁盘复制分析 Redis管道模式详解 Redis缓存与数据库一致性问题解决方案 基于redis实现分布式实战 图解Redis中的AOF和RDB持久化策略的...

    RabbitMQ Work Queue实例

    基于RabbitMQ的工作队列实现,包括消息确认机制、消息持久化机制、消息的公平调度等。

    最详细RabbitMQ学习资料(源码)

    同时,它支持消息确认机制,确保消息被正确接收和处理。 灵活的路由:RabbitMQ支持多种交换机类型和路由策略,可以根据消息的内容、标签等属性将消息路由到不同的队列或消费者。 多语言支持:RabbitMQ提供了丰富的...

    RabbitMq实例以及安装包

    持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。 3.综合技术实现 可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。 RabbitMq ...

    rabbitMQ.doc

    rabbitMq简单介绍还有下载,安装及配置,包括一些队列案例(Simble简单队列,.work queues 工作队列 公平分发 轮询分发,订阅模式 publish/subscribe,routing路由模式,Topic 主题模式,rabbitMq的消息确认机制)

    Java开发面试-RabbitMQ专区

    在面试中,可能会涉及到如何使用Java客户端API创建和管理RabbitMQ的队列、交换机和绑定等操作,以及如何发送和接收消息,如何处理消息的确认和拒绝等问题。此外,面试官还可能会询问关于RabbitMQ的高级特性,例如...

    RabbitMQ 学习整理.pdf

    RabbitMQ 学习整理 包括RabbitMQ 配置详解,交换机有四种类型,分别为Direct,topic,headers,Fanout,消息确认机制,多线程处理消息,消费端的限流策略,回调等。都有代码,保证代码正确性都是自己的的从工程中直接...

    深入理解RabbitMQ消息队列的使用

    RabbitMQ单节点服务搭建以及集群的搭建、RabbitMQ的整体架构及各个组件的功能、RabbitMQ当中生产者以及消费者的具体实现、消费者如何做到消息的确认、RabbitMQ如何做到消息的公平分发、RabbitMQ当中fanout、direct、...

    RabbitMQ介绍.zip

    rabbitmq.RabbitMQ的主要特点和优势包括: 可扩展性:RabbitMQ可以通过添加更多的节点和队列来实现水平扩展,满足不断增长的消息处理需求。...此外,RabbitMQ还提供消息确认机制,以确保消息被成功传递和处理。

    rabbitmq 消息100%可靠性投递的解决方案实现 ack手动确认方式.zip

    这种数据驱动的方法有助于减少不确定性,提高决策的准确性。 团队协作: 复杂的问题通常需要多个人的协同努力。方案提供了一个共同的框架,帮助团队成员理解各自的职责和任务,促进协作并确保整个团队朝着共同的...

Global site tag (gtag.js) - Google Analytics