Java并发-线程池深度解析

线程池深度解析

线程池是高并发系统的核心组件。理解其原理和参数调优至关重要。

一、为什么需要线程池?

1.1 线程的开销

创建一个线程的代价是昂贵的:

开销类型说明
内存分配每个线程需要分配约 1MB 栈内存
系统调用创建/销毁涉及用户态到内核态切换
上下文切换线程过多导致频繁切换,约 1-10 微秒/次
GC 压力线程对象创建销毁增加 GC 负担

1.2 线程池的优势

  • 降低资源消耗:复用线程,避免频繁创建销毁
  • 提高响应速度:任务到达时无需等待线程创建
  • 提高可管理性:统一分配、调优和监控
  • 提供更多功能:定时执行、周期执行、并发数控制

二、ThreadPoolExecutor 核心参数

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)

2.1 参数详解

参数说明建议
corePoolSize核心线程数,即使空闲也不会被回收CPU 密集型:N+1;IO 密集型:2N
maximumPoolSize最大线程数,队列满后才会创建根据系统资源和任务特性设置
keepAliveTime非核心线程的空闲存活时间通常 60 秒
workQueue存放待执行任务的阻塞队列有界队列更安全
threadFactory创建线程的工厂,可自定义线程名便于问题排查
handler队列满且线程数达到最大时的拒绝策略根据业务选择

2.2 工作队列类型

队列类型特点适用场景
ArrayBlockingQueue有界数组队列,FIFO防止资源耗尽
LinkedBlockingQueue可选有界链表队列吞吐量高于数组队列
SynchronousQueue不存储元素,直接传递需要无限最大线程数
PriorityBlockingQueue优先级队列需要任务优先级
DelayQueue延迟队列定时任务

2.3 拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. AbortPolicy(默认):抛出 RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy:由提交任务的线程执行
new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy:静默丢弃任务
new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy:丢弃队列中最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy();

// 5. 自定义策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志、持久化、重试等
log.warn("任务被拒绝: {}", r.toString());
}
};

三、线程池执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
             提交任务
|
v
+-------------------------------+
| corePoolSize 是否已满? |
+---------------+---------------+
| |
否| |是
v v
+-------+-------+ +---+---+
| 创建核心线程 | | 队列满?|
| 执行任务 | +---+---+
+---------------+ |
否| |是
v v
+--------+ +---------------+
| 入队列 | | 达到最大线程? |
+--------+ +-------+-------+
|
否| |是
v v
+---------+ +----------+
|创建非核心| | 拒绝策略 |
|线程执行 | +----------+
+---------+

3.1 源码解析:execute 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();

// 1. 当前线程数 < 核心线程数,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 2. 核心线程已满,尝试入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:线程池可能在入队后关闭
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列满,尝试创建非核心线程
else if (!addWorker(command, false))
// 4. 达到最大线程数,执行拒绝策略
reject(command);
}

四、线程的生命周期

Java 线程在运行过程中有 6 种状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
                  +-------+
| NEW | 线程创建,未调用 start()
+---+---+
|
v start()
+---+---+
+------>|RUNNABLE|<------+
| +---+---+ |
| | |
notify() | | | notify()
notifyAll | synchronized | interrupt()
超时结束 | Lock.lock() | 超时结束
| | |
| v |
+-------+---+ +---+---+ +----+------+
| WAITING | |BLOCKED| |TIMED_WAIT |
+-----------+ +-------+ +-----------+
wait() 等待锁 sleep(n)
join() wait(n)
park() join(n)

|
v run() 结束
+---+---+
|TERMINATED|
+----------+
状态说明触发条件
NEW初始状态new Thread()
RUNNABLE可运行(含运行中和就绪)start()
BLOCKED阻塞,等待锁synchronized 获取锁失败
WAITING无限等待wait()join()LockSupport.park()
TIMED_WAITING超时等待sleep(n)wait(n)join(n)
TERMINATED终止run() 执行完毕或异常退出

