`

activemq消息队列-点对点通讯

    博客分类:
  • java
阅读更多

 

  点对点通讯:消息发送-Queue-消息接收,消息发送到队列,消息接收者阻塞式接收消息。

                       如果没有消息,接收消息方法receive()阻塞。

 

  package mq.p2p;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
   
    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

package mq.p2p;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
   
    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                System.out.println("..");
                TextMessage msg = (TextMessage) consumer.receive();
                System.out.println("....");
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

package mq.p2p;

public class ProducterTest {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        ProducterTest testMq = new ProducterTest();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package mq.p2p;

public class ConsumerTest {
    public static void main(String[] args){
        Consumer comsumer = new Consumer();
        comsumer.init();
        ConsumerTest testConsumer = new ConsumerTest();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer comsumer;
        public ConsumerMq(Consumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

分享到:
评论

相关推荐

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

    boot-example-activemq-queue-2.0.5

    ActiveMQ消息中间件的点对点模式point to point 消息队列 * 消息消费者从queue中取出并且消费消息 * 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息 * queue中支持存在多个消费...

    消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    1、介绍ActiveMQ5.x消息队列基础特性和本地快速安装 2、SpringBoot2.x整合ActiveMQ实战之点对点消息

    activemq-demo.zip

    SpringBoot整合ActiveMQ。消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。...消息形式支持点对点和订阅-发布。

    消息队列学习(springboot+kafka+activemq)

    基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。

    2小时学会Spring+Dubbo整合ActiveMQ消息队列

    让你更了解互联网是如何解决高并发 学完SSM框架的同学就可以学习,能让你切身感受到企业级开发环境目标1:理解消息中间件、JMS等概念目标2:掌握JMS点对点与发布订阅模式的收发消息目标3:掌握SpringJms目标4:完成...

    springboot整合activemq 生产者 一对一,一对多

    springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用

    ActiveMQ——Java连接ActiveMQ(点对点)

    点对点的消息传递中,目的地被称为队列(Queue)点对点消息传递的特点如下:(1)每个消息只能有一个消费者,类似1对1的关系,好比个人快递自己领取自己的(2)消息的生产者和消费者之间没有时间上的相关性。...

    ActiveMQ in Action最新版

    任务队列系统使用了点对点模式。消息生产者发送工作消息到 JMS 队列,消费者从这个队列中接收消息并处理。点对点模式不需要生产者和消费者同时在线。队列会一直保留收到的消息,直到有消费者把它消费掉。当消费者...

    Docker学习之搭建ActiveMQ消息服务的方法步骤

    前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经...点对点或队列模式 包含三个角色:消息队列(Queue),发送

    PearMQ:利用对等连接模型的消息队列替代(ActiveMQ)

    PearMQ-消息队列对等替代 PearMQ是传统JMS和MQ中间件基础结构的替代方案。 它将REST Web服务的简单性与JMS的性能和可管理性结合在一起,而无需其他中间件基础结构。 为什么要点对点? 传统上,计算机系统以客户端-...

    activemq的几种基本通信方式总结

    总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式...

    单点登录源码

    ActiveMQ | 消息队列 | [http://activemq.apache.org/](http://activemq.apache.org/) JStorm | 实时流式计算框架 | [http://jstorm.io/](http://jstorm.io/) FastDFS | 分布式文件系统 | ...

    Android代码-advanced-java

    消息队列 为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么优点和缺点? 如何保证消息队列的高可用? 如何保证消息不被重复消费?(如何保证消息消费的幂等性) 如何...

    java8stream源码-reactivemq:一个基于Akka的ActiveMQ客户端

    开发人员提供一个对 ActiveMQ 消息传递感到熟悉的接口。 在这一点上,它是一个相当低级的面向连接的接口,可以使用 AMQ 队列和主题,将消息发送到端点,并使用临时队列执行请求-回复消息,Camel 风格。 将根据需要/...

Global site tag (gtag.js) - Google Analytics