遇到的问题
在使用redis的过程中,尤其是在做大数据“实时计算”的过程中,也许会经常遇到下列场景:比如网站每个页面的实时pv运算,使用storm(或者spark streaming)从kafka中消费实时点击流数据进行统计计算;并计算好的结果放入redis进行存储,如果redis中已经存在 需要先取出来与最新统计数据相加再放入redis;另外为了减少与redis的交互次数,降低redis的存取压力,一般不会消费一条数据后就立即放入redis,而是处理一批数据后再存入redis,大致流程如下:
网站实时pv计算中可能比这个图的处理要更复杂些,但大致流程也就这样了。这里我们重点看下redis的相关处理,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 //省略storm计算过程,这里假设已经计算好,并放入一个MAP中 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); for(String key:pagesPv.keySet()){ redis.incrBy(key,pagesPv.get(key));//incrBy是redis中的自增操作 } }
首先说下redis中的incrBy方法,可以实现数字类型的自增操作。也可以用redis的get方法先获取到老数据,然后加上新增的值,再调用redis的set方法写入redis。但这就跟redis有两次操作,这里更推荐是用incrBy。
咋一看上述代码其实没啥问题,这里Map里只有4条数据,也就是说上述的for循环需要与redis服务器之间做4次远程调用。但一个稍微知名一点的网站,每分钟可能会数万级别的访问量,对应到这个Map里可能会有数百个页面,也就是说这个真实的情况下,如果使用上述代码的话,这个for循环中与redis服务之间会有数百次远程调用操作,对redis服务来说这无疑是一笔较大的开销。讲到这里该是pipeline登场的时候了。
Pipeline是什么
Pipeline其实是redis的java客户端对redis的基本事务的调用封装。这里所谓的“基本事务”与传统的关系型数据库事务回滚不同,而是按照顺序执行一批命令,在redis中是使用MULTI和EXEC命令实现的,而Pipeline只是对这两个命令的java客户端封装而已。
在redis的客户端中执行事务分三步:首先执行MULTI命令;然后输入一批我们想要执行的其他操作(比如一批set操作),这批操作会被写到一个队列里;最后再执行EXEC命令,客户端会把这上述队列“一次性”的发生到服务端,并等待服务端返回。
再来看服务端:服务端接收到这个命令操作队列后,按照顺序一个一个执行,每个命令操作都会对应一个返回值,待队列中所有的命令都执行完成后“一次性”把这些返回值拼装成list返回给客户端。并且这个返回值list的数量和顺序与命令队列一致。整个pipeline的执行过程结束,在有redis需要命令批量执行的情况下,使用pipeline可以大大减少redis客户端与服务端交互次数,从而提升多命令的执行性能。
回到文章开头的场景,假设Map里有500个值,for循环中就是执行500次incrBy操作,对应的与服务端的交互就需要500次。如果使用pipeline,只需要交互两次即可完成,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 for(String key:pagesPv.keySet()){ //注意这里调用Pipeline的incrBy方法,此时命令并没有执行 pipeline.incrBy(key,pagesPv.get(key)); } //执行sync方法,此时批量向服务端提交命令,并等待返回 pipeline.sync(); }
整个过程大致分为三步:
1、通过redis.pipelined()方法,获取pipeline实例,与上述讲解中的MULTI命令对应。
2、以前调用的是redis. incrBy()方法,现在改为调用pipeline的incrBy方法,注意此时不会向服务端发起调用请求,只是把命令写入队列。
3、执行pipeline.sync()方法,与上述流程中的EXEC命令对应,此时会把第2步中的命令队列一次性的提交给服务端,并等待服务端返回。
Pipeline是非线程安全的
Pipeline能提升性能,而且使用起来也非常方便,但使用的时候一定要注意一点“Pipeline是非线程安全的”。也就是说多个线程如果公用一个Pipeline实例,会出现线程安全问题,典型的就是数据返回结果错乱。正确的用法是在每次需要用到Pipeline的地方,都新建一个实例即:
Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例
为什么在多线程下是线程不安全的,其实很好理解,看下图:
A、B两个线程共用一个pipeline实例,同时向redis服务端提交5个命令,各自都期望收到5个返回值。但真实的结果是有一个收到10个结果,有一个会失败,这其实不是我们期望的。
在使用Spring框架的java程序中,redis的客户端对象是线程安全的,可以单例注入spring容器,在需要redis客户端的地方直接使用@Resource直接获取即可。
@Resource private Redis redis; public void method(){ redis.xxx; }
但pipeline是非线程安全的,每次都必须新建实例,如果你的代码中出现了下列代码,请注意会有线程安全问题,请及时修正:
//错误的写法,pipeline被单例注入了spring容器,全局复用 @Resource private Pipeline pipeline; public void method(){ pipeline.xxx; }
Spring中Pipeline的正确用法:
@Resource private Redis redis; public void method(){ Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 pipeline.xxx;//多个操作 pipeline. sync(); }
最后提一点,Pipeline是一次提交一个队列给服务端,这个队列如果太大会占用更多内存,以及增加网络传输时间。所以 Pipeline里一次提交的命令数也不要太多,根据实际数据量大小 一般几百条还是可以的。
好了,关于redis中的pipeline就总结到这里。
出处
http://moon-walker.iteye.com/blog/2397962
相关推荐
Redis-PipeLine批量导入.docx
主要给大家介绍了关于Redis利用Pipeline加速查询速度的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。 案例背景 应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元; 然而在某些服务中,数据操作对redis 是强...
kettle如何3秒内写入100万条数据到Redis https://blog.csdn.net/huryer/article/details/106889792
redis pipleLine
主要介绍了在Redis集群中使用pipeline批量插入的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
本篇文章主要介绍了Java使用Pipeline对Redis批量读写(hmset&hgetall),具有一定的参考价值,有兴趣的可以了解一下。
RedisPipe - 具有隐式流水线操作的Go开发的高吞吐量Redis客户端
java客户端不是很好支持redis cluster,spring-date-redis和jedis批量提交还不支持,单个提交都是可以的。 为了批量解决批量提交 网上有几个方案,本示例使用了其中一种,demo里的JedisClusterPipeline类是网上找的...
今天小编就为大家分享一篇python使用pipeline批量读写redis的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
Redis中的管道(PipeLine)特性:简述一下就是,Redis如何从客户端一次发送多个命令,服务端到客户端如何一次性响应多个命令。 Redis使用的是客户端-服务器模型和请求/响应协议的TCP服务器,这就意味着一个请求要有...
同意-Redis-Wrappers 这些是Appgree使用的类,用于封装和改进Jedis客户端,从而增加了流水线功能,主从控制和本地内存缓存,从而减少了不同线程重复读取的需求。 这些类可以单独使用或嵌套使用。执照该软件受Apache ...
一、 redis 环境搭建 2 二、 redis学习笔记之...五、 redis学习笔记之pipeline 20 六、 redis学习笔记之发布订阅 23 七、 redis学习笔记之持久化 28 八、 redis学习笔记之主从复制 30 九、 redis学习笔记之虚拟内存 31
ISP库使用redis格式转换shell脚本 将csv的ISP库文件转化为redis pipeline的命令
【中间件篇-Redis缓存数据库】Redis高级特性和应用(慢查询、Pipeline、事务、Lua)
redis实操代码 发布/订阅、Lua、PipeLine等