`

rabbitMQ ConfirmListener

 
阅读更多
消息消费者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 声明交换机Exchange:交换机类型分为四类:

        Fanout Exchange: 将消息分发到所有的绑定队列,无routingkey的概念

        Headers Exchange :通过添加属性key-value匹配

        Direct Exchange:按照routingkey分发到指定队列

        Topic Exchange:多关键字匹配

5. 声明队列Queue

6. 将队列和交换机绑定

7. 创建消费者

8. 执行消息的消费

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.ConsumerCancelledException;  
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.QueueingConsumer.Delivery;  
import com.rabbitmq.client.ShutdownSignalException;  
  
/**  
 * 客户端01  
 *   
 * @author liaokailin  
 * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $  
 */  
public class Receive01 {  
    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,  
                                          ConsumerCancelledException, InterruptedException {  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount); //保证公平分发  
  
        boolean durable = true;  
        //声明交换机  
        channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey过滤  
        //声明队列  
        String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue();  
        //将队列和交换机绑定  
        String routingKey = "lkl-0";  
        //队列可以多次绑定,绑定不同的交换机或者路由key  
        channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey);  
  
        //创建消费者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
          
        //将消费者和队列关联  
        channel.basicConsume(queueName, false, consumer); // 设置为false表面手动确认消息消费  
  
        //获取消息  
  
        System.out.println(" Wait message ....");  
        while (true) {  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());  
            String key = delivery.getEnvelope().getRoutingKey();  
  
            System.out.println("  Received '" + key + "':'" + msg + "'");  
            System.out.println(" Handle message");  
            TimeUnit.SECONDS.sleep(3); //mock handle message  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //确定该消息已成功消费  
        }  
  
    }  
}  



消息生产者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 发送消息

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;  
  
/**  
 * 消息publish  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $  
 */  
public class Send {  
    public final static String EXCHANGE_NAME = "test-exchange";  
  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        /**  
         * 配置amqp broker 连接信息  
         */  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
  
        // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消费者已创建,这里可不声明  
        channel.confirmSelect(); //Enables publisher acknowledgements on this channel  
        channel.addConfirmListener(new ConfirmListener() {  
  
            @Override  
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleNack] :" + deliveryTag + "," + multiple);  
  
            }  
  
            @Override  
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleAck] :" + deliveryTag + "," + multiple);  
            }  
        });  
  
        String message = "lkl-";  
        //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN  
        //发送多条信息,每条消息对应routekey都不一致  
        for (int i = 0; i < 10; i++) {  
            channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN,  
                (message + i).getBytes());  
            System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2)));  
        }  
  
    }  
}  


在设置消息被消费的回调前需显示调用
引用
channel.confirmSelect() 

否则回调函数无法调用

先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。

rabbitmq 为每一个channel维护了一个delivery tag的计数器,这里采用正向自增,新消息投递时自增,当消息响应时自减

参考:http://blog.csdn.net/liaokailin/article/details/49558605

     http://blog.csdn.net/stonexmx/article/details/51885745
分享到:
评论

相关推荐

    3.3)消息的confirm机制1

    2. **添加确认监听器**:为了监听RabbitMQ服务器的确认响应,生产者需要添加一个`ConfirmListener`。当服务器发送ack时,监听器的`handleAck`方法会被调用。 3. **发送消息**:生产者使用`channel.basicPublish`...

    3.3)消息的确认Confirm1

    channel.addConfirmListener(new ConfirmListener() {...}); channel.basicPublish(...); ``` - **消费者**: 消费者的代码主要负责订阅和消费消息,不涉及Confirm机制。不过,确保正确配置和声明exchange、...

    ASP技术访问WEB数据库.docx

    ASP技术访问WEB数据库.docx

    2010-2019年上市公司排污费数据.xlsx

    2010-2019年上市公司排污费数据 1、时间:2010-2019年 2、来源:上市公司披露BG 3、指标:代码、日期、名称、本期支出 4、范围:417家上市公司 5、相关研究:胡珺,宋献中,王红建.非正式制度、家乡认同与企业环境治理

    六轴桌面机械臂上位机与下位机源码解析与实现

    内容概要:本文详细介绍了六轴桌面机械臂的上位机(PC)和下位机(单片机)源码实现及其应用场景。上位机使用Python编写,通过pyserial库进行串口通信,实现了用户交互和指令发送功能;下位机则使用Arduino平台,通过C/C++语言编写代码,实现了机械臂的动作控制。文中不仅展示了基本的通信协议和控制逻辑,还深入探讨了逆运动学计算、PID控制、数据同步等问题,并提供了多个实用的代码片段和调试经验。 适合人群:对机器人技术和嵌入式开发感兴趣的开发者,尤其是有一定编程基础和技术背景的人群。 使用场景及目标:适用于六轴桌面机械臂的开发和调试,帮助读者理解上下位机的协同工作原理,掌握机械臂控制的关键技术,如串口通信、逆运动学、PID调节等。 其他说明:文章强调了实际开发中的注意事项和常见问题,如数据同步、指令校验、运动规划等,并提供了一些优化建议和解决方案。此外,还提到了系统的扩展性和安全性措施,如限位保护和扩展接口的设计。

    青藏高原降水水汽来源模拟数据集(1998-2018)

    青藏高原降水的水汽来源及输送机制一直是国际水文气候学界关注的热点问题。由于高原地面观测站数量有限,且分布极不均匀,从而导致降水溯源存在很大不确定性。作者通过引入卫星降水数据来弥补站点观测降水的不足,从而对高原整体降水的水汽来源进行模拟性评估。作者通过1998-2018年间水汽追踪数值模型模拟高原整体降水的水汽来源,模型使用ERA-Interim再分析资料、TRMM卫星降水和GLDAS OAFlux蒸发作为数据驱动,并设置对比实验进行验证,最终生成高原整体降水的水汽来源月尺度数据。数据集内容包括:(1)青藏高原范围;(2)高原1998-2018年逐月降水水汽贡献数据,空间分辨率为1°×1°,单位:mm/mon;(3)高原1998-2018年逐月降水量。数据集存储为.nc、.shp和.xlsx格式,由8个数据文件组成,数据量为55 MB(压缩为1个文件,40.9 MB)。基于该数据集的分析研究成果已发表在《Environmental Research Letters》2020年15卷。Zhang, C. Moisture source assessment and the varying characteristics for the Tibetan Plateau precipitation using TRMM [J]. Environmental Research Letters, 2020, 15(10): 104003.

    Motorcad设计案例:内转子式永磁同步电机的高功率密度与强大过载能力,电磁场计算解析

    内容概要:本文详细介绍了利用MotorCAD进行32极36槽内转子永磁同步电机的设计过程,涵盖电磁场计算、极槽配合选择、绕组设计、磁钢布局、冷却系统设计等方面。通过分数槽配置、双层短距绕组、V型磁钢布局以及高效的冷却系统,实现了70kW输出、525rpm转速、2.5倍过载能力和高达5kW/kg的功率密度。文中还讨论了具体的参数设置及其背后的物理意义,如极距、绕组因数、磁钢涡流损耗控制等。 适合人群:从事电机设计的专业工程师和技术人员,尤其是对高功率密度和高性能电机感兴趣的读者。 使用场景及目标:适用于电动工程机械等需要短时爆发力的应用场合,旨在提高电机的功率密度和过载能力,同时确保高效稳定运行。 其他说明:文章提供了详细的参数配置代码片段,便于读者理解和复现设计过程。此外,还分享了一些实用的设计经验和优化技巧,如磁钢分段设计、转子冲片造型等。

    苏苏源码-python010-python-网络课程在线学习平台.zip

    标题Python网络课程在线学习平台研究AI更换标题第1章引言介绍Python网络课程在线学习平台的研究背景、意义、国内外现状和研究方法。1.1研究背景与意义阐述Python在线学习平台的重要性和研究意义。1.2国内外研究现状概述国内外Python在线学习平台的发展现状。1.3研究方法与论文结构介绍本文的研究方法和整体论文结构。第2章相关理论总结在线学习平台及Python教育的相关理论。2.1在线学习平台概述介绍在线学习平台的基本概念、特点和发展趋势。2.2Python教育理论阐述Python语言教学的理论和方法。2.3技术支持理论讨论构建在线学习平台所需的技术支持理论。第3章Python网络课程在线学习平台设计详细介绍Python网络课程在线学习平台的设计方案。3.1平台功能设计阐述平台的核心功能,如课程管理、用户管理、学习跟踪等。3.2平台架构设计给出平台的整体架构,包括前后端设计、数据库设计等。3.3平台界面设计介绍平台的用户界面设计,强调用户体验和易用性。第4章平台实现与测试详细阐述Python网络课程在线学习平台的实现过程和测试方法。4.1平台实现介绍平台的开发环境、技术栈和实现细节。4.2平台测试对平台进行功能测试、性能测试和安全测试,确保平台稳定可靠。第5章平台应用与效果分析分析Python网络课程在线学习平台在实际应用中的效果。5.1平台应用案例介绍平台在实际教学或培训中的应用案例。5.2效果评估与分析通过数据分析和用户反馈,评估平台的应用效果。第6章结论与展望总结Python网络课程在线学习平台的研究成果,并展望未来发展方向。6.1研究结论概括本文关于Python在线学习平台的研究结论。6.2研究展望提出未来Python在线学习平台的研究方向和发展建议。

    西门子S7-1200PLC自定义堆栈FB块实现:先进先出与后进后出功能的数据管理程序

    内容概要:本文详细介绍了为西门子S7-1200 PLC开发的一个自定义堆栈程序。由于S7-1200未提供内置堆栈功能,作者使用SCL(Structured Control Language)编写了一个通用型堆栈功能块(FB),能够实现FIFO(先进先出)和LIFO(后进先出)的数据管理。该堆栈程序支持多种数据类型(如BOOL、REAL、DWORD等),并提供了入栈、出栈、清空等功能。文中还讨论了具体的实现细节,如边界检测、指针管理和环形缓冲区的设计,以及在实际工业环境中的应用效果。 适合人群:从事PLC编程、自动化控制系统开发的技术人员,尤其是熟悉西门子S7-1200系列PLC的工程师。 使用场景及目标:适用于需要临时存储和管理数据的应用场景,如生产线上的配方管理、设备故障回溯、日志记录等。通过自定义堆栈程序,可以提高数据处理效率,减少因缺乏内置堆栈功能而带来的不便。 其他说明:该堆栈程序已在实际生产环境中运行超过三个月,处理了大量数据,表现出良好的稳定性和性能。未来计划进一步优化,如改进为环形缓冲区以提升性能。

    GIS在林业管理系统中的应用.pdf

    GIS在林业管理系统中的应用.pdf

    C语言专业课程设计销售标准管理系统.doc

    C语言专业课程设计销售标准管理系统.doc

    基于 Python 的高校学生职业推荐系统的设计与实现LW+PPT.zip

    基于 Python 的高校学生职业推荐系统的设计与实现LW+PPT

    电动汽车Simulink仿真模型:整车动力性及NEDC工况能耗测试模型

    内容概要:本文详细介绍了基于Simulink平台构建的电动汽车仿真模型,涵盖整车动力性测试(如最高车速、最大爬坡能力和加速时间)和NEDC工况下的能耗测试。模型由驾驶员模型、VCU控制模型、电机系统和电池系统四个主要部分构成,通过协同工作完成各项性能指标的仿真测试。文中还展示了多个关键环节的具体实现细节,如PID控制、扭矩限制、电池能量管理等。 适合人群:从事电动汽车研发的技术人员、高校相关专业师生、对电动汽车仿真感兴趣的工程爱好者。 使用场景及目标:①用于电动汽车的设计阶段,评估不同设计方案的动力性能和能耗水平;②作为教学工具,帮助学生理解电动汽车的工作原理和技术难点;③为企业提供技术支持,优化现有车型的性能表现。 其他说明:文中提供了大量MATLAB/Simulink代码片段,便于读者理解和复现实验结果。同时强调了模型的实际应用价值及其对未来电动汽车发展的指导意义。

    2025年计算机二级考试C试卷及答案.doc

    2025年计算机二级考试C试卷及答案.doc

    苏苏源码-python011-django基于Python的毕业生去向反馈调查平台的设计与实现(论文+PPT).zip

    标题Django基于Python的毕业生去向反馈调查平台设计与实现AI更换标题第1章引言介绍研究背景、意义,分析国内外相关平台的现状,并阐述论文的研究方法和创新点。1.1研究背景与意义说明毕业生去向反馈的重要性及现有调查方式的不足。1.2国内外研究现状概述国内外在毕业生去向反馈调查平台方面的发展现状。1.3研究方法与创新点阐述本文采用的研究方法和在平台设计中的创新之处。第2章相关理论与技术介绍Django框架、Python语言以及相关的Web开发技术。2.1Django框架概述简述Django框架的特点、优势及其在Web开发中的应用。2.2Python语言基础概述Python语言的基本语法、特点及其在Web开发中的作用。2.3Web开发相关技术介绍与平台设计相关的Web前端技术、数据库技术等。第3章平台需求分析对毕业生去向反馈调查平台进行需求分析,包括功能需求和非功能需求。3.1功能需求分析详细阐述平台应具备的各项功能,如用户管理、问卷调查、数据分析等。3.2非功能需求分析分析平台的性能、安全性、易用性等非功能需求。第4章平台设计根据需求分析结果,设计平台的整体架构、功能模块和数据库。4.1平台整体架构设计给出平台的整体架构图,并说明各个组成部分的作用。4.2功能模块设计详细设计平台的各个功能模块,包括用户模块、问卷模块、数据分析模块等。4.3数据库设计设计平台的数据库结构,包括数据表的设计、数据关系的建立等。第5章平台实现与测试介绍平台的实现过程、关键代码以及测试方法和结果。5.1平台实现阐述平台的实现过程,包括开发环境的搭建、代码的编写等。5.2关键代码展示展示实现平台功能的关键代码片段,如用户认证、问卷调查等。5.3平台测试说明平台的测试方法,包括功能测试、性能测试等,并给出测试结果。第6章结论与展望总结论文的研究成果,指出平台的优点与不足,并展望未来的研究方向。6.

    详解C# TCP IP 客户端与服务器工程源码,包含字节、字、浮点数、字符串数据的交互,并实现中英文聊天功能,以西门子S7-200Smart通讯为例

    内容概要:本文详细介绍了使用C#实现TCP/IP客户端与服务器之间的数据交互,涵盖字节、整型、浮点数、字符串等多种数据类型的处理,并特别强调了中英文字符串的交互功能。此外,文章深入探讨了与西门子S7-200Smart工业设备的通讯方式,包括协议适配、字节序处理、数据帧构建等关键技术点。文中提供了丰富的代码示例,如TcpListener的初始化、客户端连接、数据读取与发送、以及针对工业设备的特殊数据处理方法。同时,作者分享了许多实践经验,如避免字节序错误、处理浮点数精度问题、使用Wireshark抓包工具等。 适合人群:具有一定C#编程基础,尤其是对网络编程和工业自动化感兴趣的开发者和技术爱好者。 使用场景及目标:适用于需要实现C# TCP/IP通信的项目,特别是涉及工业设备通讯的场景。目标是掌握TCP/IP通信的基本原理及其在工业自动化领域的应用,能够独立完成与西门子S7-200Smart设备的通讯开发。 其他说明:文章不仅提供理论讲解,还有大量实战代码和技巧分享,帮助读者快速理解和应用所学知识。建议读者在实践中结合Wireshark等工具进行调试,以便更好地理解数据传输过程。

    腹部CT扫描用于检测癌症的轴向切片数据集

    腹部CT扫描 用于检测癌症的轴向切片 腹部CT扫描数据集 用于检测癌症的轴向切片 欢迎使用这个强大的数据集,该数据集以腹部CT扫描的轴向切片为特色,在诊断癌症的过程中收集。 该资源是医学影像爱好者的金矿,非常适合推进医疗技术的研究和构建创新工具! 该数据集包含在轴向切片中采集的腹部计算机断层扫描(CT),最初是为了识别癌症的体征而采集的。无论您是从事医学成像、图像分割还是自动诊断,这些图像都为探索和创新提供了绝佳的机会。 里面是什么? 可能是带有CT扫描的ZIP文件](93.9 MB)一个压缩的档案,其中包含腹部CT图像,可能是DICOM或其他标准医疗格式。打开它以显示完整的收藏! 你如何使用它 通过这些激动人心的应用程序释放您的创造力: 胃癌症检测:建立和测试算法,像专业人士一样在CT扫描中发现癌症迹象。 图像分割:掌握精确勾勒腹部器官和潜在肿瘤的艺术。 医学影像研究:突破CT图像分析和处理技术的界限。 标签 医学影像·图像分割·癌症·CT扫描

    恒压供水一拖二控制程序图纸:西门子PLC与昆仑通态触摸屏的变频互锁控制,双PID调速,稳定可靠,多重选择应用

    内容概要:本文详细介绍了基于西门子S7-224XP PLC和昆仑通态触摸屏的恒压供水一拖二控制系统。该系统不仅支持工频和变频切换,还能作为纯变频方案使用。硬件方面,采用224XP带两个串口连接触摸屏和MODBUS通讯,配备EM232模拟量输出模块发送控制信号。软件部分展示了关键的梯形图代码,包括主泵切换逻辑、双PID调节、工变频互锁等。此外,还提供了触摸屏组态建议,确保系统的高扩展性和灵活性。文中强调了调试技巧和注意事项,如模拟量输出的软件滤波、变频器故障信号隔离等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是对PLC编程和恒压供水系统感兴趣的读者。 使用场景及目标:适用于需要主备泵轮换或同时运行的恒压供水系统。目标是帮助工程师理解和实施高效稳定的工变频互锁控制方案,提高系统的可靠性和适应性。 其他说明:文中提到的具体代码片段和硬件配置有助于实际项目的快速部署和调试。对于希望深入了解PLC编程和恒压供水系统的人来说,本文提供了宝贵的实践经验和技术细节。

    20XX年农村信息化建设方案.docx

    20XX年农村信息化建设方案.docx

    Arduino串口源代码100%好用.7z

    Arduino串口源代码100%好用.7z

Global site tag (gtag.js) - Google Analytics