- 浏览: 23932 次
- 性别:
- 来自: 深圳
文章分类
最新评论
想找到一个消息推送的方案,隐约觉得Pub/Sub是一种解决问题的途径,但没在项目实践中用到。最新在了解学习阿里云,里面有demo。摘录记之。
消息的发布与订阅
场景介绍
ApsaraDB for Redis也提供了与Redis相同的消息发布(pub)与订阅(sub)功能。即一个client发布消息,其他多个client订阅消息。
需要注意的是,ApsaraDB for Redis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外消息发布者(即publish客户端),无需独占与服务器端的连接,你可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是消息订阅者(即subscribe客户端),需要独占与服务器端的连接,即进行subscribe期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
消息发布者 (即publish client)
消息订阅者 (即subscribe client)
消息监听者
示例主程序
运行结果
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
>>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A
----------订阅消息SUBSCRIBE 结束-------
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)
>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
消息的发布与订阅
场景介绍
ApsaraDB for Redis也提供了与Redis相同的消息发布(pub)与订阅(sub)功能。即一个client发布消息,其他多个client订阅消息。
需要注意的是,ApsaraDB for Redis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外消息发布者(即publish客户端),无需独占与服务器端的连接,你可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是消息订阅者(即subscribe客户端),需要独占与服务器端的连接,即进行subscribe期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
消息发布者 (即publish client)
package message.kvstore.aliyun.com; import redis.clients.jedis.Jedis; public class KVStorePubClient { private Jedis jedis;// public KVStorePubClient(String host,int port, String password){ jedis = new Jedis(host,port); //KVStore的实例ID及密码 String authString = jedis.auth(password);//kvstore_instance_id:password if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void pub(String channel,String message){ System.out.println(" >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message); jedis.publish(channel, message); } public void close(String channel){ System.out.println(" >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit"); //消息发布者结束发送,即发送一个“quit”消息; jedis.publish(channel, "quit"); } }
消息订阅者 (即subscribe client)
package message.kvstore.aliyun.com; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class KVStoreSubClient extends Thread{ private Jedis jedis; private String channel; private JedisPubSub listener; public KVStoreSubClient(String host,int port, String password){ jedis = new Jedis(host,port); //ApsaraDB for Redis的实例ID及密码 String authString = jedis.auth(password);//kvstore_instance_id:password if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void setChannelAndListener(JedisPubSub listener,String channel){ this.listener=listener; this.channel=channel; } private void subscribe(){ if(listener==null || channel==null){ System.err.println("Error:SubClient> listener or channel is null"); } System.out.println(" >>> 订阅(SUBSCRIBE) > Channel:"+channel); System.out.println(); //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅 jedis.subscribe(listener, channel); } public void unsubscribe(String channel){ System.out.println(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel); System.out.println(); listener.unsubscribe(channel); } @Override public void run() { try{ System.out.println(); System.out.println("----------订阅消息SUBSCRIBE 开始-------"); subscribe(); System.out.println("----------订阅消息SUBSCRIBE 结束-------"); System.out.println(); }catch(Exception e){ e.printStackTrace(); } } }
消息监听者
package message.kvstore.aliyun.com; import redis.clients.jedis.JedisPubSub; public class KVStoreMessageListener extends JedisPubSub{ @Override public void onMessage(String channel, String message) { System.out.println(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message ); System.out.println(); //当接收到的message为quit时,取消订阅(被动方式) if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub } @Override public void onSubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onUnsubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } }
示例主程序
package message.kvstore.aliyun.com; import java.util.UUID; import redis.clients.jedis.JedisPubSub; public class KVStorePubSubTest { //ApsaraDB for Redis的连接信息,从控制台可以获得 static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com"; static final int port = 6379; static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:password public static void main(String[] args) throws Exception{ KVStorePubClient pubClient = new KVStorePubClient(host, port,password); final String channel = "KVStore频道-A"; //消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收 pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)"); //消息接收者 KVStoreSubClient subClient = new KVStoreSubClient(host, port,password); JedisPubSub listener = new KVStoreMessageListener(); subClient.setChannelAndListener(listener, channel); //消息接收者开始订阅 subClient.start(); //消息发送者继续发消息 for (int i = 0; i < 5; i++) { String message=UUID.randomUUID().toString(); pubClient.pub(channel, message); Thread.sleep(1000); } //消息接收者主动取消订阅 subClient.unsubscribe(channel); Thread.sleep(1000); pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)"); //消息发布者结束发送,即发送一个“quit”消息; //此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。 pubClient.close(channel); } }
运行结果
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
>>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A
----------订阅消息SUBSCRIBE 结束-------
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)
>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
发表评论
-
Canal相关理解
2017-12-29 16:18 434转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 6721.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 790设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+kafka+Spark Steaming demo2
2017-11-22 13:15 435一,flume配置 # Name the components ... -
Flume+Kafka+Spark Steaming demo
2017-11-21 15:21 413一.准备flume配置 a1.sources = r1 a1. ... -
HBase表导出成HDFS
2017-10-19 19:40 860导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 336一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 879一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3351. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 960为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 443package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 436#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 3621.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1294一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 326192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 365物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 419public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 963将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1577当在windows下运行MR程序时,会报各种错误。现把这次碰到 ...
相关推荐
Java实现Redis的消息订阅和发布实例。
Redis支持跨进程发布订阅机制。代码实现了key过期的notification.
Redis发布与订阅系统源码,统一配置更新数据库,亲测可用,简单易懂。切记运行程序前要开启Redis服务.
实现redis发布订阅的一个小Demo,一个发布消息,其他订阅了的都能接收消息
redis消息订阅发布
基于netcore 3.0的redis发布订阅示例代码,直接可以运行,学习netcore和redis的很好入门示例代码。
Redis 发布订阅 Demo,SpringBoot 使用 Redis 发布订阅模式
基于muduo网络库的集群聊天服务器和客户端源码,使用nginx tcp负载均衡,mysql数据库,redis发布-订阅数据库,redis发布-订阅 基于muduo网络库的集群聊天服务器和客户端源码,使用nginx tcp负载均衡,mysql数据库,...
redis订阅机制,一方面推送消息,另一方面同时接收消息。
redis绑定webSocket发布订阅,进行长连接推送,用以暂时进度条,查看任务进行状态,失败数量与成功数量
Java实现Redis的消息订阅和发布源码
本案例包含redis的发布订阅功能,以及dotnet core+SignalR实现的简单即时通信,并提供文档笔记。本案例初衷是想结合redis的发布订阅功能+websocket实现消息客户端页面订阅指定的消息,并在客户端页面进行显示;
可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 使用了redis发布订阅消息队列 数据库采用MySQL 可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 ...
Redis支持跨进程发布订阅机制。代码实现了key过期的notification.
redispubandsub订阅预发布,本包使用C#编写的代码,在使用之前需要有redis的服务,否则无法使用
基于ssm实现websocket长连接+redis发布/订阅消息,服务端实时推送消息至前端页面,实时通信。内含前端代码,如需sql文件请下载https://download.csdn.net/download/gmetbtgbki/10824890
SpringBoot + Redis实现事件的发布订阅功能。详情可看博文https://blog.csdn.net/linhaiyun_ytdx/article/details/103569370
Redis在Delphi7下使用,支持发布、订阅等
springboot+redis+websocket 使用redis发布订阅实现websocket集群
集群聊天服务器(nginx tcp负载均衡模块、muduo网络库、基于发布-订阅的redis消息队列、mysql数据库) 集群聊天服务器(nginx tcp负载均衡模块、muduo网络库、基于发布-订阅的redis消息队列、mysql数据库) 集群聊天...