`
hellohank
  • 浏览: 143881 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

记录一下延时队列常见几种方案实现

阅读更多

在实际业务场景中,有许多要用到延时消息或消费的功能,最常见的是:下订单后,半小时或指定时间段内如果没有付款,就取消订单。如果使用定时任务轮询的话,不太合适,一来定时任务有一个时间间隔,同时也会导致单线程消息的速度跟不上。

对于这类,常用的解决方案如下:

  • 定时任务轮询

优点:简单方便,实现快速,如果使用得当,可支持分支式集群环境

缺点:轮询的时间间隔及排除处理的方式,会导致触发不及时,而且在量非常大的场景,表现越差

实现代码:略

  • 阻塞队列:DelayQueue

优点:JDK自带,稳定可靠

缺点:考虑复杂场景时,实现较为复杂,另外,如果消息量较多时,内存成为瓶颈,分布式环境实现也会比较复杂。

核心代码:略

  • 时间轮:HashedWheelTimer

优点:Netty自带功能,稳定可靠

缺点:同上DelayQueue。

核心代码:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask {
        boolean flag;

        public MyTimerTask(boolean flag) {
            this.flag = flag;
        }

        public void run(Timeout timeout) throws Exception {
            System.out.println("要去数据库删除订单了。。。。");
            this.flag = false;
        }
    }

    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timer.newTimeout(timerTask, 12, TimeUnit.SECONDS);
        int i = 1;
        while (timerTask.flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(i + "秒过去了");
            i++;
        }
        timer.stop();
    }
}

 

  • MQ延时消费:RocketMQ/RabbitMQ等

优点:使用MQ自带的延时功能,方便可靠。

缺点:要搭建并维护MQ体系。

核心代码:不同MQ的具体代码不一样,略。

  • Redis队列消费

PS:以下redis的相关代码基于redisson,如果使用jedis或lettuce,逻辑和原理相同,只是调用的组件接口不一样。

自实现redis延时队列

    优点:基于redis,稳定性的性能可以达到一个比较好的平稳

    缺点:要考虑特殊情况(如redis消息丢失等),并自己实现相应的消息收发逻辑。

    核心代码:

    

import org.redisson.Redisson;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 使用redis实现的延时队列
 */
public class RedisDelayQueue {
    public static final String EXPIRED_QUEUE_KEY = "queue:delay:list";

    public static void main(String[] args) {
        DelayTaskProducer producer = new DelayTaskProducer();
        long now = System.currentTimeMillis();
        producer.produce("消息1:dataId=123", now + TimeUnit.SECONDS.toMillis(5));
        producer.produce("消息2:dataId=45", now + TimeUnit.SECONDS.toMillis(15));
        producer.produce("消息3:dataId=234", now + TimeUnit.SECONDS.toMillis(75));
        producer.produce("消息4:dataId=534", now + TimeUnit.SECONDS.toMillis(55));

        //创建多个消息实例
//        for(int i=0;i<10;i++){
        new DelayTaskConsumer().start();
//        }
    }

    public static class DelayTaskProducer {
        /***
         * 将要延时处理的消息放到redis中
         * @param msg 具体消息内容
         * @param expiredAt 指定在哪个时间点到期消费
         */
        public void produce(String msg, long expiredAt) {
            RedissonClient redissonClient = createClient(1);
            RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY);
            set.add(expiredAt, msg);
        }
    }

    public static class DelayTaskConsumer {
        private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        public void start() {
            scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(), 1, 200, TimeUnit.MILLISECONDS);
        }
    }

    public static class DelayTaskHandler implements Runnable {
        @Override
        public void run() {
            RedissonClient redissonClient = createClient(1);
            RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY);
            Double firstScore = set.firstScore();
            if (firstScore == null || firstScore > System.currentTimeMillis()) {
                return;//如果队列为空,则时间还没到,则不执行
            }
            String firstMsg = set.takeFirst();
            System.out.println("开始消费消息:" + firstMsg);
        }
    }

    protected static RedissonClient createClient(int db) {
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useSingleServer()
                .setAddress("redis://172.18.12.34:6379")
                .setPassword("beta1234")
                .setConnectionPoolSize(500)
                .setIdleConnectionTimeout(10000)
                .setTimeout(3000)
                .setConnectTimeout(30000)
                .setRetryAttempts(3)
                .setRetryInterval(1000)
                .setDnsMonitoringInterval(-1)
                .setPingConnectionInterval(10000)
                .setDatabase(db);
        return Redisson.create(config);
    }
}

 
基于redis的key过期广播

    优点:触发及时可靠,逻辑简单

    缺点:要防止数据丢失后重新加载的情况;开启服务端相应广播事件队列;对redis存在性能消耗等。

    核心代码:使用redis的key过期事件监听,要开启redis服务,具体开启方式见:https://www.iteye.com/blog/hellohank-2524409,消息收发的示例代码如下:

    

import com.alibaba.fastjson.JSON;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.StringCodec;

import java.util.concurrent.TimeUnit;

/**
 * 基于redis的key过期事件广播队列监听。所以,需要开启redis服务的这个功能。具体看:https://www.iteye.com/blog/hellohank-2524409
 */
public class RedisDelayQueueWithExpiredListener extends RedisDelayQueue{
    public static void main(String[] args) {
        int db = 2;
        RedissonClient redissonClient = createClient(db);
        RTopic topic = redissonClient.getTopic("__keyevent@" + db + "__:expired", new StringCodec());
        topic.addListener(String.class, new MessageListener() {
            @Override
            public void onMessage(CharSequence channel, Object msg) {
                System.out.println("onMessage:" + channel + "; Thread: " + Thread.currentThread().toString());
                System.out.println(msg);
            }
        });
        String key = "test_expire_listen";
        Object value = "val";
        RBucket bucket = redissonClient.getBucket(key);
        bucket.set(JSON.toJSONString(value), 5, TimeUnit.SECONDS);
        bucket.expire(6,TimeUnit.SECONDS);
    }
}

 

    

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics