如何在Java中实现延迟队列DelayQueue
延迟队列是一种特殊的队列结构,它要求队列中的元素只有在到达指定的延迟时间之后,才能被消费者从队列中取出。在Java的并发包中,已经提供了成熟的延迟队列实现,也就是java.util.concurrent.DelayQueue,我们可以直接使用它来满足延迟消费的需求,也可以基于它的原理自定义实现延迟队列的逻辑。
DelayQueue的核心特性
DelayQueue是一个无界阻塞队列,队列中的元素必须实现java.util.concurrent.Delayed接口,该接口要求实现两个核心方法:
long getDelay(TimeUnit unit):返回当前元素剩余的延迟时间,当返回值小于等于0时,说明元素已经到达可消费的时间。int compareTo(Delayed o):用于队列内部排序,通常按照剩余延迟时间升序排列,保证最早到期的元素排在队列头部。
DelayQueue的内部是基于优先队列PriorityQueue实现的,所以元素的排序逻辑非常重要,同时它也是线程安全的,支持多线程下的生产者和消费者操作。
使用原生DelayQueue实现延迟任务
下面我们通过一个完整的示例,演示如何使用原生的DelayQueue实现延迟消息的消费。首先我们需要定义一个实现Delayed接口的任务类:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延迟任务类,实现Delayed接口
*/
public class DelayTask implements Delayed {
// 任务内容
private String taskContent;
// 任务执行的时间戳(毫秒)
private long executeTime;
public DelayTask(String taskContent, long delaySeconds) {
this.taskContent = taskContent;
// 计算任务到期时间:当前时间 + 延迟秒数
this.executeTime = System.currentTimeMillis() + delaySeconds * 1000;
}
/**
* 返回剩余延迟时间,小于等于0表示任务可以执行
*/
@Override
public long getDelay(TimeUnit unit) {
long remaining = executeTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
/**
* 按照剩余延迟时间升序排序,剩余时间越短优先级越高
*/
@Override
public int compareTo(Delayed o) {
DelayTask other = (DelayTask) o;
return Long.compare(this.executeTime, other.executeTime);
}
public String getTaskContent() {
return taskContent;
}
}接下来我们编写生产者和消费者的逻辑,模拟延迟队列的使用场景:
import java.util.concurrent.DelayQueue;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 创建延迟队列
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
// 生产者线程:向队列中添加不同延迟时间的任务
Thread producer = new Thread(() -> {
delayQueue.put(new DelayTask("延迟3秒执行的任务", 3));
delayQueue.put(new DelayTask("延迟1秒执行的任务", 1));
delayQueue.put(new DelayTask("延迟5秒执行的任务", 5));
System.out.println("所有延迟任务已添加到队列中");
});
// 消费者线程:从队列中取出到期的任务执行
Thread consumer = new Thread(() -> {
while (true) {
try {
// take()方法会阻塞,直到有到期的元素可以取出
DelayTask task = delayQueue.take();
System.out.println("执行任务:" + task.getTaskContent() + ",当前时间:" + System.currentTimeMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
// 启动生产者和消费者
producer.start();
consumer.start();
// 等待生产者执行完成
producer.join();
// 这里为了演示让消费者运行一段时间,实际场景中可以根据业务需求控制消费者生命周期
Thread.sleep(10000);
consumer.interrupt();
}
}运行上面的代码,你会看到任务并不是按照添加顺序执行的,而是按照延迟时间从小到大依次执行:首先执行延迟1秒的任务,然后是延迟3秒的任务,最后是延迟5秒的任务,完全符合延迟队列的预期效果。
自定义简单延迟队列实现
如果我们需要更灵活地控制延迟队列的逻辑,也可以基于优先队列和锁机制自己实现一个简单的延迟队列,核心思路是:内部维护一个优先队列存储任务,使用ReentrantLock保证线程安全,通过Condition实现阻塞等待,当最早到期的任务时间到达时唤醒消费者。
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 自定义简单延迟队列实现
*/
public class CustomDelayQueue<T extends Delayed> {
// 优先队列存储任务,按照到期时间排序
private final PriorityQueue<T> queue = new PriorityQueue<>((a, b) -> Long.compare(a.getDelay(), b.getDelay()));
// 可重入锁保证线程安全
private final ReentrantLock lock = new ReentrantLock();
// 条件变量,用于阻塞消费者线程
private final Condition available = lock.newCondition();
// 标记队列是否关闭
private volatile boolean isClosed = false;
/**
* 添加延迟任务到队列
*/
public void put(T task) {
lock.lock();
try {
if (isClosed) {
throw new IllegalStateException("队列已关闭,无法添加任务");
}
queue.offer(task);
// 如果添加的是最早到期的任务,唤醒等待的消费者
if (queue.peek() == task) {
available.signal();
}
} finally {
lock.unlock();
}
}
/**
* 取出到期的任务,没有到期任务则阻塞等待
*/
public T take() throws InterruptedException {
lock.lock();
try {
while (true) {
if (isClosed && queue.isEmpty()) {
return null;
}
T task = queue.peek();
if (task == null) {
// 队列为空,等待新任务加入
available.await();
} else {
long delay = task.getDelay();
if (delay <= 0) {
// 任务已到期,取出并返回
return queue.poll();
} else {
// 任务未到期,等待对应时间
available.awaitNanos(delay * 1000_000);
}
}
}
} finally {
lock.unlock();
}
}
/**
* 关闭队列
*/
public void close() {
lock.lock();
try {
isClosed = true;
available.signalAll();
} finally {
lock.unlock();
}
}
}
/**
* 自定义延迟任务接口,对应原生Delayed接口
*/
interface Delayed {
/**
* 返回剩余延迟时间(毫秒)
*/
long getDelay();
}
/**
* 自定义延迟任务实现类
*/
class CustomDelayTask implements Delayed {
private String content;
private long executeTime;
public CustomDelayTask(String content, long delaySeconds) {
this.content = content;
this.executeTime = System.currentTimeMillis() + delaySeconds * 1000;
}
@Override
public long getDelay() {
return executeTime - System.currentTimeMillis();
}
public String getContent() {
return content;
}
}上面的自定义实现虽然功能不如原生DelayQueue完善,但是核心逻辑是一致的:通过优先队列排序任务,通过锁和条件变量控制线程的阻塞和唤醒,保证只有到期的任务才能被取出。
延迟队列的常见使用场景
延迟队列在实际开发中有很多实用的场景,比如:
- 订单超时自动取消:用户下单后未支付,超过30分钟自动取消订单,就可以把订单信息作为延迟任务放入队列,延迟30分钟消费。
- 定时消息推送:需要在指定时间给用户发送通知、消息,可以把推送任务放入延迟队列,到时间后自动触发推送。
- 缓存过期清理:缓存中的数据设置了过期时间,到时间后自动清理,避免无效数据占用内存。
- 重试机制:接口调用失败后,延迟一段时间重试,把重试任务放入延迟队列,到时间后再次执行调用。
需要注意的是,原生的DelayQueue是内存中的队列,如果服务重启会导致队列中的任务丢失,如果需要持久化延迟任务,通常需要结合数据库、Redis等外部存储来实现,保证任务的可靠性。
Java延迟队列DelayQueue使用Delayed接口延迟任务处理并发编程修改时间:2026-05-24 14:17:28