热爱技术,追求卓越
不断求索,精益求精

使用redis和redisson延迟队列(Delayed Queue)实现定时功能及其注意事项

在前面的一篇文章《java中关于”事件驱动型延时处理“业务需求的几种程序设计方案》中讲到了使用redis的功能实现延迟事件处理的定时任务功能,本文主要讲下使用redisson的延迟队列实现定时功能。redisson相关文档及章节参考:

https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88#715-%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97delayed-queue

官方文档中只是简单介绍了延迟队列的提交,但未涉及到使用的问题,今天我们就补充一下。

现在的使用场景是,电商系统会做促销,有时候每天会有多个促销场次,比如早上9点-11点一场,下午14点-16点一场,晚上20点-23点一场,每个场次的打折力度是不一样的,所以在不同的促销时间段内用户看到的价格也应该不一样,如果价格的计算过程比较复杂,那么产品的起价可能会缓存,所以缓存的更新就会依赖促销的场次变化。

促销规则都是预设的(活动都是有计划的),所以我们现在的解决思路是每天凌晨0点查询出当天需要进行的促销场次,然后把促销ID加入到延迟队列中,促销场次开始时触发起价计算逻辑,重新更起价缓存。

业务相关的就不提供了,此处给出添加队列和使用队列的相关代码吧。先来看添加队列:

/**
     * 尝试添加到监控队列,满足条件是预订开始时间是当天
     * 
     * @param promotionId
     *            促销ID
     * @param bookingStartTime
     *            促销预订开始时间
     * @return
     */
    public boolean tryAddToQueue(Integer promotionId, Date bookingStartTime) {
        Date today = DateUtil.getCurrenctDate();
        Date tommorow = DateUtil.addDay(today, 1);
        if (!DateUtil.between(bookingStartTime, today, tommorow)) {
            return false;
        }
        // 过了当前时间的,不必再加入队列
        Date now = new Date();
        long delay = bookingStartTime.getTime() - now.getTime();
        if (delay <= 0) {
            return false;
        }
        RQueue<Integer> queue = redisson
                .getQueue(PromotionConstants.KeyFormat.KEY_START_PRICE_PROMOTIONID_MONITOR_QUEUE);
        // 使用延迟队列
        RDelayedQueue<Integer> delayedQueue = redisson.getDelayedQueue(queue);
        delayedQueue.offer(promotionId, delay, TimeUnit.MILLISECONDS);
        // 使用后释放掉
        delayedQueue.destroy();
        return true;
    }

提交到队列先使用redisson.getQueue获取到一个队列,再使用redisson.getDelayedQueue获取到一个延迟队列,最后使用delayedQueue.offer提交。再来看看使用队列的地方:

@PostConstruct
public void init() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    // 每秒检测一次
    executor.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            RQueue<Integer> queue = redisson
                    .getQueue(PromotionConstants.KeyFormat.KEY_START_PRICE_PROMOTIONID_MONITOR_QUEUE);
            Integer promotionId = queue.poll();
            // 有促销ID,则进行起价更新
            if (promotionId != null) {
                updatePromotionProductStartPrice(promotionId);
            }
        }
    }, 0, 1, TimeUnit.SECONDS);
}

使用redisson.getQueue获取到队列,使用queue.poll()从队列中获取到元素,如果获取成功,则直接更新促销产品起价,否则,一秒后继续检测。

使用redisson的延迟队列时,千万要注意的地方是放入队列是使用的RDelayedQueue,获取队列是使用RQueue而不是RDelayedQueue。有兴趣可参考redisson源码,尤其是下面这个方法内部的实现逻辑:

org.redisson.RedissonDelayedQueue.RedissonDelayedQueue(QueueTransferService, Codec, CommandAsyncExecutor, String)
赞(12)
未经允许不得转载:LoveCTO » 使用redis和redisson延迟队列(Delayed Queue)实现定时功能及其注意事项

热爱技术 追求卓越 精益求精