`
lizhensan
  • 浏览: 369630 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

jeromq 例子

    博客分类:
  • java
 
阅读更多

zeromq

这个不能算是mq产品,只能算是个socket的封装,并针对常用场景进行了抽取。简化了网络编程。

jeromq 是对 zeromq 框架的一个纯 java 版本的实现(不需要依赖 zeromq 动态库)

 

jeromq 它是基于java nio 进行的封装。

 

 

maven 依赖,仅一个jar

<dependency>
			<groupId>org.zeromq</groupId>
			<artifactId>jeromq</artifactId>
			<version>0.3.4</version>
		</dependency>

 

 

 

1、请求-响应模式

 

package zoremq;

import org.zeromq.ZMQ;

/**
 * REQ-side demo client.
 *
 * <p>Starts several threads; each one creates its own context and REQ socket,
 * connects to {@code tcp://127.0.0.1:5555}, performs a fixed number of
 * send/receive round trips and finally prints the elapsed time in whole seconds.
 */
public class Request {
	/** Number of client threads started by {@link #main(String[])}. */
	private static final int CLIENT_THREADS = 5;

	/** Number of request/reply round trips performed by each client thread. */
	private static final int REQUESTS_PER_CLIENT = 100000;

	/** Charset used to decode replies, instead of the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Launches the client threads.
	 *
	 * @param args ignored
	 */
	public static void main(String args[]) {
		for (int j = 0; j < CLIENT_THREADS; j++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					runClient();
				}
			}).start();
		}
	}

	/**
	 * Runs one client: connect, exchange {@link #REQUESTS_PER_CLIENT} messages,
	 * print the elapsed seconds, then release the socket and the context.
	 */
	private static void runClient() {
		ZMQ.Context context = ZMQ.context(1); // context created with one I/O thread
		ZMQ.Socket socket = null;
		try {
			// REQ socket: the "client" side that sends requests to the REP side.
			socket = context.socket(ZMQ.REQ);
			socket.connect("tcp://127.0.0.1:5555");

			long now = System.currentTimeMillis();
			for (int i = 0; i < REQUESTS_PER_CLIENT; i++) {
				String request = "hello";
				socket.send(request);
				// In the request/reply pattern every send must be followed by a
				// recv before the next send is allowed.
				byte[] response = socket.recv();
				System.out.println("receive : " + new String(response, UTF_8));
			}
			long after = System.currentTimeMillis();

			// Integer division: prints whole seconds only.
			System.out.println((after - now) / 1000);
		} finally {
			// Close the socket first, then terminate the context, even when the
			// exchange above fails part-way through.
			if (socket != null) {
				socket.close();
			}
			context.term();
		}
	}
}

 

package zoremq;

import org.zeromq.ZMQ;

/**
 * REP-side demo server.
 *
 * <p>Binds a REP socket on {@code tcp://*:5555} and answers every incoming
 * request with {@code "world"} until the current thread is interrupted.
 */
public class Response {
	/** Charset used to decode requests, instead of the platform default. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Runs the reply loop.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		ZMQ.Context context = ZMQ.context(1); // context created with one I/O thread
		ZMQ.Socket socket = null;
		try {
			// REP socket: the "server" side that receives requests and replies.
			socket = context.socket(ZMQ.REP);
			socket.bind("tcp://*:5555");

			int i = 0;
			int number = 0;
			while (!Thread.currentThread().isInterrupted()) {
				// Progress counter: prints an increasing number once per 10000 iterations.
				i++;
				if (i == 10000) {
					i = 0;
					System.out.println(++number);
				}

				byte[] request = socket.recv();
				if (request == null) {
					// NOTE(review): a null result is treated here as a failed
					// receive and ends the loop; confirm against the jeromq
					// version in use.
					break;
				}
				System.out.println("客户端的相应:" + new String(request, UTF_8));

				// A REP socket must send a reply before it may recv again;
				// calling recv twice in a row is an error in this pattern.
				String response = "world";
				socket.send(response);
			}
		} finally {
			// Close the socket first, then terminate the context. Doing this in
			// finally also covers the case where the loop exits via an exception.
			if (socket != null) {
				socket.close();
			}
			context.term();
		}
	}
}

 

 

 

2、Publish-subscribe

 

 

package zoremq.sub;

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

/**
 * PUB-side demo.
 *
 * <p>Binds a PUB socket on {@code tcp://*:5557} and, once per second, publishes
 * two two-frame messages: frame {@code "A"} followed by {@code "This is A"},
 * and frame {@code "B"} followed by {@code "This is B"}.
 */
public class Publisher {

    /** Charset used to encode frames, instead of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Pause between two publishing rounds, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 1000;

    /**
     * Publishes forever, until the thread is interrupted while sleeping.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    public static void main(String[] args) throws InterruptedException {
        Context context = ZMQ.context(1);
        Socket publisher = null;
        try {
            publisher = context.socket(ZMQ.PUB);
            publisher.bind("tcp://*:5557");
            while (true) {
                Thread.sleep(PUBLISH_INTERVAL_MILLIS);
                // SNDMORE marks the first frame as "more frames follow"; the
                // second send with flag 0 completes the message.
                publisher.send("A".getBytes(UTF_8), ZMQ.SNDMORE);
                publisher.send("This is A".getBytes(UTF_8), 0);
                publisher.send("B".getBytes(UTF_8), ZMQ.SNDMORE);
                publisher.send("This is B".getBytes(UTF_8), 0);
            }
        } finally {
            // Reached when the sleep is interrupted or a send fails: release
            // the socket first, then the context.
            if (publisher != null) {
                publisher.close();
            }
            context.term();
        }
    }

}

 

package zoremq.sub;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * SUB-side demo that connects to {@code tcp://127.0.0.1:5557}, subscribes with
 * the prefix {@code "B"} and prints every received frame on its own line.
 */
public class Sub1 {
	/**
	 * Receives and prints frames forever, two per loop iteration.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		Context zmqContext = ZMQ.context(1);
		Socket subscriber = zmqContext.socket(ZMQ.SUB);
		subscriber.connect("tcp://127.0.0.1:5557");

		byte[] topicPrefix = "B".getBytes();
		subscriber.subscribe(topicPrefix);

		for (;;) {
			// Presumably the topic frame followed by the payload frame, matching
			// the two-frame messages sent by the Publisher example.
			byte[] firstFrame = subscriber.recv(0);
			System.out.println(new String(firstFrame));

			byte[] secondFrame = subscriber.recv(0);
			System.out.println(new String(secondFrame));
		}
	}

}

 第二个订阅者(订阅主题 A)

 

 

package zoremq.sub;

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

/**
 * SUB-side demo that connects to {@code tcp://127.0.0.1:5557}, subscribes with
 * the prefix {@code "A"} and prints every received frame on its own line.
 */
public class Sub2 {
    /**
     * Receives and prints frames forever, two per loop iteration.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Context zmqContext = ZMQ.context(1);
        Socket subscriber = zmqContext.socket(ZMQ.SUB);
        subscriber.connect("tcp://127.0.0.1:5557");
        subscriber.subscribe("A".getBytes());
        for (;;) {
            printNextFrame(subscriber);
            printNextFrame(subscriber);
        }
    }

    /** Blocks for the next frame on {@code source} and prints it as text. */
    private static void printNextFrame(Socket source) {
        byte[] frame = source.recv(0);
        System.out.println(new String(frame));
    }
}

 3、PipeLine模式

package zoremq.PipeLine;

import org.zeromq.ZMQ;
/**
 * PUSH-side producer of the pipeline demo.
 *
 * <p>Binds a PUSH socket on {@code tcp://127.0.0.1:5557} and, every two
 * seconds, sends the current {@link System#nanoTime()} value as a decimal string.
 */
public class MainPusher {
	/** Pause between two messages, in milliseconds. */
	private static final long SEND_INTERVAL_MILLIS = 2000;

	/**
	 * Sends timestamps forever, until the thread is interrupted while sleeping.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted during the pause
	 */
	public static void main(String[] args) throws InterruptedException {
		// The argument is the number of I/O threads; one is enough in most cases.
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket socket = null;
		try {
			socket = context.socket(ZMQ.PUSH);
			socket.bind("tcp://127.0.0.1:5557");
			for (;;) {
				long time = System.nanoTime();
				socket.send(String.valueOf(time));
				System.out.println("发布了新消息,时间:" + time);
				Thread.sleep(SEND_INTERVAL_MILLIS);
			}
		} finally {
			// Reached when the sleep is interrupted or a send fails: release
			// the socket first, then the context.
			if (socket != null) {
				socket.close();
			}
			context.term();
		}
	}
}

 

package zoremq.PipeLine;

import org.zeromq.ZMQ;

/**
 * First worker of the pipeline demo.
 *
 * <p>Pulls timestamp messages from {@code tcp://127.0.0.1:5557}, prints the
 * difference to the local {@link System#nanoTime()}, and pushes the message,
 * prefixed with {@code "1"}, to {@code tcp://127.0.0.1:5558}.
 */
public class WorkerOne {
	/** Prefix prepended to every forwarded message. */
	private static final String WORKER_TAG = "1";

	/**
	 * Runs the pull / forward loop forever.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		// One context serves both sockets and stays referenced, so it can be
		// terminated in the finally block.
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket receiver = null;
		ZMQ.Socket sender = null;
		try {
			// PULL socket: receives work.
			receiver = context.socket(ZMQ.PULL);
			receiver.connect("tcp://127.0.0.1:5557");
			// PUSH socket: forwards results.
			sender = context.socket(ZMQ.PUSH);
			sender.connect("tcp://127.0.0.1:5558");

			for (;;) {
				byte[] recs = receiver.recv();
				long receiveTime = System.nanoTime();
				String oriMsg = new String(recs);
				long pubTime = Long.parseLong(oriMsg);
				// NOTE(review): nanoTime values are only comparable within one
				// JVM; if the message was produced by another process this
				// difference is not a real latency.
				long costTime = receiveTime - pubTime;
				System.out.println("Receive: " + oriMsg + " Cost time: " + costTime);
				sender.send(WORKER_TAG + oriMsg);
				System.out.println("Send to sinker.");
			}
		} finally {
			// Release both sockets before terminating the context.
			if (sender != null) {
				sender.close();
			}
			if (receiver != null) {
				receiver.close();
			}
			context.term();
		}
	}
}

 

package zoremq.PipeLine;

import org.zeromq.ZMQ;

/**
 * Second worker of the pipeline demo.
 *
 * <p>Pulls timestamp messages from {@code tcp://127.0.0.1:5557}, prints the
 * difference to the local {@link System#nanoTime()}, and pushes the message,
 * prefixed with {@code "2"}, to {@code tcp://127.0.0.1:5558}.
 */
public class WorkerTwo {
	/** Prefix prepended to every forwarded message. */
	private static final String WORKER_TAG = "2";

	/**
	 * Runs the pull / forward loop forever.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		// One context serves both sockets and stays referenced, so it can be
		// terminated in the finally block.
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket receiver = null;
		ZMQ.Socket sender = null;
		try {
			// PULL socket: receives work.
			receiver = context.socket(ZMQ.PULL);
			receiver.connect("tcp://127.0.0.1:5557");
			// PUSH socket: forwards results.
			sender = context.socket(ZMQ.PUSH);
			sender.connect("tcp://127.0.0.1:5558");

			for (;;) {
				byte[] recs = receiver.recv();
				long receiveTime = System.nanoTime();
				String oriMsg = new String(recs);
				long pubTime = Long.parseLong(oriMsg);
				// NOTE(review): nanoTime values are only comparable within one
				// JVM; if the message was produced by another process this
				// difference is not a real latency.
				long costTime = receiveTime - pubTime;
				System.out.println("Receive: " + oriMsg + " Cost time: " + costTime);
				sender.send(WORKER_TAG + oriMsg);
				System.out.println("Send to sinker.");
			}
		} finally {
			// Release both sockets before terminating the context.
			if (sender != null) {
				sender.close();
			}
			if (receiver != null) {
				receiver.close();
			}
			context.term();
		}
	}
}

 

package zoremq.PipeLine;
import org.zeromq.ZMQ;
 
/**
 * PULL-side sink of the pipeline demo.
 *
 * <p>Binds a PULL socket on {@code tcp://127.0.0.1:5558}. For every message it
 * drops the first byte, parses the remainder as a {@code long} timestamp and
 * prints the difference to the local {@link System#nanoTime()}.
 */
public class Sinker {
    /** Charset used to decode messages, instead of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Runs the receive loop forever.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket receiver = null;
        try {
            // PULL socket: collects the messages forwarded by the workers.
            receiver = context.socket(ZMQ.PULL);
            receiver.bind("tcp://127.0.0.1:5558");
            for (;;) {
                byte[] recs = receiver.recv();
                long receiveTime = System.nanoTime();
                String oriMsg = new String(recs, UTF_8);
                // Skip the first byte (the one-character tag the worker
                // examples prepend) to get back the numeric timestamp.
                String msg = new String(recs, 1, recs.length - 1, UTF_8);
                long pubTime = Long.parseLong(msg);
                // NOTE(review): nanoTime values are only comparable within one
                // JVM; if the timestamp comes from another process this
                // difference is not a real latency.
                long costTime = receiveTime - pubTime;
                System.out.println("Receive: " + oriMsg + " Cost time: " + costTime);
            }
        } finally {
            // Release the socket first, then the context, when the loop fails.
            if (receiver != null) {
                receiver.close();
            }
            context.term();
        }
    }
}

 

 

参考:http://www.coderli.com

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics