Java线程池源码深度解析:生命周期全流程揭秘

Java线程池源码深度解析:从任务提交到Worker线程的生命周期

Java线程池是并发编程的核心组件,通过复用线程资源提升系统吞吐量,避免频繁创建销毁线程的开销。本文将从源码层面深度解析线程池的任务提交、队列管理、Worker线程创建与销毁的全生命周期,帮助开发者理解其底层机制。

一、任务提交:execute()方法的入口逻辑

线程池的任务提交入口是ThreadPoolExecutor.execute(Runnable command)方法,其核心逻辑分为三步:

1.1 快速失败检查

  1. if (command == null)
  2. throw new NullPointerException();

源码首先校验任务非空,避免NPE。

1.2 核心参数计算

  1. int c = ctl.get(); // 获取线程池状态和worker数量
  2. if (workerCountOf(c) < corePoolSize) {
  3. if (addWorker(command, true)) // 尝试创建核心线程
  4. return;
  5. c = ctl.get(); // 失败后重新获取状态
  6. }
  • ctl是AtomicInteger,高3位存储运行状态(RUNNING/SHUTDOWN等),低29位存储worker数量。
  • 若当前worker数小于corePoolSize,直接创建新worker执行任务(核心线程不空闲也不会被回收)。

1.3 任务入队与拒绝策略

  1. if (isRunning(c) && workQueue.offer(command)) {
  2. int recheck = ctl.get();
  3. if (!isRunning(recheck) && remove(command))
  4. reject(command); // 线程池关闭时从队列移除任务并拒绝
  5. else if (workerCountOf(recheck) == 0)
  6. addWorker(null, false); // 队列非空但无worker时创建非核心线程
  7. } else if (!addWorker(command, false)) // 队列满时尝试创建非核心线程
  8. reject(command); // 超过最大线程数则触发拒绝策略
  • 任务优先入队(workQueue),若队列满则尝试创建非核心线程(最大到maximumPoolSize)。
  • 拒绝策略通过RejectedExecutionHandler实现,默认抛出RejectedExecutionException

二、Worker线程的创建与运行

Worker是线程池的核心工作单元,继承AQS并实现Runnable接口:

2.1 Worker的创建(addWorker方法)

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. // 检查线程池状态是否允许添加worker
  6. if (runStateAtLeast(c, SHUTDOWN)
  7. && (runStateAtLeast(c, STOP)
  8. || firstTask != null
  9. || workQueue.isEmpty()))
  10. return false;
  11. for (;;) {
  12. int wc = workerCountOf(c);
  13. if (wc >= CAPACITY ||
  14. wc >= (core ? corePoolSize : maximumPoolSize))
  15. return false;
  16. if (compareAndIncrementWorkerCount(c))
  17. break retry; // CAS增加worker数成功则退出循环
  18. c = ctl.get(); // 失败后重新检查状态
  19. }
  20. }
  21. boolean workerStarted = false;
  22. boolean workerAdded = false;
  23. Worker w = null;
  24. try {
  25. w = new Worker(firstTask); // 创建Worker对象
  26. final Thread t = w.thread;
  27. if (t != null) {
  28. final ReentrantLock mainLock = this.mainLock;
  29. mainLock.lock();
  30. try {
  31. // 再次检查线程池状态
  32. int rs = runStateOf(ctl.get());
  33. if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
  34. if (t.isAlive()) // 线程未启动则抛异常
  35. throw new IllegalThreadStateException();
  36. workers.add(w); // 添加到worker集合
  37. workerAdded = true;
  38. }
  39. } finally {
  40. mainLock.unlock();
  41. }
  42. if (workerAdded) {
  43. t.start(); // 启动线程
  44. workerStarted = true;
  45. }
  46. }
  47. } finally {
  48. if (!workerStarted)
  49. addWorkerFailed(w); // 启动失败则清理资源
  50. }
  51. return workerStarted;
  52. }
  • 双重检查:先通过CAS增加worker计数,再通过锁保证线程安全地添加到workers集合。
  • Worker构造:传入firstTask作为初始任务,线程启动后执行Worker.run()

