`
QING____
  • 浏览: 2232167 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Spring-data-redis: 分布式队列

 
阅读更多

    Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础.

    Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。

    我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。不过本实例中,并没有提供关于队列消费之后的消息确认机制,如果你感兴趣可以自己尝试实现它。

    1) Redis中的"队列"为双端队列,基于list数据结构实现,并提供了"队列阻塞"功能.

    2) 如果你期望使用redis做"分布式队列"server,且数据存取较为密集时,务必配置(redis.conf)中关于list数据结构的限制:

//当list中数据个数达到阀值是,将会被重构为linkedlist
//如果队列的存/取速度较为接近,此值可以稍大
list-max-ziplist-entries 5120
list-max-ziplist-value 1024

    3) Redis已经提供了"队列"的持久化能力,无需额外的技术支持

    4) Redis并没有提供JMS语义中"queue"消息的消费确认的功能,即当队列中的消息被redis-client接收之后,并不会执行"确认消息已到达"的操作;如果你的分布式队列,需要严格的消息确认,需要额外的技术支持.

    5) Redis并不能像JMS那样提供高度中心化的"队列"服务集群,它更适合"快速/小巧/及时消费"的情景.

    6) 本例中,对于消息的接收,是在一个后台线程中进行(参见下文RedisQueue),其实我们可以使用线程池的方式来做,以提高性能. 不过此方案,需要基于2个前提:

        A) 如果单个queue中的消息较多,且每条消息的处理时间较长(即消费速度比接收的速度慢)

        B) 如果此线程池可以被多个queue公用线程资源 ,如果一个queue就创建一个线程池,实在是有些浪费且存在不安全问题.

        C) 需要确认,多线程环境中对queue的操作,有可能在客户端层面打乱了队列的顺序,而造成异常.比如线程1从queue中获得data1,线程2从queue中获得data2,有可能因为线程调度的问题,导致data2被优先执行.

 

一.配置文件:

<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
	<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
		<property name="maxActive" value="32"></property>
		<property name="maxIdle" value="6"></property>
		<property name="maxWait" value="15000"></property>
		<property name="minEvictableIdleTimeMillis" value="300000"></property>
		<property name="numTestsPerEvictionRun" value="3"></property>
		<property name="timeBetweenEvictionRunsMillis" value="60000"></property>
		<property name="whenExhaustedAction" value="1"></property>
	</bean>
	<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
		<property name="poolConfig" ref="jedisPoolConfig"></property>
		<property name="hostName" value="127.0.0.1"></property>
		<property name="port" value="6379"></property>
		<property name="password" value="0123456"></property>
		<property name="timeout" value="15000"></property>
		<property name="usePool" value="true"></property>
	</bean>
	<bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
		<property name="connectionFactory" ref="jedisConnectionFactory"></property>
		<property name="defaultSerializer">
			<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
		</property>
	</bean>
	<bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/>
	<bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy">
		<property name="redisTemplate" ref="jedisTemplate"></property>
		<property name="key" value="user:queue"></property>
		<property name="listener" ref="jedisQueueListener"></property>
	</bean>
</beans>

二.程序实例:

1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。

public interface RedisQueueListener<T> {

	public void onMessage(T value);
}
public class QueueListener<String> implements RedisQueueListener<String> {

	@Override
	public void onMessage(String value) {
		System.out.println(value);
		
	}

}

2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。

public class RedisQueue<T> implements InitializingBean,DisposableBean{
	private RedisTemplate redisTemplate;
	private String key;
	private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据
	private byte[] rawKey;
	private RedisConnectionFactory factory;
	private RedisConnection connection;//for blocking
	private BoundListOperations<String, T> listOperations;//noblocking
	
	private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑
	
	private RedisQueueListener listener;//异步回调
	private Thread listenerThread;
	
	private boolean isClosed;
	
	public void setRedisTemplate(RedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	public void setListener(RedisQueueListener listener) {
		this.listener = listener;
	}

	public void setKey(String key) {
		this.key = key;
	}
	

	@Override
	public void afterPropertiesSet() throws Exception {
		factory = redisTemplate.getConnectionFactory();
		connection = RedisConnectionUtils.getConnection(factory);
		rawKey = redisTemplate.getKeySerializer().serialize(key);
		listOperations = redisTemplate.boundListOps(key);
		if(listener != null){
			listenerThread = new ListenerThread();
			listenerThread.setDaemon(true);
			listenerThread.start();
		}
	}
	
	
	/**
	 * blocking
	 * remove and get last item from queue:BRPOP
	 * @return
	 */
	public T takeFromTail(int timeout) throws InterruptedException{ 
		lock.lockInterruptibly();
		try{
			List<byte[]> results = connection.bRPop(timeout, rawKey);
			if(CollectionUtils.isEmpty(results)){
				return null;
			}
			return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
		}finally{
			lock.unlock();
		}
	}
	
	public T takeFromTail() throws InterruptedException{
		return takeFromTail(0);
	}
	
	/**
	 * 从队列的头,插入
	 */
	public void pushFromHead(T value){
		listOperations.leftPush(value);
	}
	
	public void pushFromTail(T value){
		listOperations.rightPush(value);
	}
	
	/**
	 * noblocking
	 * @return null if no item in queue
	 */
	public T removeFromHead(){
		return listOperations.leftPop();
	}
	
	public T removeFromTail(){
		return listOperations.rightPop();
	}
	
	/**
	 * blocking
	 * remove and get first item from queue:BLPOP
	 * @return
	 */
	public T takeFromHead(int timeout) throws InterruptedException{
		lock.lockInterruptibly();
		try{
			List<byte[]> results = connection.bLPop(timeout, rawKey);
			if(CollectionUtils.isEmpty(results)){
				return null;
			}
			return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));
		}finally{
			lock.unlock();
		}
	}
	
	public T takeFromHead() throws InterruptedException{
		return takeFromHead(0);
	}

	@Override
	public void destroy() throws Exception {
		if(isClosed){
			return;
		}
		shutdown();
		RedisConnectionUtils.releaseConnection(connection, factory);
	}
	
	private void shutdown(){
		try{
			listenerThread.interrupt();
		}catch(Exception e){
			//
		}
	}
	
	class ListenerThread extends Thread {
		
		@Override
		public void run(){
			try{
				while(true){
					T value = takeFromHead();//cast exception? you should check.
					//逐个执行
					if(value != null){
						try{
							listener.onMessage(value);
						}catch(Exception e){
							//
						}
					}
				}
			}catch(InterruptedException e){
				//
			}
		}
	}
	
}

    3) 使用与测试:

public static void main(String[] args) throws Exception{
	ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");
	RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue");
	redisQueue.pushFromHead("test:app");
	Thread.sleep(15000);
	redisQueue.pushFromHead("test:app");
	Thread.sleep(15000);
	redisQueue.destroy();
}

    在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。

 

 

 

 

 

2
0
分享到:
评论
2 楼 QING____ 2015-03-11  
myprincejava 写道
楼主,你这个jedis和spring-data-redis都是什么版本?

比较旧的版本,貌似是m1,不过最新release的版本中,可能部分接口已经改变。此文仅供参考。
1 楼 myprincejava 2015-03-11  
楼主,你这个jedis和spring-data-redis都是什么版本?

相关推荐

    202319-分布式面试必会(2023最新版)思维导图.zip

    - Spring Data - Spring Security - Spring Cloud 4. Web开发: - HTML、CSS、JavaScript - HTTP协议 - Servlet、JSP - AJAX、JSON、XML 5. 框架和工具: - MyBatis - Hibernate - Maven、Gradle - ...

    单点登录源码

    Redis | 分布式缓存数据库 | [https://redis.io/](https://redis.io/) Solr & Elasticsearch | 分布式全文搜索引擎 | [http://lucene.apache.org/solr/](http://lucene.apache.org/solr/) [https://www.elastic.co/]...

    Java Springboot学习资料.rar

    整合SpringDataJpa 整合Mybatis 通用Mapper与分页插件的集成 整合Lettuce Redis 使用Spring Cache集成Redis 集成Swagger在线调试 初探RabbitMQ消息队列 RabbitMQ延迟队列 actuator 服务监控与管理 actuator与spring-...

    基于SpringBoot框架搭建的物联网数据采集系统服务器端(源码)

    * 传感器提交Data数据时使用添加缓存,不直接操作数据库,而是将Data添加到Redis中形成缓存队列,提高并发效率 * 将用户登录信息不直接存入session,而是存入Redis缓存,以实现分布式session共享 * 3.提交Data数据...

    jeesuite-libs-其他

    事务内操作强制读主库基于注解自动缓存管理(所有查询方法结果自动缓存、自动更新,事务回滚缓存同步回滚机制)自动缓存实现基于jeesuite-cache和spring-data-redis分页组件敏感操作拦截scheduler模块支持分布式保证...

    Java及大数据学习路线.pdf

    Nginx反向代理、负载均衡、动静分离 JVM内存模型、参数调优 JUC线程⾼级 分布式架构注册中⼼Zookeeper 虚拟化应⽤容器Docker 全⽂检索引擎ElasticSearch 消息队列RabbitMQ 前端VUE/ES6 2⼤数据学习路线 2.1JavaSE ...

    java开源包1

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包10

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包11

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包2

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包3

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包6

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包5

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包4

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包8

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包7

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包9

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包101

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    Java资源包01

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    JAVA上百实例源码以及开源项目

    第一步:运行ServerData.java 启动服务器,然后服务器处于等待状态 第二步:运行LoginData.java 启动(客户端)登陆界面 输入用户名 ip为本机localhost 第三步:在登陆后的界面文本框输入文本,然后发送 可以同时启动...

Global site tag (gtag.js) - Google Analytics