Java 里的线程池 | LIXI.FUN
0%

Java 里的线程池

线程池

池化思想在线程中的实际应用,具体到 Java 中就是

类别 说明
ExecutorService 定义线程池行为的接口
ThreadPoolExecutor 具体的实现类
Executors 用于创建线程池的工具类

为什么使用线程池?

  1. 降消耗 - 减少因大量创建和销毁重量级资源耗费的系统资源;
  2. 加限制 - 合理的线程池参数设置可以限制应用所能申请的资源,避免系统资源耗尽。

同样适用于其他各种连接池。

常用线程池

线程池 队列类型 适用场景
newFixedThreadPool LinkedBlockingQueue (Integer.MAX_VALUE) CPU 密集,耗时长的任务
newCachedThreadPool SynchronousQueue 大量耗时短的任务
newSingleThreadPool LinkedBlockingQueue 串行执行
newScheduledThreadPool DelayedWorkQueue 周期定时执行

线程池的原理

核心参数

1
2
3
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
参数 意义
corePoolSize 核心线程数(一般也就是最小线程数)
maximumPoolSize 最大线程数
workQueue 任务队列的长度

添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 源码里的注释实在是太清晰了,直接看源码吧
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

RUNNING 状态下,提交一个任务 execute(Runnable command),如果线程池中的线程数小于 corePoolSize 则新建一个线程用于执行此任务,如果线程池中的线程数大于等于 corePoolSize 且小于 maxPoolSize 则任务会被放入 workQueue 中排队等待,如果队列满了,就新建线程执行,如果不仅队列满了,还达到了 maxPoolSize 那么只能执行拒绝策略了。

创建线程池注意事项

在《阿里巴巴 Java 开发手册》中推荐手动使用 ThreadPoolExecutor 创建线程池,而不是直接用 Executors 去创建,理由如下:

FixedThreadPool 和 SingleThreadPool:允许的请求队列长度是 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public LinkedBlockingQueue() {
// 队列长度为 Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}

CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool() {
// 第二个参数 maximumPoolSize 是 Integer.MAX_VALUE
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

生命周期

五种运行状态

状态 说明
RUNNING 能接受新任务,并且处理队列中的任务
SHUTDOWN 不接受新任务,但是处理队列中的任务
STOP 不接受新任务,也不处理队列中的任务,同时终止正在运行的任务
TIDYING 所有的任务都被中止, workerCount 是 0,状态转换到 TIDYING,然后运行 terminated() 钩子函数
TERMINATED terminated() 函数执行完成

表示状态的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 初始值 111_0{29} | 000_0{29} = 111_0{29}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 用于表示线程数量的位 32 - 3 = 29 位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程的容量 (1 << 29) - 1,存储在低 29 位中 000_1{29}
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 运行状态被存储在高 3 位中,3 位,可表示 2^3 八种状态,用了 5 种
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111_0{29}
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000_0{29}
private static final int STOP = 1 << COUNT_BITS; // 001_0{29}
private static final int TIDYING = 2 << COUNT_BITS; // 010_0{29}
private static final int TERMINATED = 3 << COUNT_BITS; // 011_0{29}

// Packing and unpacking ctl
// 获取运行状态, c & 111_0{29}
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取工作线程数 c & 000_1{29}
private static int workerCountOf(int c) { return c & CAPACITY; }
// 运行状态和工作线程数的组合
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

拒绝策略

策略 说明
CallerRunsPolicy 在调用者的线程内执行
AbortPolicy (默认) 直接抛出 RejectedExecutionException 拒绝运行
DiscardPolicy 静默丢弃,什么也不做
DiscardOldestPolicy 丢弃最老的任务,转为执行提交的任务

拒绝策略源码分析

拒绝策略的接口

1
2
3
4
5
6
7
8
9
10
package java.util.concurrent;

public interface RejectedExecutionHandler {

/**
* 拒绝策略执行的动作
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

}

RejectedExecutionHandler 的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 在调用者的线程中执行
* 如果当前线程池被关闭,则丢弃
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {

/**
* 线程池 e 没有关闭的时候,用调用者的线程执行 r
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接调用 run() 方法就是在调用者的线程中执行
r.run();
}
}
}

/**
* 直接抛出 RejectedExecutionException 异常
*/
public static class AbortPolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出异常,拒绝执行
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

/**
* 静默丢弃,什么也不做
*/
public static class DiscardPolicy implements RejectedExecutionHandler {

/**
* 什么也不做
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

/**
* 丢弃最老的任务,提交当前任务
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 丢弃最老的任务
e.getQueue().poll();
// 提交当前任务
e.execute(r);
}
}
}

ThreadPoolExecutor 中的默认拒绝策略

1
2
3
4
5
6
7
/**
* 在 ThreadPoolExecutor 中的默认拒绝策略
*
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
觉得有收获就鼓励下作者吧