如何在Java中实现延迟队列DelayQueue

来源:IPIPP.com作者:头衔:全栈工程师
导读:本期聚焦于小伙伴创作的《如何在Java中实现延迟队列DelayQueue》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何在Java中实现延迟队列DelayQueue》有用,将其分享出去将是对创作者最好的鼓励。

如何在Java中实现延迟队列DelayQueue

延迟队列是一种特殊的队列结构,它要求队列中的元素只有在到达指定的延迟时间之后,才能被消费者从队列中取出。在Java的并发包中,已经提供了成熟的延迟队列实现,也就是java.util.concurrent.DelayQueue,我们可以直接使用它来满足延迟消费的需求,也可以基于它的原理自定义实现延迟队列的逻辑。

DelayQueue的核心特性

DelayQueue是一个无界阻塞队列,队列中的元素必须实现java.util.concurrent.Delayed接口,该接口要求实现两个核心方法:

  • long getDelay(TimeUnit unit):返回当前元素剩余的延迟时间,当返回值小于等于0时,说明元素已经到达可消费的时间。
  • int compareTo(Delayed o):用于队列内部排序,通常按照剩余延迟时间升序排列,保证最早到期的元素排在队列头部。

DelayQueue的内部是基于优先队列PriorityQueue实现的,所以元素的排序逻辑非常重要,同时它也是线程安全的,支持多线程下的生产者和消费者操作。

使用原生DelayQueue实现延迟任务

下面我们通过一个完整的示例,演示如何使用原生的DelayQueue实现延迟消息的消费。首先我们需要定义一个实现Delayed接口的任务类:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 延迟任务类,实现Delayed接口
 */
public class DelayTask implements Delayed {
    // 任务内容
    private String taskContent;
    // 任务执行的时间戳(毫秒)
    private long executeTime;

    public DelayTask(String taskContent, long delaySeconds) {
        this.taskContent = taskContent;
        // 计算任务到期时间:当前时间 + 延迟秒数
        this.executeTime = System.currentTimeMillis() + delaySeconds * 1000;
    }

    /**
     * 返回剩余延迟时间,小于等于0表示任务可以执行
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = executeTime - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    /**
     * 按照剩余延迟时间升序排序,剩余时间越短优先级越高
     */
    @Override
    public int compareTo(Delayed o) {
        DelayTask other = (DelayTask) o;
        return Long.compare(this.executeTime, other.executeTime);
    }

    public String getTaskContent() {
        return taskContent;
    }
}

接下来我们编写生产者和消费者的逻辑,模拟延迟队列的使用场景:

import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 创建延迟队列
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();

        // 生产者线程:向队列中添加不同延迟时间的任务
        Thread producer = new Thread(() -> {
            delayQueue.put(new DelayTask("延迟3秒执行的任务", 3));
            delayQueue.put(new DelayTask("延迟1秒执行的任务", 1));
            delayQueue.put(new DelayTask("延迟5秒执行的任务", 5));
            System.out.println("所有延迟任务已添加到队列中");
        });

        // 消费者线程:从队列中取出到期的任务执行
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    // take()方法会阻塞,直到有到期的元素可以取出
                    DelayTask task = delayQueue.take();
                    System.out.println("执行任务:" + task.getTaskContent() + ",当前时间:" + System.currentTimeMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        // 启动生产者和消费者
        producer.start();
        consumer.start();

        // 等待生产者执行完成
        producer.join();
        // 这里为了演示让消费者运行一段时间,实际场景中可以根据业务需求控制消费者生命周期
        Thread.sleep(10000);
        consumer.interrupt();
    }
}

运行上面的代码,你会看到任务并不是按照添加顺序执行的,而是按照延迟时间从小到大依次执行:首先执行延迟1秒的任务,然后是延迟3秒的任务,最后是延迟5秒的任务,完全符合延迟队列的预期效果。

自定义简单延迟队列实现

