`
yuwenlin2008
  • 浏览: 124987 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ安装使用(直接交换direct exchange)

阅读更多

1.简介

         RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP高级消息队列,说白了就是一个开源的消息中间件。它能解决不同组件、模块、系统间消息通信。

 

2.系统架构

RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer:数据的发送方,create messages and publish (send) them to a broker server (RabbitMQ)。

Consumer:数据的接收方,Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

 

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

那么,为什么使用Channel,而不是直接使用TCP连接?

 

 

    对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。

 

对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。

 

        Exchanges are where producers publish their messages.

 

        Queues are where the messages end up and are received by consumers

 

 

        Bindings are how the messages get routed from the exchange to particular queues.

 

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

     有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

 

Consumer和Procuder都可以通过 queue.declare 创建queue。如果queue已经存在,也不会报错。如果没有,要么发送不了消息,要么取不到消息,所以还是都创建吧。

 

Bindings就是将通过Exchange将queue和routing keys绑定。

 

3.应用开发测试

我们使用直接交换(direct exchange)模式,这种方式有效实现点对点发送。比如发送方:系统分别给每个组织机构或用户发送信息,接收方:每个组织机构或用户各自接收自己的消息。

RabbitMQ服务端搭建(windows环境)参考附件:

a.它由erlang开发,要安装erlang依赖otp_win32_R16B02.exe

b.rabbitmq服务端rabbitmq-server-3.2.0.exe,默认端口5672,要改的话,代码里得显示指定。

c.RabbitMQ客户端对种语言支持良好,这里我用Java,下载java开发包rabbitmq-client.jar,commons-cli-1.1.jar,commons-io-1.2.jar

producer代码:

 

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * 测试RabbitMQ发送
 */
public class SendTest {
	
	private static String HOST = "172.16.6.180";
	private static String EXCHANGE_NAME = "temp";
	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};

	public static void main(String[] args) throws Exception {
		BufferedReader br = null;
		String message = null;
		String flag = "";
		// 建立连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		System.out.println("连接成功");
		System.out.println("声明持久化的direct交换机....");
		System.out.println("声明持久化队列并绑定...");
		// 声明此交换器为全广播并且持久化
		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);
			channel.basicQos(1);
			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);
			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");
		}
		
		while (true) {
			System.out.println("请选择发送方式:1:全部发送;2:指定发送;");
			br = new BufferedReader(new InputStreamReader(System.in));
			flag = br.readLine();
			if ("1".equals(flag)) {
				System.out.println("发送内容为:这是一条测试数据");
				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
					// 发送消息
					byte[] buffer = ("这是一条测试数据"+BINDINGS_QUEUE_NAMES[i]+new Date()).getBytes("utf-8");
					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i], MessageProperties.PERSISTENT_TEXT_PLAIN, buffer);
					String ss = new String(buffer, "utf-8");
					System.out.println(ss);
				}
				System.out.println("发送完毕!");
			} else if("2".equals(flag)) {
				System.out.println("请选择您需要发送的队列序号,以','隔开:");
				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
					System.out.println((i+1) + ":" + BINDINGS_QUEUE_NAMES[i]);
				}
				br = new BufferedReader(new InputStreamReader(System.in));
				String[] indexs = br.readLine().split(",");
				System.out.println("请输入您要发送的消息:");
				br = new BufferedReader(new InputStreamReader(System.in));
				message = br.readLine();
				for (int i = 0; i < indexs.length; i++) {
					// 发送消息
					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[Integer.valueOf(indexs[i])-1], MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
				}
				System.out.println("发送完毕!");
			}
		}
		
	}
	
}

发送方控制台:

 

连接成功
声明持久化的direct交换机....
声明持久化队列并绑定...
队列user001绑定成功!
队列user002绑定成功!
队列user003绑定成功!
队列user004绑定成功!
队列user005绑定成功!
队列user006绑定成功!
队列user007绑定成功!
队列user008绑定成功!
队列user009绑定成功!
队列user010绑定成功!
请选择发送方式:1:全部发送;2:指定发送。
1
发送内容为:这是一条测试数据
这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015
发送完毕!
请选择发送方式:1:全部发送;2:指定发送。

 RabbitMQ可视化查看:http://localhost:15672/#/queues


 

Receive代码:

 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * 测试RabbitMQ接收
 */
public class ReceTest {
	
	private static String HOST = "172.16.6.180";
	private static String EXCHANGE_NAME = "temp";
	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};


	public static void main(String[] args) throws Exception {
		// 建立连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST);
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
		System.out.println("连接成功");
		System.out.println("声明持久化的direct交换机....");
		System.out.println("声明持久化队列并绑定...");
		// 声明交换器,与服务保持一致
		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);
			channel.basicQos(1);
			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);
			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");
		}
		System.out.println("开始接收数据...");
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			final String queue = BINDINGS_QUEUE_NAMES[i];
			new Thread(){
				public void run() {
					try {
						receive(channel, queue);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
	}
	
	private static void receive(Channel channel,String QUEUE_NAME) throws Exception {
		// 声明消费者
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(QUEUE_NAME, false, consumer);
		while (true) {
			// 等待队列推送消息
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println(QUEUE_NAME + " Received '" + message + "'");
			// 反馈给服务器表示收到信息
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
}

 

 接收方控制台:

 

连接成功
声明持久化的direct交换机....
声明持久化队列并绑定...
队列user001绑定成功!
队列user002绑定成功!
队列user003绑定成功!
队列user004绑定成功!
队列user005绑定成功!
队列user006绑定成功!
队列user007绑定成功!
队列user008绑定成功!
队列user009绑定成功!
队列user010绑定成功!
开始接收数据...
user010 Received '这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015'
user001 Received '这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015'
user007 Received '这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015'
user006 Received '这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015'
user004 Received '这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015'
user005 Received '这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015'
user002 Received '这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015'
user008 Received '这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015'
user009 Received '这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015'
user003 Received '这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015'

 再看 http://localhost:15672/#/queues



 关于RabbitMQ服务端搭建,及可视化页面配置,请参考附件RabbitMQ安装与配置。

参考网站:

http://blog.csdn.net/anzhsoft/article/details/19563091

http://my.oschina.net/OpenSourceBO/blog/379732

  • 大小: 28.7 KB
  • 大小: 31.3 KB
1
0
分享到:
评论
1 楼 freezingsky 2015-04-22  
难得看开一两篇讲得不错的文章!

相关推荐

Global site tag (gtag.js) - Google Analytics