热爱技术,追求卓越
不断求索,精益求精

java中关于”事件驱动型延时处理“业务需求的几种程序设计方案

最近在做一个利用微信公众号引流的活动,其中有一个需求点是当系统收到用户回复的任意关键词两分钟后,系统会给用户发送一张带有二维码的图片,用户长按识别二维码会跳转到活动的落地页。这是一个典型的“事件驱动型延时处理”需求,触发条件时收到用户消息,延时2分钟处理。对于这么一个常见的业务场景,对于java开发者来说,你会怎样设计你的功能呢?

java功底薄弱的程序员的致命错误

在谈具体方案之前,先来一个事故小插曲,我们先看看一位经验不是很丰富的java程序员设计这个功能的部分代码:

<!-- spring线程池配置 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="threadNamePrefix" value="TASK"/>
    <property name="queueCapacity" value="255" />
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="100" />
    <property name="keepAliveSeconds" value="3600" />
    <property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>

//注入spring线程池任务执行器
@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor executor;

/**
 * 异步延迟发送图片
 */
public void asyncDelaySendImage(final WxGZHReceiveMsg receiveMsg) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            // 延时两分钟执行
            try {
                Thread.sleep(2 * 60 * 1000);
            } catch (InterruptedException e) {
                LOG.error("发送图片延时执行失败");
            }
            //根据收到的消息内容发送图片
            sendImage(receiveMsg);
        }
    });
}

这个代码上线后,运营人员开启了一波活动,立即有几千个粉丝回复了消息,这个时候我的系统大量的报类似下面的错误:

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d5b9673[Running, pool size = 100, active threads = 100, queued tasks = 255, completed tasks = 402]] did not accept task

从这个报错我们看出,当并发量上来后,executor.submit提交失败了,原因是线程池队列已经满了。我们在分析上面的代码,这个代码有两个致命的错误:

  1. 接受到一个用户的回复executor.submit一次,如果并发量超过线程池队列大小,后面的提交都会失败。
  2. submit任务后,run方法里居然有个sleep,这直接让线程池里的线程休眠了,线程不能空闲出来被其他任务使用,干等2分钟。

这样的代码会很快耗尽系统资源,大大降低系统的吞吐率,对于java程序员来说,犯这样的错误其实是很低级的,也是致命的,接下来我们来看有什么方案可以解决。

DelayQueue实现事件驱动延时处理

如果上面的这位程序员有了解过jdk的源码,有了解过DelayQueue的原理,我想他不会犯那么低级的错误。java.util.concurrent.DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

我们现在使用DelayQueue简单实现以下这个功能。我们针对上面出现的WxGZHReceiveMsg类,实现Delayed接口,并实现getDelay和compareTo方法,具体看注释:

package cn.lovecto.test.delay;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 微信公众收到的消息类
 */
public class WxGZHReceiveMsg implements Delayed{
    /**用户ID*/
    private Integer userId;
    /**用户回复的关键字*/
    private String keyWord;
    /**执行时间,单位ms*/
    public Long executeTime;

    public WxGZHReceiveMsg(Integer userId, String keyWord, Long executeTime) {
        super();
        this.userId = userId;
        this.keyWord = keyWord;
        this.executeTime = executeTime;
    }

