Java并发-线程池深度解析
发表于更新于
字数总计:1.7k阅读时长:6分钟 上海
线程池深度解析
线程池是高并发系统的核心组件。理解其原理和参数调优至关重要。
一、为什么需要线程池?
1.1 线程的开销
创建一个线程的代价是昂贵的:
| 开销类型 | 说明 |
|---|
| 内存分配 | 每个线程需要分配约 1MB 栈内存 |
| 系统调用 | 创建/销毁涉及用户态到内核态切换 |
| 上下文切换 | 线程过多导致频繁切换,约 1-10 微秒/次 |
| GC 压力 | 线程对象创建销毁增加 GC 负担 |
1.2 线程池的优势
- 降低资源消耗:复用线程,避免频繁创建销毁
- 提高响应速度:任务到达时无需等待线程创建
- 提高可管理性:统一分配、调优和监控
- 提供更多功能:定时执行、周期执行、并发数控制
二、ThreadPoolExecutor 核心参数
1 2 3 4 5 6 7 8 9
| public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲存活时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 工作队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 )
|
2.1 参数详解
| 参数 | 说明 | 建议 |
|---|
| corePoolSize | 核心线程数,即使空闲也不会被回收 | CPU 密集型:N+1;IO 密集型:2N |
| maximumPoolSize | 最大线程数,队列满后才会创建 | 根据系统资源和任务特性设置 |
| keepAliveTime | 非核心线程的空闲存活时间 | 通常 60 秒 |
| workQueue | 存放待执行任务的阻塞队列 | 有界队列更安全 |
| threadFactory | 创建线程的工厂,可自定义线程名 | 便于问题排查 |
| handler | 队列满且线程数达到最大时的拒绝策略 | 根据业务选择 |
2.2 工作队列类型
| 队列类型 | 特点 | 适用场景 |
|---|
| ArrayBlockingQueue | 有界数组队列,FIFO | 防止资源耗尽 |
| LinkedBlockingQueue | 可选有界链表队列 | 吞吐量高于数组队列 |
| SynchronousQueue | 不存储元素,直接传递 | 需要无限最大线程数 |
| PriorityBlockingQueue | 优先级队列 | 需要任务优先级 |
| DelayQueue | 延迟队列 | 定时任务 |
2.3 拒绝策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| new ThreadPoolExecutor.AbortPolicy();
new ThreadPoolExecutor.CallerRunsPolicy();
new ThreadPoolExecutor.DiscardPolicy();
new ThreadPoolExecutor.DiscardOldestPolicy();
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.warn("任务被拒绝: {}", r.toString()); } };
|
三、线程池执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| 提交任务 | v +-------------------------------+ | corePoolSize 是否已满? | +---------------+---------------+ | | 否| |是 v v +-------+-------+ +---+---+ | 创建核心线程 | | 队列满?| | 执行任务 | +---+---+ +---------------+ | 否| |是 v v +--------+ +---------------+ | 入队列 | | 达到最大线程? | +--------+ +-------+-------+ | 否| |是 v v +---------+ +----------+ |创建非核心| | 拒绝策略 | |线程执行 | +----------+ +---------+
|
3.1 源码解析:execute 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
四、线程的生命周期
Java 线程在运行过程中有 6 种状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| +-------+ | NEW | 线程创建,未调用 start() +---+---+ | v start() +---+---+ +------>|RUNNABLE|<------+ | +---+---+ | | | | notify() | | | notify() notifyAll | synchronized | interrupt() 超时结束 | Lock.lock() | 超时结束 | | | | v | +-------+---+ +---+---+ +----+------+ | WAITING | |BLOCKED| |TIMED_WAIT | +-----------+ +-------+ +-----------+ wait() 等待锁 sleep(n) join() wait(n) park() join(n) | v run() 结束 +---+---+ |TERMINATED| +----------+
|
| 状态 | 说明 | 触发条件 |
|---|
| NEW | 初始状态 | new Thread() |
| RUNNABLE | 可运行(含运行中和就绪) | start() |
| BLOCKED | 阻塞,等待锁 | synchronized 获取锁失败 |
| WAITING | 无限等待 | wait()、join()、LockSupport.park() |
| TIMED_WAITING | 超时等待 | sleep(n)、wait(n)、join(n) |
| TERMINATED | 终止 | run() 执行完毕或异常退出 |
五、创建线程池的方式
5.1 Executors 工厂方法(不推荐)
1 2 3 4 5 6 7 8 9 10 11
| ExecutorService fixed = Executors.newFixedThreadPool(10);
ExecutorService cached = Executors.newCachedThreadPool();
ExecutorService single = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
|
阿里巴巴 Java 开发手册明确禁止使用 Executors 创建线程池,原因:
FixedThreadPool 和 SingleThreadPool 使用无界队列,可能堆积大量请求导致 OOMCachedThreadPool 最大线程数为 Integer.MAX_VALUE,可能创建过多线程
5.2 ThreadPoolExecutor(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("业务线程-%d") .setDaemon(false) .build(), new ThreadPoolExecutor.CallerRunsPolicy() );
executor.execute(() -> { });
executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); }
|
5.3 ForkJoinPool(分治任务)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ForkJoinPool forkJoinPool = new ForkJoinPool( Runtime.getRuntime().availableProcessors() );
class SumTask extends RecursiveTask<Long> { private final long[] array; private final int start, end; private static final int THRESHOLD = 1000;
@Override protected Long compute() { if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) sum += array[i]; return sum; } int mid = (start + end) / 2; SumTask left = new SumTask(array, start, mid); SumTask right = new SumTask(array, mid, end); left.fork(); return right.compute() + left.join(); } }
Long result = forkJoinPool.invoke(new SumTask(array, 0, array.length));
|
六、线程池参数调优
6.1 核心线程数估算
1 2 3 4 5 6 7
| int cpuIntensive = Runtime.getRuntime().availableProcessors() + 1;
int ioIntensive = Runtime.getRuntime().availableProcessors() * 2;
|
6.2 队列容量估算
1 2
| int queueSize = (int) (taskPerSecond * avgTaskTime * 2);
|
6.3 动态调参
1 2 3
| executor.setCorePoolSize(newCoreSize); executor.setMaximumPoolSize(newMaxSize);
|
七、线程池监控
1 2 3 4 5 6 7 8 9 10 11 12 13
| ThreadPoolExecutor executor = ...;
int activeCount = executor.getActiveCount(); int poolSize = executor.getPoolSize(); int queueSize = executor.getQueue().size(); long completedCount = executor.getCompletedTaskCount(); long taskCount = executor.getTaskCount();
double usage = (double) activeCount / executor.getMaximumPoolSize();
|
八、最佳实践
- 使用有界队列:防止 OOM
- 自定义线程工厂:便于问题排查
- 合理设置拒绝策略:根据业务场景选择
- 隔离不同业务线程池:避免相互影响
- 优雅关闭:先
shutdown(),再 awaitTermination() - 监控线程池状态:及时发现问题
1 2 3 4 5 6 7 8 9 10 11 12
| ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity), new ThreadFactoryBuilder().setNameFormat("pool-%d").build(), (r, e) -> log.error("任务被拒绝,队列已满") );
executor.prestartAllCoreThreads();
|