以 Tomcat 为例分析 Java 中的线程池
首先,为什么会有“池”的概念?
我们的项目在运行过程中,需要使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理,比如在 JVM 中新建对象就需要消耗 CPU 和内存资源,当需要频繁创建大量的对象,并且这些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。
而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。
Java 线程池
ThreadPoolExecutor
看下 java.util.concurrent.ThreadPoolExecutor 中的构造方法
java"> /**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
每次提交任务时,如果线程数还没达到核心线程数 corePoolSize,线程池就创建新线程来执行。当线程数达到**corePoolSize **后,新增的任务就放到工作队列 workQueue 里,而线程池中的线程则努力地从 workQueue 里拉活来干,也就是调用 poll 方法来获取任务。
如果任务很多,并且 workQueue 是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数 maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略 handler,比如抛出异常或者由调用者线程来执行任务
等。
如果高峰过去了,线程池比较闲了怎么办?临时线程使用 poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意 poll 方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。
那还有一个参数 threadFactory 是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。
注意这些默认策略是可以修改的:
- 声明线程池后立即调用 prestartAllCoreThreads 方法,来启动所有核心线程;
- 传入 true 给 allowCoreThreadTimeOut 方法,来让线程池在空闲的时候同样回收核心线程。
FixedThreadPool/CachedThreadPool
Java 提供了一些默认的线程池实现,比如 FixedThreadPool 和 CachedThreadPool,它们的本质就是给 ThreadPoolExecutor 设置了不同的参数,是定制版的 ThreadPoolExecutor。
java">public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
对比一下我们可以发现,线程池的两个关键点就是:
- 是否限制线程个数。
- 是否限制队列长度。
FixedThreadPool 的核心线程数就是最大线程数,当忙不过来时 task 会被丢到 LinkedBlockingQueue 队列中,注意:这是一个无界队列,也就是在任务量足够大时会触发 OOM。
CachedThreadPool 的核心线程数为 0,最大线程数是 Integer 的最大值,因此它对线程个数不做限制
,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是 SynchronousQueue,表明队列长度为 0。在任务量足够大时会触发 OOM,因为资源是有限的,无法一直创建新线程。
因此,不建议使用 Executors 提供的两种快捷的线程池
,原因如下:
- 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
- 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。
拒绝策略
拒绝策略 | 特点 | 适用场景 |
---|---|---|
AbortPolicy | 默认策略,抛出 RejectedExecutionException 异常。 | 不允许丢失任务,任务必须立即处理的场景。 |
CallerRunsPolicy | 任务由调用者线程执行,降低并发度。 | 调用者线程可以处理任务,适合减少任务提交速度的场景。 |
DiscardPolicy | 丢弃无法执行的任务,不抛出异常。 | 可以容忍任务丢失,适合不重要的任务丢弃。 |
DiscardOldestPolicy | 丢弃队列中最旧的任务,加入新任务。 | 适合丢弃最旧任务,保持队列中的新任务。 |
其中,注意 CallerRunsPolicy,在实际业务开发中,可能会导致 tomcat 的工作线程来进行业务的处理,进一步降低系统并发度。
Tomcat 线程池
Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。那么,我们有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?
有的兄弟,有的
按照正常的线程池流程,在任务到来时,如果队列已满,就会创建新的非核心线程,那么可以重写队列的 offer 方法,造成队列已满的假象,在线程数达到最大线程数时,执行拒绝策略的时候,把任务尝试加入队列,如果这时队列真的满了,再按照拒绝策略处理。
ThreadPoolExecutor
java"> public void execute(Runnable command, long timeout, TimeUnit unit) {
// 计数器 + 1,维护提交到了线程池但是还没执行完成的任务数量
submittedCount.incrementAndGet();
try {
// 尝试进行处理
executeInternal(command);
} catch (RejectedExecutionException rx) {
if (getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue) getQueue();
try {
// 继续尝试把任务放到任务队列中去
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
// 计数器 - 1,抛出异常
submittedCount.decrementAndGet();
throw rx;
}
}
}
private void executeInternal(Runnable command) {
int c = ctl.get();
// 线程数 < 核心线程数,创建一个核心线程来接收
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
// 线程数 >= 核心线程数,尝试让队列接收
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) { // 有可能此时有线程死亡了,再次检查是否需要添加线程
addWorker(null, false);
}
} else if (!addWorker(command, false)) { // 队列接收可能返回 false,尝试创建非核心线程来处理
reject(command);
}
}
TaskQuene,注意继承了 LinkedBlockingQueue 无界队列,如果不重写线程池执行方法,新请求只会放入队列,直到 OOM。
java"> // TaskQueue extends LinkedBlockingQueue<Runnable>
// 进入此方法的前提是 当前线程数已经达到了核心线程数
public boolean offer(Runnable o) {
//we can't do any checks
if (parent == null) {
return super.offer(o);
}
// we are maxed out on threads, simply queue the object
// 线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
// we have idle threads, just add it to the queue
// 已提交的任务数 <= 当前线程数,表示还有空闲线程,无需创建新线程
// AtomicInteger submittedCount 维护已经提交到了线程池,但是还没有执行完的任务个数
if (parent.getSubmittedCount() <= (parent.getPoolSize())) {
return super.offer(o);
}
// if we have less threads than maximum force creation of a new thread
// 已提交的任务数 > 当前线程数,并且当前线程数 < 最大线程数,返回 false 创建新线程
if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
return false;
}
// if we reached here, we need to add it to the queue
// 其他情况下添加到队列中
return super.offer(o);
}
可以看到,在 24 行如果 当前线程数 > 核心线程数,且 < 最大线程数,会优先创建新的非核心线程,而不是优先使用队列。原因是队列继承了无界队列,如果先放入队列会导致最大线程数失效,定制版的任务队列,重写了 offer 方法,使得在任务队列长度无限制的情况下,线程池仍然有机会创建新的线程。
Tomcat 的线程池与 Java 原生线程池的最大区别是:在线程数达到最大线程数后,继续尝试把任务添加到任务队列中去,如果这时候插入失败,再真正执行拒绝策略。
最佳实践
要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列,既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。复用线程池不代表应用程序始终使用同一个线程池,我们应该根据任务的性质来选用不同的线程池。特别注意 IO 绑定的任务和 CPU 绑定的任务对于线程池属性的偏好,如果希望减少任务间的相互干扰,考虑按需使用隔离的线程池。
- 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
- 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。