2.2 Worker的运行(runWorker方法)

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // 释放初始锁,允许中断
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock(); // 获取锁防止被中断
  10. if ((runStateAtLeast(ctl.get(), STOP)
  11. || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
  12. && !wt.isInterrupted())
  13. wt.interrupt(); // 线程池停止时中断worker
  14. try {
  15. beforeExecute(wt, task); // 钩子方法
  16. Throwable thrown = null;
  17. try {
  18. task.run(); // 执行任务
  19. } catch (RuntimeException x) {
  20. thrown = x; throw x;
  21. } catch (Error x) {
  22. thrown = x; throw x;
  23. } catch (Throwable x) {
  24. thrown = x; throw new Error(x);
  25. } finally {
  26. afterExecute(task, thrown); // 钩子方法
  27. }
  28. } finally {
  29. task = null;
  30. w.completedTasks++; // 任务计数+1
  31. w.unlock();
  32. }
  33. }
  34. completedAbruptly = false;
  35. } finally {
  36. processWorkerExit(w, completedAbruptly); // 清理worker
  37. }
  38. }
  • 任务获取:优先执行firstTask,后续通过getTask()从队列拉取任务。
  • 中断处理:线程池停止时(STATE >= STOP)会中断worker线程。
  • 钩子方法beforeExecuteafterExecute允许子类扩展。

2.3 任务获取策略(getTask方法)

  1. private Runnable getTask() {
  2. boolean timedOut = false; // 是否超时
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 检查线程池状态
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount(); // 减少worker计数
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  13. if ((wc > maximumPoolSize || (timed && timedOut))
  14. && (wc > 1 || workQueue.isEmpty())) {
  15. try {
  16. if (compareAndDecrementWorkerCount(c))
  17. return null; // 超过最大线程数或超时则退出
  18. continue;
  19. }
  20. }
  21. try {
  22. Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
  23. : workQueue.take(); // 非超时则阻塞获取
  24. if (r != null)
  25. return r;
  26. timedOut = true;
  27. } catch (InterruptedException retry) {
  28. timedOut = false;
  29. }
  30. }
  31. }
  • 核心线程保留:若allowCoreThreadTimeOut=false,核心线程会一直存活(即使空闲)。
  • 非核心线程回收:空闲时间超过keepAliveTime的非核心线程会被回收。

三、Worker线程的销毁与资源清理

Worker线程的销毁通过processWorkerExit完成:

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly) // 异常退出时减少worker计数
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks; // 更新总任务数
  8. workers.remove(w); // 从集合移除
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate(); // 尝试终止线程池
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) // 线程池未停止时补充新worker
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && !workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min)
  20. return;
  21. }
  22. addWorker(null, false); // 补充非核心线程
  23. }
  • 资源释放:移除worker并更新任务统计。
  • 线程补充:若线程数低于corePoolSize(或1,当队列非空时),会创建新worker。

四、最佳实践与调优建议

  1. 合理配置参数

    • corePoolSize:根据业务并发量设置,避免频繁创建销毁线程。
    • maximumPoolSize:应对突发流量,但需警惕资源耗尽。
    • keepAliveTime:非核心线程空闲回收时间,默认60秒。
  2. 选择合适的队列

    • SynchronousQueue:无容量,适合高并发、任务执行快的场景。
    • LinkedBlockingQueue:无界队列,需防止OOM。
    • ArrayBlockingQueue:有界队列,需配合拒绝策略。
  3. 拒绝策略选择

    • AbortPolicy(默认):抛异常,适合严格场景。
    • CallerRunsPolicy:调用线程执行任务,降低负载。
    • DiscardOldestPolicy:丢弃最旧任务,尝试提交新任务。
  4. 监控与调优

    • 通过ThreadPoolExecutoractiveCount()getQueue().size()等方法监控运行状态。
    • 使用JMX或自定义指标收集线程池指标。

五、总结

Java线程池通过execute()方法接收任务,经由队列管理和Worker线程执行,最终通过processWorkerExit完成资源清理。其核心设计包括:

  • 状态控制:通过ctl原子变量管理线程池状态和worker数量。
  • 任务调度:优先使用核心线程,队列满后创建非核心线程,超量时触发拒绝策略。
  • 线程复用:Worker线程通过循环从队列拉取任务,避免频繁创建销毁。

理解线程池源码有助于开发者优化并发编程性能,避免资源浪费和系统过载。在实际应用中,需结合业务场景合理配置参数,并通过监控动态调整。