点对点通讯:消息发送-Queue-消息接收,消息发送到队列,消息接收者阻塞式接收消息。
如果没有消息,接收消息方法receive()阻塞。
package mq.p2p;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producter {
//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的链接地址
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
AtomicInteger count = new AtomicInteger(0);
//链接工厂
ConnectionFactory connectionFactory;
//链接对象
Connection connection;
//事务管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//从工厂中创建一个链接
connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建一个事务(这里通过参数可以设置事务的级别)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//创建一个消息队列
Queue queue = session.createQueue(disname);
//消息生产者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//创建一条消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
//发送消息
messageProducer.send(msg);
//提交事务
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package mq.p2p;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
System.out.println("..");
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("....");
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package mq.p2p;
public class ProducterTest {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
ProducterTest testMq = new ProducterTest();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
package mq.p2p;
public class ConsumerTest {
public static void main(String[] args){
Consumer comsumer = new Consumer();
comsumer.init();
ConsumerTest testConsumer = new ConsumerTest();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Consumer comsumer;
public ConsumerMq(Consumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
相关推荐
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
ActiveMQ消息中间件的点对点模式point to point 消息队列 * 消息消费者从queue中取出并且消费消息 * 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息 * queue中支持存在多个消费...
1、介绍ActiveMQ5.x消息队列基础特性和本地快速安装 2、SpringBoot2.x整合ActiveMQ实战之点对点消息
SpringBoot整合ActiveMQ。消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。...消息形式支持点对点和订阅-发布。
基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
让你更了解互联网是如何解决高并发 学完SSM框架的同学就可以学习,能让你切身感受到企业级开发环境目标1:理解消息中间件、JMS等概念目标2:掌握JMS点对点与发布订阅模式的收发消息目标3:掌握SpringJms目标4:完成...
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
点对点的消息传递中,目的地被称为队列(Queue)点对点消息传递的特点如下:(1)每个消息只能有一个消费者,类似1对1的关系,好比个人快递自己领取自己的(2)消息的生产者和消费者之间没有时间上的相关性。...
任务队列系统使用了点对点模式。消息生产者发送工作消息到 JMS 队列,消费者从这个队列中接收消息并处理。点对点模式不需要生产者和消费者同时在线。队列会一直保留收到的消息,直到有消费者把它消费掉。当消费者...
前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经...点对点或队列模式 包含三个角色:消息队列(Queue),发送
PearMQ-消息队列对等替代 PearMQ是传统JMS和MQ中间件基础结构的替代方案。 它将REST Web服务的简单性与JMS的性能和可管理性结合在一起,而无需其他中间件基础结构。 为什么要点对点? 传统上,计算机系统以客户端-...
总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式...
ActiveMQ | 消息队列 | [http://activemq.apache.org/](http://activemq.apache.org/) JStorm | 实时流式计算框架 | [http://jstorm.io/](http://jstorm.io/) FastDFS | 分布式文件系统 | ...
消息队列 为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么优点和缺点? 如何保证消息队列的高可用? 如何保证消息不被重复消费?(如何保证消息消费的幂等性) 如何...
开发人员提供一个对 ActiveMQ 消息传递感到熟悉的接口。 在这一点上,它是一个相当低级的面向连接的接口,可以使用 AMQ 队列和主题,将消息发送到端点,并使用临时队列执行请求-回复消息,Camel 风格。 将根据需要/...