五、创建线程池的方式

5.1 Executors 工厂方法(不推荐)

1
2
3
4
5
6
7
8
9
10
11
// 固定大小线程池(队列无界,可能 OOM)
ExecutorService fixed = Executors.newFixedThreadPool(10);

// 缓存线程池(最大线程无界,可能创建过多线程)
ExecutorService cached = Executors.newCachedThreadPool();

// 单线程池(队列无界,可能 OOM)
ExecutorService single = Executors.newSingleThreadExecutor();

// 定时任务线程池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);

阿里巴巴 Java 开发手册明确禁止使用 Executors 创建线程池,原因:

  • FixedThreadPoolSingleThreadPool 使用无界队列,可能堆积大量请求导致 OOM
  • CachedThreadPool 最大线程数为 Integer.MAX_VALUE,可能创建过多线程

5.2 ThreadPoolExecutor(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲存活时间
new LinkedBlockingQueue<>(1000), // 有界队列
new ThreadFactoryBuilder()
.setNameFormat("业务线程-%d")
.setDaemon(false)
.build(), // 自定义线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

// 提交任务
executor.execute(() -> {
// 任务逻辑
});

// 优雅关闭
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}

5.3 ForkJoinPool(分治任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ForkJoinPool forkJoinPool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);

// 递归任务
class SumTask extends RecursiveTask<Long> {
private final long[] array;
private final int start, end;
private static final int THRESHOLD = 1000;

@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 直接计算
long sum = 0;
for (int i = start; i < end; i++) sum += array[i];
return sum;
}
// 分治
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行
return right.compute() + left.join(); // 合并结果
}
}

Long result = forkJoinPool.invoke(new SumTask(array, 0, array.length));

六、线程池参数调优

6.1 核心线程数估算

1
2
3
4
5
6
7
// CPU 密集型任务:N + 1(N 为 CPU 核心数)
int cpuIntensive = Runtime.getRuntime().availableProcessors() + 1;

// IO 密集型任务:2N 或 N * (1 + 等待时间/计算时间)
int ioIntensive = Runtime.getRuntime().availableProcessors() * 2;

// 混合型任务:根据实际压测调整

6.2 队列容量估算

1
2
// 队列容量 = 每秒任务数 × 任务平均耗时 × 允许等待时间系数
int queueSize = (int) (taskPerSecond * avgTaskTime * 2);

6.3 动态调参

1
2
3
// 运行时动态调整
executor.setCorePoolSize(newCoreSize);
executor.setMaximumPoolSize(newMaxSize);

七、线程池监控

1
2
3
4
5
6
7
8
9
10
11
12
13
ThreadPoolExecutor executor = ...;

// 监控指标
int activeCount = executor.getActiveCount(); // 活跃线程数
int poolSize = executor.getPoolSize(); // 当前线程数
int queueSize = executor.getQueue().size(); // 队列任务数
long completedCount = executor.getCompletedTaskCount();// 已完成任务数
long taskCount = executor.getTaskCount(); // 总任务数

// 使用率 = 活跃线程数 / 最大线程数
double usage = (double) activeCount / executor.getMaximumPoolSize();

// 可结合 Prometheus + Grafana 做可视化监控

八、最佳实践

  1. 使用有界队列:防止 OOM
  2. 自定义线程工厂:便于问题排查
  3. 合理设置拒绝策略:根据业务场景选择
  4. 隔离不同业务线程池:避免相互影响
  5. 优雅关闭:先 shutdown(),再 awaitTermination()
  6. 监控线程池状态:及时发现问题
1
2
3
4
5
6
7
8
9
10
11
12
// 推荐配置模板
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadFactoryBuilder().setNameFormat("pool-%d").build(),
(r, e) -> log.error("任务被拒绝,队列已满")
);

// 预热核心线程(可选)
executor.prestartAllCoreThreads();