在高并发业务系统中,不同业务类型的任务往往存在优先级差异,核心变量相关的任务通常需要更高的执行保障,而默认线程池无法区分任务优先级,容易出现非核心任务抢占线程资源,导致核心任务执行带宽不足的问题。通过自定义线程池实现任务分组隔离,可以有效解决这一痛点。

自定义线程池的核心参数配置
实现任务分组隔离的基础是构建独立的自定义线程池实例,每个线程池对应一组特定类型的任务,核心参数需要根据任务特性合理配置。
- 核心线程数:对应核心任务组的线程池需要设置足够的核心线程数,保障核心变量任务的最低执行带宽
- 最大线程数:根据任务峰值流量设置,避免线程数过多导致系统资源耗尽
- 任务队列:核心任务组建议使用有界队列,防止任务堆积过多占用内存
- 拒绝策略:核心任务组的拒绝策略可以选择重试或者降级处理,非核心任务组可以选择直接丢弃
以下是自定义线程池的基础配置代码示例:
import java.util.concurrent.*;
public class CustomThreadPoolBuilder {
// 构建核心任务组线程池
public static ThreadPoolExecutor buildCoreThreadPool() {
// 核心线程数设置为8,保障核心变量任务的基础执行带宽
int corePoolSize = 8;
// 最大线程数设置为16,应对核心任务的流量峰值
int maximumPoolSize = 16;
// 空闲线程存活时间30秒
long keepAliveTime = 30L;
TimeUnit unit = TimeUnit.SECONDS;
// 使用容量为100的有界队列,防止任务过度堆积
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
// 自定义线程工厂,方便后续排查问题
ThreadFactory threadFactory = new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "core-task-thread-" + count++);
}
};
// 核心任务拒绝策略:重试三次后记录日志
RejectedExecutionHandler rejectedHandler = (r, executor) -> {
int retryCount = 0;
while (retryCount < 3) {
if (executor.getQueue().offer(r)) {
return;
}
retryCount++;
}
System.err.println("核心变量任务提交失败,已重试3次");
};
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
rejectedHandler
);
}
// 构建非核心任务组线程池
public static ThreadPoolExecutor buildNonCoreThreadPool() {
int corePoolSize = 4;
int maximumPoolSize = 8;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
// 非核心任务使用更大的队列,允许一定堆积
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
ThreadFactory threadFactory = new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "non-core-task-thread-" + count++);
}
};
// 非核心任务拒绝策略:直接丢弃并打印日志
RejectedExecutionHandler rejectedHandler = (r, executor) -> {
System.err.println("非核心任务被拒绝,已丢弃");
};
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
rejectedHandler
);
}
}
任务分组隔离的实现逻辑
任务分组隔离的核心是让不同类型的任务提交到对应的自定义线程池,避免跨组资源抢占,同时需要建立任务提交的统一入口,规范任务分组规则。
任务类型定义
首先定义任务类型枚举,明确区分核心变量任务和非核心任务:
public enum TaskType {
// 核心变量相关任务,需要保障执行带宽
CORE_VARIABLE_TASK,
// 非核心任务,允许被延迟或丢弃
NON_CORE_TASK
}
任务提交管理器
构建任务提交管理器,根据任务类型自动路由到对应的线程池:
public class TaskSubmitManager {
// 核心任务线程池实例
private static final ThreadPoolExecutor CORE_POOL = CustomThreadPoolBuilder.buildCoreThreadPool();
// 非核心任务线程池实例
private static final ThreadPoolExecutor NON_CORE_POOL = CustomThreadPoolBuilder.buildNonCoreThreadPool();
// 提交任务的方法,根据任务类型选择对应的线程池
public static void submitTask(Runnable task, TaskType taskType) {
if (taskType == TaskType.CORE_VARIABLE_TASK) {
CORE_POOL.execute(task);
} else {
NON_CORE_POOL.execute(task);
}
}
// 提交带返回值的任务
public static <T> Future<T> submitCallable(Callable<T> task, TaskType taskType) {
if (taskType == TaskType.CORE_VARIABLE_TASK) {
return CORE_POOL.submit(task);
} else {
return NON_CORE_POOL.submit(task);
}
}
}
核心变量任务示例
以下是核心变量任务的具体实现,提交时指定任务类型为核心类型:
public class CoreVariableTask implements Runnable {
private String variableName;
public CoreVariableTask(String variableName) {
this.variableName = variableName;
}
@Override
public void run() {
// 模拟核心变量更新操作
System.out.println("执行核心变量" + variableName + "的更新任务,线程名:" + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
提交核心变量任务的代码:
public class TaskTest {
public static void main(String[] args) {
// 提交核心变量任务,自动路由到核心线程池
for (int i = 0; i < 10; i++) {
TaskSubmitManager.submitTask(new CoreVariableTask("user_count"), TaskType.CORE_VARIABLE_TASK);
}
// 提交非核心任务
for (int i = 0; i < 10; i++) {
TaskSubmitManager.submitTask(() -> {
System.out.println("执行非核心日志任务,线程名:" + Thread.currentThread().getName());
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, TaskType.NON_CORE_TASK);
}
}
}
核心任务执行带宽保障与监控
完成分组隔离后,还需要通过监控和动态调整机制,进一步保障核心变量任务的执行带宽。
线程池状态监控
定期采集核心线程池的运行状态,包括活跃线程数、队列剩余容量、已完成任务数等指标:
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolMonitor {
public static void monitorCorePool() {
ThreadPoolExecutor corePool = CustomThreadPoolBuilder.buildCoreThreadPool();
// 每10秒打印一次核心线程池状态
ScheduledExecutorService monitorPool = Executors.newSingleThreadScheduledExecutor();
monitorPool.scheduleAtFixedRate(() -> {
System.out.println("核心线程池状态:");
System.out.println("核心线程数:" + corePool.getCorePoolSize());
System.out.println("活跃线程数:" + corePool.getActiveCount());
System.out.println("队列剩余容量:" + corePool.getQueue().remainingCapacity());
System.out.println("已完成任务数:" + corePool.getCompletedTaskCount());
}, 0, 10, TimeUnit.SECONDS);
}
}
动态参数调整
当监控发现核心线程池负载过高时,可以动态调整核心线程数,临时提升核心任务的执行带宽:
public class ThreadPoolDynamicAdjust {
// 当核心线程池活跃线程数超过阈值时,动态增加核心线程数
public static void adjustCorePool(ThreadPoolExecutor corePool) {
int activeCount = corePool.getActiveCount();
int corePoolSize = corePool.getCorePoolSize();
// 如果活跃线程数达到核心线程数的80%,且当前核心线程数小于最大线程数,则扩容
if (activeCount >= corePoolSize * 0.8 && corePoolSize < corePool.getMaximumPoolSize()) {
corePool.setCorePoolSize(corePoolSize + 2);
System.out.println("核心线程池扩容,新的核心线程数:" + corePool.getCorePoolSize());
}
// 如果活跃线程数低于核心线程数的30%,则缩容
if (activeCount <= corePoolSize * 0.3 && corePoolSize > 4) {
corePool.setCorePoolSize(corePoolSize - 2);
System.out.println("核心线程池缩容,新的核心线程数:" + corePool.getCorePoolSize());
}
}
}
注意事项
- 自定义线程池使用完成后需要及时调用shutdown方法关闭,避免线程泄漏
- 核心任务组的队列容量需要根据业务峰值合理设置,过小会导致任务被拒绝,过大可能导致内存溢出
- 线程工厂需要自定义线程名称,方便日志排查和问题定位
- 拒绝策略需要根据业务特性选择,核心任务不建议使用直接丢弃的策略
通过自定义线程池实现任务分组隔离,能够从资源层面保障核心变量任务的执行带宽,避免非核心任务对核心业务的干扰,提升系统的稳定性和可靠性。实际落地时需要根据业务的实际流量和任务特性调整线程池参数,同时配合监控机制动态优化,才能达到最佳的效果。