package com.yl.common.activemq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.springframework.jms.support.converter.MessageType; import com.yl.common.utils.LogUtil; /** * ActiveMq消息相关操作 * @author huangzp * @date 2015-3-30 */ public class ActiveMqMessage { /** activemq 消息队列名 */ public static final String QUEUE_NAME = "im_queue"; /** activemq 主题队列名 */ public static final String TOPIC_NAME = "im_topic"; private static final String MESSAGE_HEAD = "uid"; /** * 发送消息到activemq * @param connection activemq长连接 * @param destinationType 队列消息或主题消息 * @param destinationName 消息队列名称 * @param msgType 消息类型,如文本 * @param msg 消息 * @param toUserId 接收的用户ID * @param priority 消息优先级 0-9,数字越大越优先 * @return isSuccess 是否发送成功 */ protected boolean send(Connection connection, DestinationType destinationType, MessageType msgType, String msg, String toUserId, Integer priority) { boolean isSuccess = true; // Session: 一个发送或接收消息的线程 Session session = null; // Destination :消息的目的地;消息发送给谁. Destination destination = null; // MessageProducer:消息发送者 MessageProducer producer = null; try { // 启动 connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); if(DestinationType.QUEUE.equals(destinationType)){ //队列消息 destination = session.createQueue(QUEUE_NAME); }else if(DestinationType.TOPIC.equals(destinationType)){ //主题消息 destination = session.createTopic(TOPIC_NAME); }else{ throw new RuntimeException("DestinationType非法,不是队列消息或主题消息"); } producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); if(null != priority){ producer.setPriority(priority); } // 发送消息 sendMessage(session, producer, msgType, msg, toUserId); session.commit(); } catch (Exception e) { LogUtil.log.error(e.getCause(), e); isSuccess = false; } return isSuccess; } private void sendMessage(Session session, MessageProducer producer, MessageType msgType, String msg, String toUserId) throws Exception { Message message = null; if(MessageType.TEXT.equals(msgType)){ //文本消息 message = session.createTextMessage(msg); }else{ //默认以文本消息处理 message = session.createTextMessage(msg); } message.setStringProperty(MESSAGE_HEAD, toUserId); // message.setJMSExpiration(3*30*24*60*60*1000); //设置消息过期时间 producer.send(message); } /** * 从activemq接收消息 * @param connection * @param destinationType 队列消息或主题消息 * @param userId 接收指定用户ID的消息 * @param client 当前用户的socket连接 * @throws Exception */ protected MessageConsumer receveive(Connection connection, DestinationType destinationType, String userId) throws Exception { // Destination :消息的目的地;消息发送给谁. Destination destination = null; // 消费者,消息接收者 MessageConsumer consumer = null; Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // 客户端手动签收消息 if (DestinationType.QUEUE.equals(destinationType)) { // 队列消息 destination = session.createQueue(QUEUE_NAME); } else if (DestinationType.TOPIC.equals(destinationType)) { // 主题消息 destination = session.createTopic(TOPIC_NAME); } else { throw new RuntimeException("DestinationType非法,不是队列消息或主题消息"); } String messageSelector = MESSAGE_HEAD + "='" + userId + "'"; // 消息过滤器 consumer = session.createConsumer(destination, messageSelector); return consumer; } }
相关推荐
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
activeMQ的发送消息后接收者返回信息
activemq 通过ajax发送接收消息简单例子
本代码关于activemq-cpp的核心代码参考的chenxun2009的博客园,其他部分包括:从配置文件中读取消息通道,过滤条件等信息。
springboot集成activemq实现消息接收demo
这是我精力整理的ActiveMQ发送和接收protobuf协议消息的实例。 也对ActiveMQ进行了简化封装,也配置了自动重连机制,亲测可用!
springboot整合ActiveMQ源码,适合范围消息队列入门小伙伴,对ActiveMQ消息队列不太了解,不知道如何发送消息,接收消息可以围观。
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
Android基于MQTT协议利用ActiveMQ发送消息给Android端接收,属于长连接那种,类似Socket通信
今天小编就为大家分享一篇python 发送和接收ActiveMQ消息的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
使用ActiveMQ由C++控制台程序发送消息,C#控制台程序接收消息。
主要介绍了php 使用ActiveMQ发送消息,与处理消息操作,结合实例形式分析了php使用ActiveMQ实现消息的发送与接收处理相关操作技巧,需要的朋友可以参考下
这个demo完整实现了activeMQ发送消息,接收消息,以及MQ详解文档
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring ...本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务。
关于jms activemq demo,内容实现了消息的发送和接收,并含有一定的注释,容易理解
activeMQ JMS 消息推送 Spring整合activeMQ 消息发送者 接收者