`

Java学习——rabbitmq(pub/sub)

 
阅读更多

Publish/Subscribe

 

/* Pub/Sub Model
                   / [...](Q1) -> subscriber1
P -> X(type fanout)
                   \ [...](Q2) -> subscriber1
*/

这是很常见的应用场景,生产者在某一时刻将一条消息发送给多个消费者。只是需要注意rabbitmq和其他典型的消息队列有些区别的是生产者并不直接将消息放入队列中,实际上生产者并不知道队列的存在。

 

它只能将消息发送给一个exchange交换组件,这个交换组件要做的事很简单。一方面接收消息,另一方面将消息放入队列。不过它需要知道根据什么规则来处理这些消息(是否放入一个特殊队列还是多个队列,或者被丢弃)。有一些规则可以使用direct,topic,headers,fanout.

pub/sub模型中我们用到fanout类型

 

channel.exchangeDeclare("chatroom","fanout");

channel.basicPublish("chatroom","",null,mesage.getBytes());

前面简单的producer/consumer与taskqueue都是需要指定一个队列名,而我们这里需要一个临时队列(缘由如下)

1,consumer无论何时接入rabbit我们都需要一个空的队列(自己随机命名或让rabbitmq给我们指定一个随机队列)

2,consumer不再监听的时候临时队列需要及时删除

 

String queueName = channel.queueDeclare().getQueue();

当创建完exchange和queue,需要绑定它们两

 

channel.queueBind(queueName, "chatroom", "");

Publisher: Send.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		// 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据
		channel.exchangeDeclare("chatroom","fanout");
		
		for(int i = 1; i < 10; i ++){
			String message = "message " + i;
			channel.basicPublish("chatroom", "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message +"'");
		}
		channel.close();
		connection.close();
	}
}

Subscriber: Recv.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","fanout");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "chatroom", "");
		// 等待消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

$ javac -cp rabbitmq-client.jar Recv.java Send.java

运行多个Recv实例,可以看到每个接收端都能获取到所有的消息。(后启动的Recv实例只会获取后它启动后的消息)

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics