java中延迟任务的处理⽅式
1、利⽤延迟队列
tnc延时队列,第⼀他是个队列,所以具有对列功能第⼆就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……
应⽤场景⽐较多,⽐如延时1分钟发短信,延时1分钟再次执⾏等,下⾯先看看延时队列demo之后再看延时队列在项⽬中的使⽤:
简单的延时队列要有三部分:第⼀实现了Delayed接⼝的消息体、第⼆消费消息的消费者、第三存放消息的延时队列,那下⾯就来看看延时队列demo。
⼀、消息体
package com.delqueue;
import urrent.Delayed;
import urrent.TimeUnit;
/
**
* 消息体定义实现Delayed接⼝就是实现两个⽅法即compareTo 和 getDelay最重要的就是getDelay⽅法,这个⽅法⽤来判断是否到期…… */
public class Message implements Delayed {
private int id;
private String body; // 消息内容
private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
public int getId() {
function是什么意思return id;
}
public String getBody() {
return body;
}
public long getExcuteTime() {
return excuteTime;
}
public Message(int id, String body, long delayTime) {
this.id = id;
this.body = body;
}
// ⾃定义实现⽐较⽅法返回 1 0 -1三个参数
@Override
peeasian com
public int compareTo(Delayed delayed) {
Message msg = (Message) delayed;
return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
}
// 延迟任务是否到时就是按照这个⽅法判断如果返回的是负数则说明到期否则还没到期
@Override
public long getDelay(TimeUnit unit) {
uteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
⼆、消息消费者
package com.delqueue;
import urrent.DelayQueue;
public class Consumer implements Runnable {复式记帐
// 延时队列 ,消费者从其中获取消息进⾏消费
private DelayQueue<Message> queue;
public Consumer(DelayQueue<Message> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Message take = queue.take();
System.out.println("消费消息id:" + Id() + " 消息体:" + Body());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
rmit三、延时队列
package com.delqueue;
import urrent.DelayQueue;
import urrent.ExecutorService;
import urrent.Executors;
public class DelayQueueTest {
public static void main(String[] args) {
// 创建延时队列
DelayQueue<Message> queue = new DelayQueue<Message>();
// 添加延时消息,m1 延时3s
Message m1 = new Message(1, "world", 3000);
// 添加延时消息,m2 延时10s
Message m2 = new Message(2, "hello", 10000);
//将延时消息放到延时队列中
queue.offer(m2);
queue.offer(m1);
// 启动消费线程消费添加到延时队列中的消息,前提是任务到了延期时间
ExecutorService exec = wFixedThreadPool(1);
exec.shutdown();
}
}
将消息体放⼊延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则⽆法取出消息也就⽆法消费。
这就是延迟队列demo,下⾯我们来说说在真实环境下的使⽤。
使⽤场景描述:
dramatic在打车软件中对订单进⾏派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运⽓没那么好,订单进来后第⼀次没有筛选到合适的司机,但我们也不能就此结束派单,⽽是将该订单的信息放到延时队列中过个2秒钟在进⾏⼀次,其实这个2秒钟就是⼀个延迟,所以这⾥我们就可以使⽤延时队列来实现……
下⾯看看简单的流程图:
下⾯来看看具体代码实现:
在项⽬中有如下⼏个类:第⼀、任务类第⼆、按照任务类组装的消息体类第三、延迟队列管理类
任务类即执⾏筛选司机、绑单、push消息的任务类
* 具体执⾏相关业务的业务类logon
* @author whd
* @date 2017年9⽉25⽇上午12:49:32
*/
public class DelayOrderWorker implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
//相关业务逻辑处理
System.out.println(Thread.currentThread().getName()+" do something ……");
}
}
消息体类,在延时队列中这个实现了Delayed接⼝的消息类是⽐不可少的,实现接⼝时有⼀个getDela
y(TimeUnit unit)⽅法,这个⽅法就是判断是否到期的这⾥定义的是⼀个泛型类,所以可以将我们上⾯的任务类作为其中的task,这样就将任务类分装成了⼀个消息体
import urrent.Delayed;
import urrent.TimeUnit;
/**
* 延时队列中的消息体将任务封装为消息体
*
* @author whd
* @date 2017年9⽉25⽇上午12:48:30
* @param <T>
*/
public class DelayOrderTask<T extends Runnable> implements Delayed {
private final long time;
private final T task; // 任务类,也就是之前定义的任务类
/**
* @param timeout
* 超时时间(秒)
* @param task
nospot* 任务
*/
public DelayOrderTask(long timeout, T task) {
this.time = System.nanoTime() + timeout;
this.task = task;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
DelayOrderTask other = (DelayOrderTask) o;
long diff = time - other.time;
if (diff > 0) {
return 1;
} el if (diff < 0) {
return -1;
} el {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
vert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int hashCode() {
return task.hashCode();
}
public T getTask() {
return task;
}
}
延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执⾏任务st.delayqueue;
import java.util.Map;
import urrent.DelayQueue;
import urrent.ExecutorService;
import urrent.Executors;
import urrent.TimeUnit;
import urrent.atomic.AtomicLong;
/**
* 延时队列管理类,⽤来添加任务、执⾏任务
*
* @author whd
* @author whd
* @date 2017年9⽉25⽇上午12:44:59
*/
public class DelayOrderQueueManager {
private final static int DEFAULT_THREAD_NUM = 5;
private static int thread_num = DEFAULT_THREAD_NUM;
/
/ 固定⼤⼩线程池
private ExecutorService executor;
// 守护线程
private Thread daemonThread;
// 延时队列
private DelayQueue<DelayOrderTask<?>> delayQueue;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n = 1;
private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); private DelayOrderQueueManager() {
executor = wFixedThreadPool(thread_num);
delayQueue = new DelayQueue<>();
init();
}
public static DelayOrderQueueManager getInstance() {
return instance;
}
/**
* 初始化
*/
public void init() {
daemonThread = new Thread(() -> {
execute();
});
daemonThread.tName("DelayQueueMonitor");
daemonThread.start();
}
private void execute() {
while (true) {
Map<Thread, StackTraceElement[]> map = AllStackTraces();
System.out.println("当前存活线程数量:" + map.size());
int taskNum = delayQueue.size();
System.out.println("当前延时任务数量:" + taskNum);
try {
/
/ 从延时队列中获取任务
DelayOrderTask<?> delayOrderTask = delayQueue.take();
if (delayOrderTask != null) {
Runnable task = Task();
if (null == task) {
continue;
}
// 提交到线程池执⾏task
}
} catch (Exception e) {crv是什么意思
e.printStackTrace();
}
}
}
wave是什么意思
/**
* 添加任务
*
* @param task
* @param time
* 延时时间
* @param unit
* 时间单位
*/
public void put(Runnable task, long time, TimeUnit unit) {
// 获取延时时间
long timeout = vert(time, unit);
// 将任务封装成实现Delayed接⼝的消息体