在高并发业务场景中,我们常常需要同时发起多个请求处理任务,比如批量查询数据、并行调用多个接口等。当部分请求已经返回满足业务需求的结果时,剩余还在运行的线程就属于多余资源,如果继续运行不仅会浪费CPU和内存,还可能因为后续无用的操作引发数据不一致等问题。基于动态终止信号的分批并发请求策略,就是解决这类问题的有效方案。

核心设计思路
该策略的核心由三个部分组成:动态终止信号、分批请求调度、线程安全终止机制。动态终止信号是一个可以被所有工作线程共享访问的标识,当业务条件满足时,外部可以修改这个标识的状态,通知所有线程停止后续工作。分批请求调度则是将总的请求任务拆分成多个批次,每个批次分配固定数量的线程执行,避免一次性创建过多线程导致系统负载过高。线程安全终止机制保证线程在检查到终止信号后,能够安全地释放自身占用的资源并退出。
动态终止信号实现
动态终止信号需要保证多线程环境下的可见性和原子性,在Java中可以使用AtomicBoolean来实现,它的底层基于CAS操作,能够避免加锁带来的性能损耗,同时保证修改后的值对所有线程立即可见。
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 动态终止信号管理器
*/
public class TerminateSignalManager {
// 终止信号,默认未触发
private final AtomicBoolean terminateSignal = new AtomicBoolean(false);
/**
* 触发终止信号
*/
public void triggerTerminate() {
terminateSignal.set(true);
}
/**
* 检查是否触发终止
* @return 是否触发终止
*/
public boolean isTerminated() {
return terminateSignal.get();
}
/**
* 重置终止信号,用于复用场景
*/
public void reset() {
terminateSignal.set(false);
}
}
分批请求调度逻辑
分批调度需要先计算总批次数量,然后依次启动每个批次的线程,每个批次的线程执行完成后,再检查终止信号状态,如果已经触发终止,则不再启动后续批次的线程。同时每个批次内的线程执行时,也需要定期检查终止信号,避免已经不需要的任务继续执行。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 分批并发请求调度器
*/
public class BatchRequestScheduler {
// 线程池,核心线程数为每批次最大线程数
private final ExecutorService threadPool;
// 每批次最大线程数
private final int batchSize;
// 动态终止信号管理器
private final TerminateSignalManager signalManager;
public BatchRequestScheduler(int batchSize, TerminateSignalManager signalManager) {
this.batchSize = batchSize;
this.signalManager = signalManager;
this.threadPool = Executors.newFixedThreadPool(batchSize);
}
/**
* 执行分批请求
* @param totalTasks 总任务数
* @param taskHandler 任务处理逻辑
*/
public void executeBatchRequests(int totalTasks, RequestTaskHandler taskHandler) {
// 计算总批次数
int totalBatches = (totalTasks + batchSize - 1) / batchSize;
for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
// 检查终止信号,如果已触发则不再启动后续批次
if (signalManager.isTerminated()) {
break;
}
// 当前批次的任务数量
int currentBatchTaskCount = Math.min(batchSize, totalTasks - batchIndex * batchSize);
List<Runnable> batchTasks = new ArrayList<>();
CountDownLatch batchLatch = new CountDownLatch(currentBatchTaskCount);
for (int i = 0; i < currentBatchTaskCount; i++) {
int taskId = batchIndex * batchSize + i;
batchTasks.add(() -> {
try {
// 执行任务前再次检查终止信号
if (!signalManager.isTerminated()) {
taskHandler.handleTask(taskId);
}
} finally {
batchLatch.countDown();
}
});
}
// 提交当前批次的所有任务
for (Runnable task : batchTasks) {
threadPool.submit(task);
}
try {
// 等待当前批次所有任务执行完成
batchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
signalManager.triggerTerminate();
}
}
threadPool.shutdown();
}
/**
* 任务处理接口
*/
public interface RequestTaskHandler {
/**
* 处理单个任务
* @param taskId 任务ID
*/
void handleTask(int taskId);
}
}
完整使用示例
下面通过一个批量查询用户数据的场景来演示整个策略的使用流程,当查询到足够数量的用户数据时,就触发终止信号,停止后续的多余线程。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class MainDemo {
// 需要查询的用户ID总数
private static final int TOTAL_USER_IDS = 100;
// 每批次执行的线程数
private static final int BATCH_SIZE = 10;
// 需要查询到的目标用户数量
private static final int TARGET_USER_COUNT = 30;
// 终止信号管理器
private static final TerminateSignalManager SIGNAL_MANAGER = new TerminateSignalManager();
// 已查询到的用户列表
private static final List<String> RESULT_USERS = new ArrayList<>();
// 已查询到的用户计数
private static final AtomicInteger QUERY_COUNT = new AtomicInteger(0);
public static void main(String[] args) {
BatchRequestScheduler scheduler = new BatchRequestScheduler(BATCH_SIZE, SIGNAL_MANAGER);
// 执行分批请求
scheduler.executeBatchRequests(TOTAL_USER_IDS, taskId -> {
// 模拟查询用户数据的耗时操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 模拟查询到用户的情况
int currentCount = QUERY_COUNT.incrementAndGet();
RESULT_USERS.add("用户_" + taskId);
System.out.println("查询到用户_" + taskId + ",当前已查询数量:" + currentCount);
// 当达到目标数量时触发终止信号
if (currentCount >= TARGET_USER_COUNT) {
SIGNAL_MANAGER.triggerTerminate();
System.out.println("已达到目标用户数量,触发终止信号");
}
});
System.out.println("最终查询到的用户数量:" + RESULT_USERS.size());
}
}
注意事项
- 动态终止信号的状态修改需要保证线程安全,避免使用普通的布尔变量,防止出现可见性问题导致线程无法及时感知到终止信号。
- 线程在执行任务的过程中,需要在合适的节点检查终止信号,比如任务开始前、耗时操作前,避免已经无用的任务继续执行消耗资源。
- 如果任务中存在不可中断的阻塞操作,需要在设计时就考虑好终止的处理逻辑,比如设置超时时间,避免线程无法响应终止信号一直阻塞。
- 终止信号触发后,已经启动的线程需要等待当前任务执行完成或者安全中断,不要强制杀死线程,防止出现资源未释放的问题。
适用场景
该策略非常适合以下场景:批量数据查询时只需要部分结果即可、并行调用多个接口只要任意一个返回成功就不需要其他结果、需要限制并发数量同时支持动态停止多余任务的场景。相比传统的线程终止方式,它更加灵活可控,不会破坏线程的执行完整性,同时能够有效降低系统资源的无效消耗。