与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER
, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = factory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Topic topic = new ActiveMQTopic("topicTest");
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 1 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 2 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageProducer producer = session.createProducer(topic);
for (int i=0; i<10; i++) {
producer.send(session.createTextMessage("Message:" + i));
}
}
}
运行后得到下面的输出结果:
Consumer 1 get Message:0
Consumer 2 get Message:0
Consumer 1 get Message:1
Consumer 2 get Message:1
Consumer 1 get Message:2
Consumer 2 get Message:2
Consumer 1 get Message:3
Consumer 2 get Message:3
Consumer 1 get Message:4
Consumer 2 get Message:4
Consumer 1 get Message:5
Consumer 2 get Message:5
Consumer 1 get Message:6
Consumer 2 get Message:6
Consumer 1 get Message:7
Consumer 2 get Message:7
Consumer 1 get Message:8
Consumer 2 get Message:8
Consumer 1 get Message:9
Consumer 2 get Message:9
说明每一个消息都会被所有的消费者消费。
分享到:
相关推荐
springboot集成kafka实战项目代码 项目介绍地址:https://blog.csdn.net/qq_38105536/article/details/122308040
测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: 消息主题。 Queue: 队列。抽象的概念,消息可以发送到...
[Manning Publications] ASP.NET MVC 4 实战 (英文版) [Manning Publications] ASP.NET MVC 4 in Action (E-Book) ☆ 图书概要:☆ ASP.NET MVC 4 in Action is a hands-on guide that shows you how to apply ...
深圳云栖大会阿里云IoT物联网套件开发实战
首先,让我们来看一下基础的消息(Message)相关术语: Topic: Kafka按照Topic分类来维护消息 Producer: 我们将发布(p
kafka从入门基础,深入理解、核心设计,包括:kafka结构中的broker\topic\Partition\Producer\Consumer等
1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放入本地集合。 2. 定时从namesrv 更新topic 路由信息, Producer 与broker 间的...
KSQL是一个用于Apache kafka的... KSQL在内部使用Kafka的Streams API,并且它们共享与Kafka流处理相同的核心抽象,KSQL有两个核心抽象,它们对应于到Kafka Streams中的两个核心抽象,让你可以处理kafka的topic数据。
首先,让我们来看一下基础的消息(Message)相关术语: Topic: Kafka按照Topic分类来维护消息 Producer: 我们将发布(p
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("web.#"); } @Bean Binding bindingExchangeMessages(Queue ...
本文来自hahack,文章介绍如何利用 GitlabAPI实现一套简单灵活的数据同步机制,从而实现在多个Gitlab 站点间同步数据。 在继续写数学系列前,我想切回去之前的Git系列写点东西。...保存现场跳去另一个topic写点东
首先介绍了Kafka的核心概念,如Broker、Topic、Producer、Consumer等,随后详细阐述了Kafka的安装步骤、配置过程以及其与Zookeeper的交互。通过对Kafka生产者和消费者的代码示例分析,文档展示了消息的发送和接收...
软硬件技术ITIPS 联想超级公开课程四大提纲...• Topic 1: P多屏复制• Topic 2: Intel RAID状态 • Topic 3: BitLocker加密• Topic 4: PM981驱动。内容丰富,有图文解析问答!在日常IT运维软件设置必然用到实战上!
目标 建立一个 lesson4 项目,在其中编写代码。 ... "href": "http://cnodejs.org/topic/541ed2d05e28155f24676a12", "comment1": "呵呵呵呵" }, { "title": "发布一款 Sublime Text 下的 Java
微服务架构实战demo, 使用 go 语言技术栈,包含如下组件: 服务注册中心 etcd Api 网关 Feed 服务 Profile 服务 Topic 服务 监控组件: prometheus grafana 跟踪组件: ...
以下介绍来自阿里云https://developer.aliyun.com/topic/java20?utm_content=g_1000161793 阿里巴巴《Java开发手册(嵩山版)》今日重磅发布!1300个日夜兼程,虚静出内功,嵩山版首次新增前后端规约等内容,全面...
Kafka实战记录-目录:https://blog.csdn.net/weixin_39565597/article/details/104402046 1、启动kafka # 启动kafka kafka-server-start.sh [-daemon] server.properties [--override property=value]* # -daemon ...
《ChinaITLab Linux实战工程师网校课程-8CD》[RMVB] http://www.verycd.com/topics/97036/ http://www.verycd.com/topics/116878/ http://www.ixpub.net/thread-710414-1-1.html LINUX和Redhat (红帽RHCE)教程和...
注:该资源来源于CSDN游戏开发论坛,作者是gantleman,非常感谢他无私的分享了自己关于游戏服务端的开发经验。...原帖地址:http://topic.csdn.net/u/20100617/21/d9b20c80-6e95-4922-acaf-eddfcf54aaed.html
里面包含了在Fink实时数仓所要用到的大部分实现函数以及工具类,在项目实战中十分实用,在企业开发中可能也会用到。并且附赠了大部分操作的解释,无论借用还是学习都十分友好。后续会添加更多。。。。 函数:...