Java线程池源码深度解析:从任务提交到Worker线程的生命周期
Java线程池是并发编程的核心组件,通过复用线程资源提升系统吞吐量,避免频繁创建销毁线程的开销。本文将从源码层面深度解析线程池的任务提交、队列管理、Worker线程创建与销毁的全生命周期,帮助开发者理解其底层机制。
一、任务提交:execute()方法的入口逻辑
线程池的任务提交入口是ThreadPoolExecutor.execute(Runnable command)方法,其核心逻辑分为三步:
1.1 快速失败检查
if (command == null)throw new NullPointerException();
源码首先校验任务非空,避免NPE。
1.2 核心参数计算
int c = ctl.get(); // 获取线程池状态和worker数量if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 尝试创建核心线程return;c = ctl.get(); // 失败后重新获取状态}
ctl是AtomicInteger,高3位存储运行状态(RUNNING/SHUTDOWN等),低29位存储worker数量。- 若当前worker数小于
corePoolSize,直接创建新worker执行任务(核心线程不空闲也不会被回收)。
1.3 任务入队与拒绝策略
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command))reject(command); // 线程池关闭时从队列移除任务并拒绝else if (workerCountOf(recheck) == 0)addWorker(null, false); // 队列非空但无worker时创建非核心线程} else if (!addWorker(command, false)) // 队列满时尝试创建非核心线程reject(command); // 超过最大线程数则触发拒绝策略
- 任务优先入队(
workQueue),若队列满则尝试创建非核心线程(最大到maximumPoolSize)。 - 拒绝策略通过
RejectedExecutionHandler实现,默认抛出RejectedExecutionException。
二、Worker线程的创建与运行
Worker是线程池的核心工作单元,继承AQS并实现Runnable接口:
2.1 Worker的创建(addWorker方法)
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 检查线程池状态是否允许添加workerif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry; // CAS增加worker数成功则退出循环c = ctl.get(); // 失败后重新检查状态}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); // 创建Worker对象final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次检查线程池状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 线程未启动则抛异常throw new IllegalThreadStateException();workers.add(w); // 添加到worker集合workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动线程workerStarted = true;}}} finally {if (!workerStarted)addWorkerFailed(w); // 启动失败则清理资源}return workerStarted;}
- 双重检查:先通过CAS增加worker计数,再通过锁保证线程安全地添加到
workers集合。 - Worker构造:传入
firstTask作为初始任务,线程启动后执行Worker.run()。
2.2 Worker的运行(runWorker方法)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 释放初始锁,允许中断boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock(); // 获取锁防止被中断if ((runStateAtLeast(ctl.get(), STOP)|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))&& !wt.isInterrupted())wt.interrupt(); // 线程池停止时中断workertry {beforeExecute(wt, task); // 钩子方法Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 钩子方法}} finally {task = null;w.completedTasks++; // 任务计数+1w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 清理worker}}
- 任务获取:优先执行
firstTask,后续通过getTask()从队列拉取任务。 - 中断处理:线程池停止时(STATE >= STOP)会中断worker线程。
- 钩子方法:
beforeExecute和afterExecute允许子类扩展。
2.3 任务获取策略(getTask方法)
private Runnable getTask() {boolean timedOut = false; // 是否超时for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查线程池状态if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // 减少worker计数return null;}int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {try {if (compareAndDecrementWorkerCount(c))return null; // 超过最大线程数或超时则退出continue;}}try {Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS): workQueue.take(); // 非超时则阻塞获取if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
- 核心线程保留:若
allowCoreThreadTimeOut=false,核心线程会一直存活(即使空闲)。 - 非核心线程回收:空闲时间超过
keepAliveTime的非核心线程会被回收。
三、Worker线程的销毁与资源清理
Worker线程的销毁通过processWorkerExit完成:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 异常退出时减少worker计数decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 更新总任务数workers.remove(w); // 从集合移除} finally {mainLock.unlock();}tryTerminate(); // 尝试终止线程池int c = ctl.get();if (runStateLessThan(c, STOP)) // 线程池未停止时补充新workerif (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && !workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}addWorker(null, false); // 补充非核心线程}
- 资源释放:移除worker并更新任务统计。
- 线程补充:若线程数低于
corePoolSize(或1,当队列非空时),会创建新worker。
四、最佳实践与调优建议
-
合理配置参数:
corePoolSize:根据业务并发量设置,避免频繁创建销毁线程。maximumPoolSize:应对突发流量,但需警惕资源耗尽。keepAliveTime:非核心线程空闲回收时间,默认60秒。
-
选择合适的队列:
SynchronousQueue:无容量,适合高并发、任务执行快的场景。LinkedBlockingQueue:无界队列,需防止OOM。ArrayBlockingQueue:有界队列,需配合拒绝策略。
-
拒绝策略选择:
AbortPolicy(默认):抛异常,适合严格场景。CallerRunsPolicy:调用线程执行任务,降低负载。DiscardOldestPolicy:丢弃最旧任务,尝试提交新任务。
-
监控与调优:
- 通过
ThreadPoolExecutor的activeCount()、getQueue().size()等方法监控运行状态。 - 使用JMX或自定义指标收集线程池指标。
- 通过
五、总结
Java线程池通过execute()方法接收任务,经由队列管理和Worker线程执行,最终通过processWorkerExit完成资源清理。其核心设计包括:
- 状态控制:通过
ctl原子变量管理线程池状态和worker数量。 - 任务调度:优先使用核心线程,队列满后创建非核心线程,超量时触发拒绝策略。
- 线程复用:Worker线程通过循环从队列拉取任务,避免频繁创建销毁。
理解线程池源码有助于开发者优化并发编程性能,避免资源浪费和系统过载。在实际应用中,需结合业务场景合理配置参数,并通过监控动态调整。