Redis或许已经在很多企业开始推广并试水,本文也根据个人的实践,简单描述一下Redis在实际开发过程中的使用(部署与架构,稍后介绍),程序执行环境为java + jedis,关于spring下如何集成redis-api,稍后介绍吧。
前言:下载redis-2.6.2,安装好redis之后,请在redis.conf文件中,将如下3个配置属性开启(仅供测试使用):
##客户端链接的端口,也是server端侦听client链接的端口 ##每个client实例,都将和server在此端口上建立tcp长链接 port 6379 ## server端绑定的ip地址,如果一个物理机器有多个网络接口时,可以明确指定为某个网口的ip地址 bind 127.0.0.1 ##链接中io操作空闲时间,如果在指定时间内,没有IO操作,链接将会被关闭 ##此属性和TCP链接中的timeout选项一样,建议设置为0,很多时候,我们一个应用也只会有一个redis实例 ##不过,如果你使用连接池的话,你需要对此参数做额外的考虑。 timeout 0
Pub/Sub: "发布/订阅",对于此功能,我们将会想到很多JMS实现,Redis提供此功能显的“多此一举”;不过这个功能在redis中,被设计的非常轻量级和简洁,它做到了消息的“发布”和“订阅”的基本能力,但是尚未提供JMS中关于消息的持久化/耐久性等各种企业级的特性。
一个Redis client发布消息,其他多个redis client订阅消息,发布的消息“即发即失”,redis不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于JMS中“非持久”类型的消息。
消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)
消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。
一旦subscribe端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。
如果你非常关注每个消息,那么你应该考虑使用JMS或者基于Redis做一些额外的补充工作,如果你期望订阅是持久的,那么如下的设计思路可以借鉴(如下原理基于JMS):
1) subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”
2) subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”
3) publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。
4) 到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。
5) subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。
6) subscribe端启动时,如果发现自己的自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。
--------------------------------------------------------------非持久化订阅-------------------------------------------------------
PrintListener.java:订阅者消息处理器
public class PrintListener extends JedisPubSub{ @Override public void onMessage(String channel, String message) { String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"); System.out.println("message receive:" + message + ",channel:" + channel + "..." + time); //此处我们可以取消订阅 if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } } ... }
PubClient.java:消息发布端
public class PubClient { private Jedis jedis;// public PubClient(String host,int port){ jedis = new Jedis(host,port); } public void pub(String channel,String message){ jedis.publish(channel, message); } public void close(String channel){ jedis.publish(channel, "quit"); jedis.del(channel);// } }
SubClient.java:消息订阅端
public class SubClient { private Jedis jedis;// public SubClient(String host,int port){ jedis = new Jedis(host,port); } public void sub(JedisPubSub listener,String channel){ jedis.subscribe(listener, channel); //此处将会阻塞,在client代码级别为JedisPubSub在处理消息时,将会“独占”链接 //并且采取了while循环的方式,侦听订阅的消息 // } }
PubSubTestMain.java:测试引导类
public class PubSubTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ PubClient pubClient = new PubClient(Constants.host, Constants.port); final String channel = "pubsub-channel"; pubClient.pub(channel, "before1"); pubClient.pub(channel, "before2"); Thread.sleep(2000); //消息订阅着非常特殊,需要独占链接,因此我们需要为它创建新的链接; //此外,jedis客户端的实现也保证了“链接独占”的特性,sub方法将一直阻塞, //直到调用listener.unsubscribe方法 Thread subThread = new Thread(new Runnable() { @Override public void run() { try{ SubClient subClient = new SubClient(Constants.host, Constants.port); System.out.println("----------subscribe operation begin-------"); JedisPubSub listener = new PrintListener(); //在API级别,此处为轮询操作,直到unsubscribe调用,才会返回 subClient.sub(listener, channel); System.out.println("----------subscribe operation end-------"); }catch(Exception e){ e.printStackTrace(); } } }); subThread.start(); int i=0; while(i < 10){ String message = RandomStringUtils.random(6, true, true);//apache-commons pubClient.pub(channel, message); i++; Thread.sleep(1000); } //被动关闭指示,如果通道中,消息发布者确定通道需要关闭,那么就发送一个“quit” //那么在listener.onMessage()中接收到“quit”时,其他订阅client将执行“unsubscribe”操作。 pubClient.close(channel); //此外,你还可以这样取消订阅 //listener.unsubscribe(channel); } }
--------------------------------------------------------------持久化订阅-------------------------------------------------------
基本思路:当订阅者订阅消息时,将此订阅者信息添加到一个列表中,此列表为“所有订阅者列表”,同时为每个订阅者都创建一个保存消息(内容或者消息ID)的队列,消息发布者将每条消息都添加到每个订阅者的队列中。
如下实现仅供参考,有很多更优的实现方式。
PPrintListener.java
public class PPrintListener extends JedisPubSub{ private String clientId; private PSubHandler handler; public PPrintListener(String clientId,Jedis jedis){ this.clientId = clientId; handler = new PSubHandler(jedis); } @Override public void onMessage(String channel, String message) { //此处我们可以取消订阅 if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } handler.handle(channel, message);//触发当前订阅者从自己的消息队列中移除消息 } private void message(String channel,String message){ String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"); System.out.println("message receive:" + message + ",channel:" + channel + "..." + time); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("message receive:" + message + ",pattern channel:" + channel); } @Override public void onSubscribe(String channel, int subscribedChannels) { handler.subscribe(channel); System.out.println("subscribe:" + channel + ";total channels : " + subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { handler.unsubscribe(channel); System.out.println("unsubscribe:" + channel + ";total channels : " + subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("unsubscribe pattern:" + pattern + ";total channels : " + subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("subscribe pattern:" + pattern + ";total channels : " + subscribedChannels); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); for(String channel : channels){ handler.unsubscribe(channel); } } class PSubHandler { private Jedis jedis; PSubHandler(Jedis jedis){ this.jedis = jedis; } public void handle(String channel,String message){ int index = message.indexOf("/"); if(index < 0){ return; } Long txid = Long.valueOf(message.substring(0,index)); String key = clientId + "/" + channel; while(true){ String lm = jedis.lindex(key, 0);//获取第一个消息 if(lm == null){ break; } int li = lm.indexOf("/"); //如果消息不合法,删除并处理 if(li < 0){ String result = jedis.lpop(key);//删除当前message //为空 if(result == null){ break; } message(channel, lm); continue; } Long lxid = Long.valueOf(lm.substring(0,li));//获取消息的txid //直接消费txid之前的残留消息 if(txid >= lxid){ jedis.lpop(key);//删除当前message message(channel, lm); continue; }else{ break; } } } public void subscribe(String channel){ String key = clientId + "/" + channel; boolean exist = jedis.sismember(Constants.SUBSCRIBE_CENTER,key); if(!exist){ jedis.sadd(Constants.SUBSCRIBE_CENTER, key); } } public void unsubscribe(String channel){ String key = clientId + "/" + channel; jedis.srem(Constants.SUBSCRIBE_CENTER, key);//从“活跃订阅者”集合中删除 jedis.del(key);//删除“订阅者消息队列” } } }
PPubClient.java
public class PPubClient { private Jedis jedis;// public PPubClient(String host,int port){ jedis = new Jedis(host,port); } /** * 发布的每条消息,都需要在“订阅者消息队列”中持久 * @param message */ private void put(String message){ //期望这个集合不要太大 Set<String> subClients = jedis.smembers(Constants.SUBSCRIBE_CENTER); for(String clientKey : subClients){ jedis.rpush(clientKey, message); } } public void pub(String channel,String message){ //每个消息,都有具有一个全局唯一的id //txid为了防止订阅端在数据处理时“乱序”,这就要求订阅者需要解析message Long txid = jedis.incr(Constants.MESSAGE_TXID); String content = txid + "/" + message; //非事务 this.put(content); jedis.publish(channel, content);//为每个消息设定id,最终消息格式1000/messageContent } public void close(String channel){ jedis.publish(channel, "quit"); jedis.del(channel);//删除 } public void test(){ jedis.set("pub-block", "15"); String tmp = jedis.get("pub-block"); System.out.println("TEST:" + tmp); } }
PPSubClient.java
public class PSubClient { private Jedis jedis;// private JedisPubSub listener;//单listener public PSubClient(String host,int port,String clientId){ jedis = new Jedis(host,port); listener = new PPrintListener(clientId, new Jedis(host, port)); } public void sub(String channel){ jedis.subscribe(listener, channel); } public void unsubscribe(String channel){ listener.unsubscribe(channel); } }
PPubSubTestMain.java
public class PPubSubTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ PPubClient pubClient = new PPubClient(Constants.host, Constants.port); final String channel = "pubsub-channel-p"; final PSubClient subClient = new PSubClient(Constants.host, Constants.port,"subClient-1"); Thread subThread = new Thread(new Runnable() { @Override public void run() { System.out.println("----------subscribe operation begin-------"); //在API级别,此处为轮询操作,直到unsubscribe调用,才会返回 subClient.sub(channel); System.out.println("----------subscribe operation end-------"); } }); subThread.setDaemon(true); subThread.start(); int i = 0; while(i < 2){ String message = RandomStringUtils.random(6, true, true);//apache-commons pubClient.pub(channel, message); i++; Thread.sleep(1000); } subClient.unsubscribe(channel); } }
相关推荐
完整的代码,使用vs2015创建。通过一个64位StackExchange.Redis.dll(开源,1.2.6版),创建客户端,管理和使用Redis的PUB/SUB功能,适合集成进项目使用。 (需正常配置Redis客户端后才可使用)
redis-spring-pub_sub
dtalk就是为了实现上述的目标而开发的一个Redis发布订阅(pub/sub)系统实现的前端设备控制框架,在dtalk框架上,Redis服务器用于提供中转服务。前端设备通过订阅特定的频道接收管理发送的请求消息,执行对应的功能。...
跨平台的开源Redis DB管理工具 源文件地址:https://redisdesktop.com/
mv redis-3.0.7.tar.gz /usr/local/redis/ cd /usr/local/redis tar -xvf redis-3.0.7.tar.gz 5.安装c语言环境 yum install gcc-c++ 6.编译redis cd /usr/local/redis/redis-3.0.7 make 7.安装redis make ...
thunk-redis, 基于 thunk/promise的redis客户端,支持所有redis特性 thunk基于 thunk/promise的redis客户端,支持所有redis特性。 插件实现:thunk RateLimiter 最快的抽象速率限制器。定时队列分布式计时作业队列,...
redis实操代码 发布/订阅、Lua、PipeLine等
该资源合集内容包括:redis-x64-2.8,与之匹配的扩展文件php_redis.dll和php_igbinary.dll文件,使用php5.6 -ntx(亲测有效,注意文件适用都是nts的)
Redis命令参考手册完整版 又不懂得可以@我 一起学习开发交流redis redis的强大之处 redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。 其他的mq和kafka保证可靠但有一些延迟(非实时...
1. 含redis安装包(linux/win) 2. php扩展支持5.3~5.6各版本
使用Redis Pub / Sub + socket.io基于Node.js进行聊天基于简单的应用程序,以显示Redis Pub / Sub机制以及Node.js和socket.io。如何使应用程序运行必须已安装并正在运行。 必须安装 npm安装npm运行redis或npm运行套...
mkdir -p /var/redis/7001 mkdir -p /var/redis/7002 拷贝配置文件: cp /usr/local/redis-3.2.8/redis.conf /etc/redis/7001.conf cp /usr/local/redis-3.2.8/redis.conf /etc/redis/7001.conf 修改/etc/redis/7001...
php_redis-2.1.3-5.2-vc6-ts-4350b2a php_redis-5.3-vc9-ts-73d99c3e phpredis_5.4_vc9_nts phpredis_5.4_vc9_ts
避免踩坑,免费持续更新
该文档介绍docker部署redis/mongod/rabbitmq/nacos/mysql等服务详细步骤
/usr/local/redis/bin/redis-server /usr/local/redis/redis.conf 连接: 用redis-cli cd /usr/local/redis/bin/ ./redis-cli #进入 exit /quit #退出 关闭redis pkill redis-server #关闭 ./redis-cli shutdown ...
Windows 32位、64位的Redis 的NOSQL 数据库程序
64位版本:redis-2.8.9.zip(稳定版本) 32位版本:redisbin.zip linux:redis-2.8.11.tar.gz 以及Redis入门教程ppt一份
使用Redis的pubsub技术构建实时消息系统