DelayMessage
延迟(延时)任务处理方案(JDK队列、Redis Zset与RabbitMQ TTL+DLX实现),对比各方案优缺点
Install / Use
/learn @smallFive55/DelayMessageREADME
delayMessage
延迟任务处理方案
方案一:数据库轮询
小型项目常用方式,通过一个线程去扫描数据库或数据库定时任务,通过订单时间判断超时的订单,进行更新状态或其他操作。

方案二:JDK延迟队列
DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中获取元素,放入DelayQueue中的对象需要实现Delayed接口。

实现
com.five.delay.plan.DelayQueuePlan
public class DelayQueuePlan {
public static void main(String[] args) {
DelayQueue<MyDelayed> delayQueue = new DelayQueue<MyDelayed>();
//生产者生产一个5秒的延时任务
new Thread(new ProducerDelay(delayQueue, 5)).start();
//开启消费者轮询
new Thread(new ConsumerDelay(delayQueue)).start();
}
/**
* 延时任务生产者
**/
public static class ProducerDelay implements Runnable{
DelayQueue<MyDelayed> delayQueue;
int delaySecond;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ProducerDelay(DelayQueue<MyDelayed> delayQueue, int delaySecond){
this.delayQueue = delayQueue;
this.delaySecond = delaySecond;
}
public void run() {
String orderId = "1010101";
for (int i = 0; i < 10; i++) {
//定义一个Delay, 放入到DelayQueue队列中
MyDelayed delay = new MyDelayed(this.delaySecond, orderId+i);
delayQueue.add(delay);//向队列中插入一个元素(延时任务)
System.out.println(sdf.format(new Date())+ " Thread "+Thread.currentThread()+" 添加了一个delay. orderId:"+orderId+i);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 延时任务消费者
**/
public static class ConsumerDelay implements Runnable{
DelayQueue<MyDelayed> delayQueue;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ConsumerDelay(DelayQueue<MyDelayed> delayQueue){
this.delayQueue = delayQueue;
}
public void run() {
//轮询获取DelayQueue队列中当前超时的Delay元素
while(true){
MyDelayed delayed=null;
try {
delayed = delayQueue.take();
} catch (Exception e) {
e.printStackTrace();
}
//如果Delay元素存在,则任务到达超时时间
if(delayed!=null){
//处理任务
System.out.println(sdf.format(new Date())+" Thread "+Thread.currentThread()+" 消费了一个delay. orderId:"+delayed.getOrderId());
}else{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("....");
}
}
}
}
}
结果
2018-05-11 16:20:48 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101010
2018-05-11 16:20:48 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101011
2018-05-11 16:20:48 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101012
2018-05-11 16:20:48 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101013
2018-05-11 16:20:48 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101014
2018-05-11 16:20:49 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101015
2018-05-11 16:20:49 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101016
2018-05-11 16:20:49 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101017
2018-05-11 16:20:49 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101018
2018-05-11 16:20:49 Thread Thread[Thread-4,5,main] 添加了一个delay. orderId:10101019
2018-05-11 16:20:53 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101010
2018-05-11 16:20:53 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101011
2018-05-11 16:20:53 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101012
2018-05-11 16:20:53 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101013
2018-05-11 16:20:53 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101014
2018-05-11 16:20:54 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101015
2018-05-11 16:20:54 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101016
2018-05-11 16:20:54 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101017
2018-05-11 16:20:54 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101018
2018-05-11 16:20:54 Thread Thread[Thread-5,5,main] 消费了一个delay. orderId:10101019
方案三:Redis 有序集合
将订单超时时间戳与订单号分别设置为score与member,系统扫描第一个元素判断是否超时。

实现
com.five.delay.utils.RedisUtils
//扫描redis 判断订单是否超时需要处理
public void dofind(String key){
//拿到redis客户端
Jedis jedis = jedisPool.getResource();
while(true){
Set<Tuple> zrangeWithScores = jedis.zrangeWithScores(key, 0, 0);
//判断元素是否超时 根据超时时间戳
if(zrangeWithScores !=null && !zrangeWithScores.isEmpty()){
//score === 订单的超时时间戳 与当前时间戳对比 判断是否超时
double score = ((Tuple)(zrangeWithScores.toArray()[0])).getScore();//订单的超时时间戳
long currentTimeMillis = System.currentTimeMillis();
if(currentTimeMillis>=score){
//订单超时
String element = ((Tuple)(zrangeWithScores.toArray()[0])).getElement();//订单ID
//删除元素
Long zrem = jedis.zrem(key, element); //关键点:redis单线程机制解决并发场景安全问题。
if(zrem!=null && zrem>0){
//处理超时订单
System.out.println(sdf.format(new Date())+"["+Thread.currentThread()+"] 从redis中拿到一个超时任务[key:"+key+", score:"+score+", member:"+element+"]");
}else{
// System.out.println(sdf.format(new Date())+"["+Thread.currentThread()+"] 任务被其他服务消费了");
}
}else{
// System.out.println(sdf.format(new Date())+"["+Thread.currentThread()+"] 当前没有超时的订单");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
// System.out.println("当前redis中没有可以操作的数据");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果
生产者
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152621E12, member:OIDNO100000]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152646E12, member:OIDNO100001]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152648E12, member:OIDNO100002]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152649E12, member:OIDNO100003]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.52602615265E12, member:OIDNO100004]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152651E12, member:OIDNO100005]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152652E12, member:OIDNO100006]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152652E12, member:OIDNO100007]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152653E12, member:OIDNO100008]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152654E12, member:OIDNO100009]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152656E12, member:OIDNO100010]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152658E12, member:OIDNO100011]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152659E12, member:OIDNO100012]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.52602615266E12, member:OIDNO100013]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152661E12, member:OIDNO100014]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152661E12, member:OIDNO100015]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152662E12, member:OIDNO100016]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152663E12, member:OIDNO100017]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152664E12, member:OIDNO100018]
2018-05-11 16:08:12 向redis中添加了一个任务[key:ORDER_KEY, score:1.526026152664E12, member:OIDNO100019]
消费者
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152648E12, member:OIDNO100002]
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152621E12, member:OIDNO100000]
2018-05-11 16:09:12[Thread[Thread-4,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152646E12, member:OIDNO100001]
2018-05-11 16:09:12[Thread[Thread-12,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152649E12, member:OIDNO100003]
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.52602615265E12, member:OIDNO100004]
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-4,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152651E12, member:OIDNO100005]
2018-05-11 16:09:12[Thread[Thread-12,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152652E12, member:OIDNO100006]
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-4,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152652E12, member:OIDNO100007]
2018-05-11 16:09:12[Thread[Thread-12,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-4,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152653E12, member:OIDNO100008]
2018-05-11 16:09:12[Thread[Thread-12,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152654E12, member:OIDNO100009]
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-4,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152656E12, member:OIDNO100010]
2018-05-11 16:09:12[Thread[Thread-12,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-10,5,main]] 任务被其他服务消费了
2018-05-11 16:09:12[Thread[Thread-7,5,main]] 从redis中拿到一个超时任务[key:ORDER_KEY, score:1.526026152658E12, member:OIDNO100011]
2018-05-11 16:09:12[Thread[Thread-6,5