如果我们需要更灵活地控制延迟队列的逻辑,也可以基于优先队列和锁机制自己实现一个简单的延迟队列,核心思路是:内部维护一个优先队列存储任务,使用ReentrantLock保证线程安全,通过Condition实现阻塞等待,当最早到期的任务时间到达时唤醒消费者。

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 自定义简单延迟队列实现
 */
public class CustomDelayQueue<T extends Delayed> {
    // 优先队列存储任务,按照到期时间排序
    private final PriorityQueue<T> queue = new PriorityQueue<>((a, b) -> Long.compare(a.getDelay(), b.getDelay()));
    // 可重入锁保证线程安全
    private final ReentrantLock lock = new ReentrantLock();
    // 条件变量,用于阻塞消费者线程
    private final Condition available = lock.newCondition();
    // 标记队列是否关闭
    private volatile boolean isClosed = false;

    /**
     * 添加延迟任务到队列
     */
    public void put(T task) {
        lock.lock();
        try {
            if (isClosed) {
                throw new IllegalStateException("队列已关闭,无法添加任务");
            }
            queue.offer(task);
            // 如果添加的是最早到期的任务,唤醒等待的消费者
            if (queue.peek() == task) {
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 取出到期的任务,没有到期任务则阻塞等待
     */
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                if (isClosed && queue.isEmpty()) {
                    return null;
                }
                T task = queue.peek();
                if (task == null) {
                    // 队列为空,等待新任务加入
                    available.await();
                } else {
                    long delay = task.getDelay();
                    if (delay <= 0) {
                        // 任务已到期,取出并返回
                        return queue.poll();
                    } else {
                        // 任务未到期,等待对应时间
                        available.awaitNanos(delay * 1000_000);
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 关闭队列
     */
    public void close() {
        lock.lock();
        try {
            isClosed = true;
            available.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 自定义延迟任务接口,对应原生Delayed接口
 */
interface Delayed {
    /**
     * 返回剩余延迟时间(毫秒)
     */
    long getDelay();
}

/**
 * 自定义延迟任务实现类
 */
class CustomDelayTask implements Delayed {
    private String content;
    private long executeTime;

    public CustomDelayTask(String content, long delaySeconds) {
        this.content = content;
        this.executeTime = System.currentTimeMillis() + delaySeconds * 1000;
    }

    @Override
    public long getDelay() {
        return executeTime - System.currentTimeMillis();
    }

    public String getContent() {
        return content;
    }
}

上面的自定义实现虽然功能不如原生DelayQueue完善,但是核心逻辑是一致的:通过优先队列排序任务,通过锁和条件变量控制线程的阻塞和唤醒,保证只有到期的任务才能被取出。

延迟队列的常见使用场景

延迟队列在实际开发中有很多实用的场景,比如:

  • 订单超时自动取消:用户下单后未支付,超过30分钟自动取消订单,就可以把订单信息作为延迟任务放入队列,延迟30分钟消费。
  • 定时消息推送:需要在指定时间给用户发送通知、消息,可以把推送任务放入延迟队列,到时间后自动触发推送。
  • 缓存过期清理:缓存中的数据设置了过期时间,到时间后自动清理,避免无效数据占用内存。
  • 重试机制:接口调用失败后,延迟一段时间重试,把重试任务放入延迟队列,到时间后再次执行调用。

需要注意的是,原生的DelayQueue是内存中的队列,如果服务重启会导致队列中的任务丢失,如果需要持久化延迟任务,通常需要结合数据库、Redis等外部存储来实现,保证任务的可靠性。

Java延迟队列DelayQueue使用Delayed接口延迟任务处理并发编程修改时间:2026-05-24 14:17:28

免责声明:已尽一切努力确保本网站所含信息的准确性。网站部分内容来源于网络或由用户自行发表,内容观点不代表本站立场。本站是个人网站免费分享,内容仅供个人学习、研究或参考使用,如内容中引用了第三方作品,其版权归原作者所有。若内容触犯了您的权益,请联系我们进行处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。前端、网络、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握网站开发与运维所需的核心技术栈。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端逻辑,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。