Queue实现的是点到点模型,在下面的例子中,启动2个消费者共同监听一个Queue,然后循环给这个Queue中发送多个消息,我们依然采用ActiveMQ。
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//创建一个Queue
Queue queue = new ActiveMQQueue("testQueue");
//创建一个Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//注册消费者1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//注册消费者2
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//创建一个生产者,然后发送多个消息。
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
}
}
运行这个例子会得到下面的输出结果:
Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9
可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费。
分享到:
相关推荐
JMS中topic和queue两种实现方式
concurrentqueue, 一种快速多消费者多消费者锁空闲并发队列 moodycamel::ConcurrentQueue面向 C 的工业强度锁自由队列。注意:如果你只需要一个生产者,单个消费者队列,我有其中的一个太多。特性Knock-your-socks-...
C++ Queue(带上限的) 上限可以改,做小题可以
MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战
msqueue_config_loader 定义消息队列配置结构并实现LoadByFile和LoadByBytes接口。 type QueueConfig struct {Url string `..."queue_name" yaml:"queue_name"`MaxDequeueCount int `json:"max_dequeue_count" yaml:"m
jms资料 Message Queue 3 技术概述
moodycamel :: ConcurrentQueue C ++的工业级无锁队列。 注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的...
javax.jms.Queue.class javax.jms.TopicSubscriber.class javax.jms.QueueBrowser.class javax.jms.TemporaryQueue.class javax.jms.TemporaryTopic.class javax.jms.ServerSession.class javax.jms....
spring-jms使用queue发送消息简单例子,详细参考:http://blog.csdn.net/xiejx618/article/details/38849249
NULL 博文链接:https://jjjssh.iteye.com/blog/2099106
1.抽象数据类型Queue 1.1 1.2 2.Python实现ADT Queue ...class Queue: #初始化 def _init_(self): self.items=[] def isEmpty(self): return self.items==[] #插入:队尾 def enqueue(self,item): self.it
#PhpQueue PhpQueue库为执行php脚本提供了队列。... ## Queue ###简单队列 ###将任务添加到队列 PhpQueue具有以下组件:Queue,QueueDriver,TaskPerformer和Task。 #####示例 <? use PhpQueue \ Queue ; use PhpQ
此板条箱提供了三种实现: 无限( deadqueue::unlimited::Queue ) 基于crossbeam_queue::SegQueue 无限容量,无推背压力通过Cargo.toml的unlimited功能启用可调整deadqueue::resizable::Queue ( deadqueue::...
moodycamel :: ConcurrentQueue用于C ++的工业强度无锁队列。 注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择其中之一。 特色快如闪电般的快速pe moodycamel :: ConcurrentQueue C ++的...
是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成...
Joern Reder的与DVD :: Rip一起使用的项目。 使您可以将现有的DVDRip项目排队,这样就不必麻烦您的计算机了。
TypeError: _queue_reduction(): incompatible function arguments. The following argument types are supported: 1. (process_group: torch.distributed.ProcessGroup, grads_batch: List[List[at::Tensor]], ...
php-queue 是 PHP开发的磁盘存储消息队列服务,基于leveldb和swoole ,在4核机器上处理能力可以达到2.5W/s 。leveldb: https://github.com/google/leveldbphp-leveldb: https://github.com/reeze/php-leveldbswoole:...
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), @...