发送消息时指定userid,只有是当前conn的用户名才可以发送出去消息(测试发现非当前conn用户名,队列没有也没有创建,可能因为是在同一个channel)。用于消费者consumer需要知道消息是从哪个用户发过来的情况
package com.demo.mq.rabbitmq.example12;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 校验User ID
* @author sheungxin
*
*/
public class ValidUserSend {
private static String userId="sheungxin";
/**
* 发送消息时校验userId
* @param mes
* @throws IOException
* @throws TimeoutException
*/
public static void validUserSend(Serializable mes) throws IOException, TimeoutException{
Connection conn=MqManager.newConnection();
//链接堵塞监听器,除此还有ShutdownListener(非本用例的重点,略带一下)
conn.addBlockedListener(new BlockedListener() {
@Override
public void handleUnblocked() throws IOException {
// connection is now unblocked
}
@Override
public void handleBlocked(String reason) throws IOException {
// connection is now blocked
}
});
Channel channel=conn.createChannel();
String queueName=channel.queueDeclare().getQueue();
//声明消费者用于接收消息(发在basicPublish后面也可以接收到消息,可能因为在同一个channel)
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
System.out.println(SerializationUtils.deserialize(body));
}
});
//发送消息时指定userid,只有是当前conn的用户名才可以发送出去消息(测试发现非当前conn用户名,队列没有也没有创建,可能因为是在同一个channel)
Builder properties=new BasicProperties.Builder();
properties.userId(userId);
channel.basicPublish("", queueName, properties.build(), SerializationUtils.serialize(mes));
}
public static void main(String[] args) throws IOException, TimeoutException {
validUserSend("Hello World!");
}
}
x-max-length:指队列中可存放的最大消息数量,
不是队列中单条消息的最大长度
max-length-bytes:指队列中存放消息内容的最大字节长度
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
//args.put("x-max-length-bytes",10000);
channel.queueDeclare("myqueue", false, false, false, args);
可通过服务端策略设定
rabbitmqctl set_policy Ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues
分享到:
相关推荐
RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...
rabbitmq-server-3.9.11.exe
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
在当今信息技术迅速...此外,文档还提供了使用HAProxy实现负载均衡的方法,旨在提升整体架构的高可用性和性能。通过这些详尽的指导,IT专业人员可以有效搭建和管理RabbitMQ集群,确保企业消息系统的稳定和高效运行。
如题,C#的Demo项目:RabbitMQ封装和使用, 引用了RabbitMQ.Client 版本:3.6.9 RabbitMQ .NET客户端操作类库, 并简单展示了3种Exchange的使用
以抢购为例, 给抢购用户发布消息
RabbitMQ实战: 高效部署分布式消息队列,高质量文档分享,请珍惜!
RabbitMQ-HelloWorld:RabbitMQ的Hello World示例
image : rabbitmq:3-management container_name : rabbitmq volumes : - rabbitmq-vol:/var/lib/rabbitmq - rabbitmq-log:/var/log/rabbitmq - ./conf/:/etc/rabbitmq/ networks : - rabbitmq-net 步骤2:...
rabbitmq-3.10.6:management
跑步docker run -d --hostname rabbitserver --name rabbitmq-server -p 15672:15672 -p 5672:5672 rabbitmq:3-managementRabbitMQ服务器要查看服务器管理,您应该使用在docker image上配置的端口转到本地主机...
springboot集成rabbitmq,三种模式:fanout direct topic的实现,入门级别
RabbitMQ分发方式:主题模式思维脑图
RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列
rabbitmq延迟插件:rabbitmq_delayed_message_exchange-20171215-3.6.x.ez
RabbitMQ
GitChat分享会-RabbitMQ典型场景实战-源码数据库。 有任何问题均可以与博主交流:1974544863。
otp_win64_25.0.2 rabbitmq 3.10.2 安装步骤:https://blog.csdn.net/qq_36802726/article/details/125619207
RabbitMQ示例 RabbitMQ示例1:哈喽世界 RabbitMQ示例2:工作原理 RabbitMQ示例3:发布与订阅【fanout转换器】 RabbitMQ示例4:路由【直接交换机】 RabbitMQ示例5:主题【topic切换】 RabbitMQ示例6:远程过程调用...
rabbitmq练习项目Java源代码.zip