- 浏览: 839066 次
文章分类
- 全部博客 (365)
- java (124)
- spring mvc (21)
- spring (22)
- struts2 (6)
- jquery (27)
- javascript (24)
- mybatis/ibatis (8)
- hibernate (7)
- compass (11)
- lucene (26)
- flex (0)
- actionscript (0)
- webservice (8)
- rabbitMQ/Socket (15)
- jsp/freemaker (5)
- 数据库 (27)
- 应用服务器 (21)
- Hadoop (1)
- PowerDesigner (3)
- EJB (0)
- JPA (0)
- PHP (2)
- C# (0)
- .NET (0)
- html (2)
- xml (5)
- android (7)
- flume (1)
- zookeeper (0)
- 证书加密 (2)
- maven (1)
- redis (2)
- cas (11)
最新评论
-
zuxianghuang:
通过pom上传报错 Artifact upload faile ...
nexus上传了jar包.通过maven引用当前jar,不能取得jar的依赖 -
流年末年:
百度网盘的挂了吧???
SSO单点登录系列3:cas-server端配置认证方式实践(数据源+自定义java类认证) -
953434367:
UfgovDBUtil 是什么类
Java发HTTP POST请求(内容为xml格式) -
smilease:
帮大忙了,非常感谢
freemaker自动生成源代码 -
syd505:
十分感谢作者无私的分享,仔细阅读后很多地方得以解惑。
Nginx 反向代理、负载均衡、页面缓存、URL重写及读写分离详解
这里是同步发送消息,异步接收消息
接收有两种方式:http://www.rabbitmq.com/api-guide.html#getting
Retrieving individual messages(channel.basicGet)
To retrieve individual messages, use Channel.basicGet
. The returned value is an instance of GetResponse
, from which the header information (properties) and message body can be extracted
Retrieving messages by subscription(Consumer)
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.QueueingConsumer;
Another way to receive messages is to set up a subscription using the Consumer
interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively.
The easiest and safest supplied implementation of Consumer
is the QueueingConsumer
convenience class:
TestBase.java -- 基类
/**
* @author sunjun
* @createtime 2010-4-27 下午03:13:27
*/
public class TestBase implements ShutdownListener {
private static Connection connection;
static {
ConnectionParameters parameters = new ConnectionParameters();
parameters.setUsername("guest");
parameters.setPassword("guest");
parameters.setVirtualHost("/");
// 默认情况下使用用户guest/guest,vhost=/
ConnectionFactory factory = new ConnectionFactory(parameters);
try {
// 获取connection
connection = factory.newConnection("192.168.18.24", AMQP.PROTOCOL.PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
protected Channel getChannel() throws IOException {
return connection.createChannel();
}
protected void close(Channel channel) {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
protected void printBasicProperties(BasicProperties properties) {
System.out.println("properties.ContentEncoding: " + properties.getContentEncoding());
System.out.println("properties.content type: " + properties.getContentType());
System.out.println("properties.Expiration: " + properties.getExpiration());
System.out.println("properties.type: " + properties.getType());
System.out.println("properties.reply to: " + properties.getReplyTo());
System.out.println("properties.appId: " + properties.getAppId());
System.out.println("properties.classId: " + properties.getClassId());
System.out.println("properties.className: " + properties.getClassName());
System.out.println("properties.clusterId: " + properties.getClusterId());
}
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println("-----------shutdown information----------");
Object reason = cause.getReason();
System.out.println(reason);
Object reference = cause.getReference();
System.out.println(reference);
System.out.println(cause.isHardError());
System.out.println(cause.isInitiatedByApplication());
}
}
TestSender.java
/**
* 发送消息
*
* @author sunjun
* @createtime 2010-4-27 上午11:21:05
*/
public class TestSender extends TestBase {
/**
* 使用channel类basicPublish发送
*
* @throws IOException
*/
public void sendMsg() throws IOException {
// 创建一个channel
Channel channel = getChannel();
// 设置return listener,处理不能发送或未被送达的消息,如果发送时设置 强制发送 或 立即发送 ,但是消息没有被接收到,就会回调return listener
// Handling unroutable or undelivered messages
// If a message is published with the "mandatory" or "immediate" flags set, but cannot be
// delivered, the broker will return it to the sending client (via a AMQP.Basic.Return
// command).
channel.setReturnListener(new ReturnListener() {
@Override
public void handleBasicReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("-----------get exception reply message---------");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("ContentEncoding: " + properties.getContentEncoding());
System.out.println("content type: " + properties.getContentType());
System.out.println("Expiration: " + properties.getExpiration());
System.out.println("type: " + properties.getType());
System.out.println("reply to: " + properties.getReplyTo());
System.out.println("body: " + new String(body));
}
});
// 定义一个由rabbitmq server自动生成queueName的queue, autodelete, non-durable
// DeclareOk queueDeclare = channel.queueDeclare();
// String queue = queueDeclare.getQueue();
// System.out.println(queue);
// 定义一个queue,non-autodelete, non-durable(即rabbitmq重启后会消失)
String queueName = "test.queue";
DeclareOk queueDeclare = channel.queueDeclare(queueName);
// 定义一个queue,durable(即rabbitmq重启后也不会消失)
// String queue = "test.queue";
// DeclareOk queueDeclare = channel.queueDeclare(queue ,true);
// qeuue名称,也就是test.queue
System.out.println("queueName: " + queueDeclare.getQueue());
// 消费者数量
System.out.println("ConsumerCount: " + queueDeclare.getConsumerCount());
// 未读取的消息数量
System.out.println("MessageCount: " + queueDeclare.getMessageCount());
// 定义一个exchange
String exchange = "test.exchange";
channel.exchangeDeclare(exchange, "direct");
// bind queue到exchange上
String routingKey = "test.routingkey";
channel.queueBind(queueName, exchange, routingKey);
// 发送一个非持久化文本消息
String msg = "hello world!";
// 异步发送消息,不用等待去接收消费者的回复消息
channel.basicPublish(exchange, routingKey, MessageProperties.TEXT_PLAIN, msg.getBytes());
System.out.println("send message success.");
// close
close(channel);
}
/**
* main
*
* @param args
*/
public static void main(String[] args) {
try {
new TestSender().sendMsg();
} catch (IOException e) {
e.printStackTrace();
}
}
}
TestReceiver.java
/**
* 接收消息,使用channel.basicGet接收消息
*
* @author sunjun
* @createtime 2010-4-27 下午03:14:31
*/
public class TestReceiver extends TestBase {
/**
* 接收消息
*
* @throws IOException
*/
public void receive() throws IOException {
// 获取channel
Channel channel = getChannel();
// 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
String queue = "test.queue";
channel.queueDeclare(queue);
boolean noAck = true;
// 获取到消息的计数器
int counter = 0;
// 循环从queue(test.queue)中获取消息
while (true) {
GetResponse response = channel.basicGet(queue, noAck);
if (response != null) {
System.out.println("----------- receive message " + (++counter) + "-----------");
System.out.println("Message : " + new String(response.getBody()));
Envelope envelope = response.getEnvelope();
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
long deliveryTag = envelope.getDeliveryTag();
BasicProperties properties = response.getProps();
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("MessageCount: " + response.getMessageCount());
System.out.println("deliveryTag: " + deliveryTag);
printBasicProperties(properties);
System.out.println("receive success.\n\n");
if (!noAck) // 发送回复
channel.basicAck(deliveryTag, false);
}
}
}
/**
* main
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new TestReceiver().receive();
}
}
TestReceiver2.java
/**
* 接收消息,使用Consumer(QueueingConsumer)来取消息
*
* @author sunjun
* @createtime 2010-4-27 下午03:14:31
*/
public class TestReceiver2 extends TestBase {
/**
* 接收消息
*
* @throws IOException
*/
public void receive() throws IOException {
// 获取channel
Channel channel = getChannel();
// 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
String queue = "test.queue";
channel.queueDeclare(queue);
int counter = 0;
boolean noAck = false;
// 定义一个消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
String consumerTag = channel.basicConsume(queue, noAck,
queueingConsumer);
System.out.println("receive message started.");
// 循环取消息
while (true) {
Delivery nextDelivery = null;
try {
// 取消息
nextDelivery = queueingConsumer.nextDelivery();
} catch (Exception e) {
e.printStackTrace();
}
if (nextDelivery != null) {
System.out.println("----------- receive message " + (++counter)
+ "-----------");
System.out.println("Message : "
+ new String(nextDelivery.getBody()));
Envelope envelope = nextDelivery.getEnvelope();
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("deliveryTag: " + deliveryTag);
BasicProperties properties = nextDelivery.getProperties();
printBasicProperties(properties);
System.out.println("receive success.\n\n");
if (!noAck)
channel.basicAck(deliveryTag, false);
// channel.basicCancel(queueingConsumer.getConsumerTag());
// System.out.println("cancel consumer success.");
break;
}
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
TestReceiver2 test = new TestReceiver2();
test.receive();
}
}
发表评论
-
rabbitmq 学习-14- 官方rabbitmq+spring进行远程接口调用
2012-07-07 09:08 3755到http://github.com/momania/spri ... -
rabbitmq 学习-13- 发送接收消息示例-2
2012-07-06 17:17 1727Basic RPC As a programming con ... -
rabbitmq 学习-积累
2012-07-06 17:17 13161,temporary queue(由server自动命名)在 ... -
rabbitmq 学习-11- 几个发送接收消息的重要类
2012-07-06 17:17 16141,ChannelbasicPublish() 用来发送消息, ... -
rabbitmq 学习-10-channel 说明
2012-07-06 17:17 5230rabbitmq java api 关于消息处理的一个重要的类 ... -
rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
2012-06-30 16:44 3015本身使用RpcClient发送消息与同步接收消息的代码是很 ... -
rabbitmq 学习-8- Exchange Queue RoutingKey关系说明
2012-06-30 16:44 3095String queue = channel.queueDec ... -
rabbitmq 学习-7-rabbitmq 支持场景
2012-06-30 16:43 1240What messaging scenarios are su ... -
rabbitmq 学习-6-rabbitmq基础
2012-06-30 16:43 1721rabbitmq的中文资料真少,和同事lucas经过两周的 ... -
rabbitmq 学习-4-初试2
2012-06-29 14:32 1048RpcClient,RpcServer同步发送接收消息Chan ... -
rabbitmq 学习-3-初试1
2012-06-29 14:32 1038本例是一个简单的异步 ... -
rabbitmq 学习-1-AMQP介绍
2012-06-29 14:30 1366Windows 1,下载下载erlang:erlang. ... -
rabbitmq学习1:hello world
2012-06-29 14:27 1345rabbitMQ是一个在AMQP基础上完整的,可服用的企业消息 ... -
rabbitmq操作命令
2012-06-14 13:54 201161.必需掌握的指令 添加用户: ...
相关推荐
C# RabbitMQ发送和接收简单明了的示例, 演示了,轮流接收消息功能,和集群功能.
该服务接收来自客户端的消息请求并将消息发送到 RabbitMQ。 这些服务侦听 RabbitMQ 以获取消息并向客户端返回响应。 消费者服务。 该服务侦听来自 RabbitMQ 的消息并处理消息,然后将消息发送到 RabbitMQ。 要求: ...
主要介绍了c# rabbitmq 简单收发消息的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
RabbitMQ消息队列,在C#代码 中的使用。代码有发送端以及接收端两个控制台应用。资源内部包含RabbitMQ介绍以及安装部署服务器PDF教程。
主要介绍了使用PHP访问RabbitMQ消息队列的方法,结合实例形式分析了RabbitMQ消息队列的相关扩展安装、队列建立、队列绑定、消息发送、消息接收等相关操作技巧,需要的朋友可以参考下
springboot整合rabbitmq简单示例。交换机的模式使用的是Topic,消息发送使用RabbitTemplate,消息接收使用RabbitListener
信息测试演示使用RabbitMQ使用Spring发送和接收消息。设置这需要在您的IDE中安装Lombok才能进行编译; 有关详细信息,请参见 。 安装RabbitMQ( brew install rabbitmq ) 确保/usr/local/sbin在您的路径中启动代理...
如下图所示,您可以使用MQTT.fx作为MQTT客户端,在MQTT.fx客户端配置相关参数后接入微消息队列MQTT版实现消息的发送和接收。消息收发 微消息队列MQTT版同时提供了公网接入点和VPC 接入点。 在物联网和移动互联网的...
RabbitMQ 的 Golang 简单任务 Runner 示例 仅供个人使用(非通用解决方案)。 它能做什么: 加载本地配置文件 (config.json) 基于本地配置请求远程配置: 发送当前运行时信息(由收集的内存、cpu、网络设置) ...
这种方法利用RabbitMQ消息代理在微服务之间进行事件通信,并且所有服务都使用Docker进行了容器化,因此它们可以独立开发,部署,监控和扩展。 完整应用程序后端演示 以下视频演示显示了概念验证当前支持的所有功能...
当在适当的队列上接收到消息时,会收集一些系统数据(目前仅 CPU 温度)并将其返回给发送消息的客户端。 这种架构使得任何进程在任何地方运行(如 )都可以根据需要轻松访问此数据。 用织物安装 使用有一个安装任务...
可发送/接收一堆消息并测量花费时间的示例显示传奇的示例显示SimpleInjector如何与Rebus一起工作的示例-具有各种记录类型的样本演示“正好一次交付”的示例,其中SQL传输和在同一个事务中征集的用户工
纳米流x 微型高性能服务器,在节点处理块时将其流式传输到套接字,以供其他进程接收。 这个包并不是要单独使用,而是其他纳nano-stream npm包的基础库,这些纳nano-stream npm包接收该流并将其... 数据的解析示例:
在服务启动期间,它将服务名称和自动生成的UUID保留在其透视图数据库中,并将数据发送到RabbitMQ交换,后者随后根据路由密钥将数据广播到所有队列。 每个微服务都侦听自己的RabbitMQ队列,并在接收数据时不断更新...
Socket通信案例消息发送与接收 第8周 上节回顾 Socket实现简单的ssh客户端 Socket实现简单的ssh服务端 积极思考正能量 Socket实现简单的ssh2 Socket粘包 Socket粘包深入编码 SocketServer SocketServer多并发 多...
生产者生产者仅仅是将消息发送到消息代理的软件,例如微服务系统中的客户服务,该系统希望通过发送事件customer来告诉其他服务新客户是由新客户创建的。新创建的客户ID作为有效载荷。 使用者使用者是一种从消息代理...
python入门到高级全栈工程师培训视频学习资料;本资料仅用于学习,请查看后24小时之内删除。 【课程内容】 第1章 01 计算机发展史 02 计算机系统 03 小结 04 数据的概念 05 进制转换 06 原码补码反码 07 物理层和...
兔子队列Python '''示例使用RabbitMQ发送和接收JSON数据 创建virtualenv并运行pip install -r requirements.txt'''