只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
RabbitMQ3.5以后已经集成了rabbitmq_priority_queue
引用
验证方式:
触发为及时消费场景,常用场景与Qos结合使用
1、可先发送消息,再进行消费
2、开启手动应答、设置Qos。若为1,在一个消费者存在的情况下,除第一个消息外均按优先级进行消费(第一个消息被及时消费掉了)
3、可在方式二的基础上不断增加消费者,也符合优先调用规则
为消息设置优先级别:
//随机设置消息优先级
Builder properties=new BasicProperties.Builder();
int priority=new Random().nextInt(10);
properties.priority(priority);//建议0~255,超过貌似也没问题
channel.basicPublish(exchange_name, "", properties.build(), SerializationUtils.serialize(_mes));
为队列创建优先级别:
//设置队列的优先级,消息的优先级大于队列的优先级,以较小值为准(例如:队列优先级5、消息优先级8,消息实际优先级为5)
Map<String, Object> args=new HashMap<String, Object>();
args.put("x-max-priority", 10);//队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
channel.queueDeclare(queueName, false, false, false, args);
队列、消息上均要设置优先级才可生效,以较小值为准
队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
优先级队列在内存、硬盘、cpu会有成本消耗,不建议创建大量的优先级别(数量、级别种类、大级别,理解混乱,英文理解困难...)
package com.demo.mq.rabbitmq.example10;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 优先队列测试消息发送类
* @author sheungxin
*
*/
public class PrioritySend {
private static String exchange_name="priority_direct";
public static void prioritySend(Serializable mes) throws IOException, TimeoutException{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange_name, BuiltinExchangeType.DIRECT);
//发送10条消息
for(int i=0;i<10;i++){
//随机设置消息优先级
Builder properties=new BasicProperties.Builder();
int priority=new Random().nextInt(10);
properties.priority(priority);//建议0~255,超过貌似也没问题
String _mes=mes.toString()+i;
channel.basicPublish(exchange_name, "", properties.build(), SerializationUtils.serialize(_mes));
System.out.println(priority+" "+_mes);
}
channel.close();
conn.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
prioritySend("priority send:hello world!");
}
}
package com.demo.mq.rabbitmq.example10;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class PriorityRecv {
private static String exchange_name="priority_direct";
private static String queueName="priority_queue";
/**
* 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
* 验证方式:
* 1、可先发送消息,再进行消费
* 2、开启手动应答、设置Qos。若为1,在一个消费者存在的情况下,除第一个消息为均按优先级进行消费(第一个消息被及时消费掉了)
* 3、可在方式二的基础上不断增加消费者,也符合优先调用规则
* 注意要点:
* 1、队列、消息上均要设置优先级才可生效,以较小值为准;
* 2、队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
* 3、优先级队列在内存、硬盘、cpu会有成本消耗,不建议创建大量的优先级别(数量、级别种类、大级别,理解混乱,英文理解困难...)
* @throws IOException
* @throws TimeoutException
*/
public static void priorityRecv() throws IOException, TimeoutException{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange_name, BuiltinExchangeType.DIRECT);
//设置队列的优先级,消息的优先级大于队列的优先级,以较小值为准(例如:队列优先级5、消息优先级8,消息实际优先级为5)
Map<String, Object> args=new HashMap<String, Object>();
args.put("x-max-priority", 10);//队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
channel.queueDeclare(queueName, false, false, false, args);
channel.queueBind(queueName, exchange_name, "");
channel.basicQos(1);//需要开启手动应答模式,否则无效
channel.basicConsume(queueName, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
String mes=SerializationUtils.deserialize(body);
System.out.println(properties.getPriority()+":priority Received :'"+mes+"' done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static void main(String[] args) throws IOException, TimeoutException {
priorityRecv();
}
}
分享到:
相关推荐
RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列
RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...
RabbitMQ实战: 高效部署分布式消息队列,高质量文档分享,请珍惜!
RabbitMQ实战 高效部署分布式消息队列 附带目录 高清完整版 PDF下载
RabbitMQ实战 高效部署分布式消息队列 高清完整版下载PDF下载
RabbitMQ实战高效部署分布式消息队列.pdf+rabbitmq学习手册.pdf
RabbitMQ实战:高效部署分布式消息队列 探索消息通信世界 打造24×7×365无间断工作环境
RabbitMQ实战:高效部署分布式消息队列pdf版 很详细的rabbitMQ实战教程
RabbitMQ实战高效部署分布式消息队列书籍, 高清完整版PDF。有学习RabbitMQ的小伙伴可以下载了。
RabbitMQ实战 高效部署分布式消息队列pdf
《RabbitMQ实战高效部署分布式消息队列》完整pdf书籍, 初学者可以看看
采用python编写的批量删除rabbitmq的队列或交换机。 1.修改rabbitmq_delete.py中rabbitmq的配置; 2.执行以下命令: 删除队列: python3 rabbitmq_delete.py -k ‘udata.climb’ -d 1 删除交换机: python3 rabbitmq_...
RabbitMQ实战:高效部署分布式消息队列 带目录书签高清PDF
RabbitMQ实战 高效部署分布式消息队列.part1.rar 高清 80M 2部分
rabbitmq-server-3.9.11.exe
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
RabbitMQ+Erlang+RabbitMq延时队列插件