    /**
     * compareTo方法的作用即是判断队列中元素的顺序谁前谁后。当前元素比队列元素后执行时,返回一个正数,比它先执行时返回一个负数,否则返回0.
     */
    @Override
    public int compareTo(Delayed o) {
        if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        }else if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
            return -1;
        }
        return 0;
    }

    /**
     * getDelay的作用是计算当前时间到执行时间之间还有多少毫秒
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getKeyWord() {
        return keyWord;
    }

    public void setKeyWord(String keyWord) {
        this.keyWord = keyWord;
    }

    public Long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(Long executeTime) {
        this.executeTime = executeTime;
    }
}

接下来我们模拟测试一下,新建一个ReplayWxGZHReceiveMsgExecutor任务执行类,使用java.util.concurrent.ScheduledExecutorService开启一个线程每秒扫描一次队列中是否有需要回复的消息,只要队列中有需要回复的消息则进行回复:

package cn.lovecto.test.delay;

import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReplayWxGZHReceiveMsgExecutor {
    /** 接收到的微信公众号消息队列 */
    private static DelayQueue<WxGZHReceiveMsg> queue = new DelayQueue<>();

    /** 类初始化后就每秒检测队列进行任务处理 */
    static {
        ScheduledExecutorService executorService = Executors
                .newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                while (!queue.isEmpty()) {
                    try {
                        WxGZHReceiveMsg msg = queue.take();
                        System.out.println(String.format(
                                "当前时间:%d,计划执行时间%d,准备给用户%d回复的\"%s\"回复图片",
                                System.currentTimeMillis(),
                                msg.getExecuteTime(), msg.getUserId(),
                                msg.getKeyWord()));
                        // TODO 发送逻辑,此处略
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * 添加到队列
     * 
     * @param msg
     */
    public static void submit(WxGZHReceiveMsg msg) {
        ReplayWxGZHReceiveMsgExecutor.queue.add(msg);
    }

    public static void main(String[] args) throws InterruptedException {
        long delay = 5000;// 延迟5秒执行
        for (int i = 1; i <= 10; i++) {
            WxGZHReceiveMsg msg = new WxGZHReceiveMsg(i, "关键词" + i,
                    System.currentTimeMillis() + delay);
            ReplayWxGZHReceiveMsgExecutor.submit(msg);
            //这里sleep是为了模拟活动的后半段,用户参与度有所下降
            if(i > 5){
                Thread.sleep(2000);
            }
        }
    }

}

运行main方法测试一下,对比下计划执行时间和真正的执行时间,相差在1毫秒左右,基本能满足功能性需求。测试结果如下:

当前时间:1534595607657,计划执行时间1534595607656,准备给用户1回复的"关键词1"回复图片
当前时间:1534595607673,计划执行时间1534595607656,准备给用户2回复的"关键词2"回复图片
当前时间:1534595607674,计划执行时间1534595607656,准备给用户3回复的"关键词3"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户4回复的"关键词4"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户5回复的"关键词5"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户6回复的"关键词6"回复图片
当前时间:1534595609658,计划执行时间1534595609657,准备给用户7回复的"关键词7"回复图片
当前时间:1534595611658,计划执行时间1534595611657,准备给用户8回复的"关键词8"回复图片
当前时间:1534595613658,计划执行时间1534595613657,准备给用户9回复的"关键词9"回复图片
当前时间:1534595615658,计划执行时间1534595615657,准备给用户10回复的"关键词10"回复图片

使用DelayQueue能够“满足事件驱动延时处理”类需求,但要考虑单jvm内存大小的问题,如果并发量太高,消息队列将会较庞大,遍历一次耗时也会较长。此时可以考虑增加机器部署多个实例,实现集群,每个jvm还可以使用多个队列和线程池对任务进行处理从而提高回复效率,提升整个系统的性能。

使用mq实现事件驱动延时处理

使用DelayQueue的优点是JDK自身实现,使用方便,量小特别适用,但是整个队列处于jvm内存中,内容不能持久化,如果没有负载均衡机制,就不能支持分布式运行。如果你要考虑消息的持久化,那么mq是一个不错的选择,比如Rocketmq的延时队列,有兴趣可以参考官方实例:

http://rocketmq.apache.org/docs/schedule-example/

除了rocketmq,Rabbitmq延时队列(TTL+DLX实现)也是一个不错的选择。

使用redis缓存的zset实现事件驱动延时处理

redis缓存的zset是一个有序集合,根据score值进行排序。对于本文的例子,我们可以把score值设置为上面WxGZHReceiveMsg中的executeTime,java代码中使用定时任务扫描有序集合中在一定时间范围内(score值)的元素进行回复操作(使用ZRANGEBYSCORE命令),对已经处理的消息进行rem操作(ZREMRANGEBYSCORE )。如果并发量高,可以使用多个有序集合,采用线程池对集合进行扫描。如果应用是集群部署,要考虑好队列竞争的问题,可以使用分布式锁(此处可以对已经扫描的最小的时间值minScore加锁,扫描一次迭代一次,每次扫描都是当前时间到上一次的minScore值之间的一个范围)。基于redis的分布式锁可参考《springboot使用redisson作为分布式锁的一种实现方式》。

另外redis常用命令可参考:

http://doc.redisfans.com/index.html

总结

总结一下,java中关于”事件驱动型延时处理“业务需程序设计方案,本文列举了三种,第一种设计基于jdk的DelayQueue(java开发人员必须要会);第二种是基于mq的持久化消息队列,如使用Rocketmq、Rabbitmq等,适用于分布式系统;第三种是基于缓存的持久化队列,适用于分布式,但要考虑好集群中多个实例对同一个队列读取时的多次消费问题。总之提高系统吞吐率和性能无非就是多线程、集群、分布式、异步消息队列、缓存,根据实际使用情况选型即可。

赞(2)
未经允许不得转载:LoveCTO » java中关于”事件驱动型延时处理“业务需求的几种程序设计方案

热爱技术 追求卓越 精益求精