JAVA线程池简单分析

       java 1.5引入了java.util.concurrent包,作者是Doug Lea这位史诗级别的老爷子。今天我们只挑选并发包下线程池的部分内容分析。

核心类

       对于线程池的使用开发者都已经得心应手了。如果查看源码,都和ThreadPoolExecutor 这个类相关。

配置规则

       ThreadPoolExecutor 提供了四种构造方法实现(这里只介绍一种):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{

if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

       参数大概意思如下:

  • corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Runnable放入到等待队列workQueue中了,我们把这些Runnable就叫做要去执行的任务吧。
  • maximumPoolSize:一般你用不到,当大于了这个值就会将任务由一个丢弃处理机制来处理,但是当你发生:newFixedThreadPool的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。
  • workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的线程属性为:workers,为一个HashSet;我们的Runnable内部被包装了一层,后面会看到这部分代码;这个队列默认是一个无界队列(你也可以设定一个有界队列),所以在生产者疯狂生产的时候,考虑如何控制的问题。
  • keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,当你使用:newCachedThreadPool(),它将是60s的时间。这个参数在运行中的线程从workQueue获取任务时,当(poolSize >corePoolSize || allowCoreThreadTimeOut)会用到,当然allowCoreThreadTimeOut要设置为true,也会先判定keepAliveTime是大于0的。
  • threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可。
  • handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己根据实际情况去重写,主要是要实现接口:RejectedExecutionHandler中的方法: public void rejectedExecution(Runnabler, ThreadPoolExecutor e) java默认的是使用:AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常。

其余的handler还包括:
1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程。
2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。
3、DiscardPolicy:什么也不做。
4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。
你可以自己写一个,例如我们想在这个处理中,既不是完全丢弃,也不是完全启动,也不是抛异常,而是控制生产者的线程,那么你就可以尝试某种方式将生产者的线程blocking住,其实就有点类似提到的Semaphor的功能了。

状态转换

       ThreadPoolExecutor 类中将线程状态( runState)分为了以下五种:

RUNNING:可以接受新任务并且处理进入队列中的任务
SHUTDOWN:不接受新任务,但是仍然执行队列中的任务
STOP:不接受新任务也不执行队列中的任务
TIDYING:所有任务中止,队列为空,进入该状态下的任务会执行 terminated()方法
TERMINATED: terminated()方法执行完成后进入该状态

       状态之间的转换如下:

  • RUNNING -> SHUTDOWN:调用了 shutdown()方法,可能是在 finalize()方法中被隐式调用
  • (RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow()
  • SHUTDOWN -> TIDYING:当队列和线程池都为空时
  • STOP -> TIDYING:线程池为空时
  • TIDYING -> TERMINATED:terminated()方法执行完成

       这几个变量的定义代码如下:

1
2
3
4
5
6
7
8
9
10
11
//原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

       上面的原子变量ctl 是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:

  • 所有有效线程的数量(workerCount)
  • 各个线程的状态(runState)

       低29位存线程数workerCount,高3位存runState。

       所以将上述六个常量打印出来如下:

  • 00011111111111111111111111111111 —— CAPACITY
  • 11100000000000000000000000000000 —— RUNNING
  • 00000000000000000000000000000000 —— SHUTDOWN
  • 00100000000000000000000000000000 —— STOP
  • 01000000000000000000000000000000 —— TIDYING
  • 01100000000000000000000000000000 —— TERMINATED

       第一行代表线程容量,后面5行提取高3位得到:

  • 111 - RUNNING
  • 000 - SHUTDOWN
  • 001 - STOP
  • 010 - TIDYING
  • 011 - TERMINATED

       分别对应5种状态,可以看到这样定义之后,只需要通过简单的移位操作就可以进行状态的转换。
       围绕ctl变量有一些操作,了解这些方法是看懂后面一些晦涩代码的基础:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
* ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
*
* @param c
* 该参数为存储runState和workerCount的int值
* @return runState的值
*/

private static int runStateOf(int c) {
return c & ~CAPACITY;
}


/**
* 这个方法用于取出workerCount的值
* 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
*
* @param c
* ctl, 存储runState和workerCount的int值
* @return workerCount的值
*/

private static int workerCountOf(int c) {
return c & CAPACITY;
}

/**
* 将runState和workerCount存到同一个int中
* “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
*
* @param rs
* runState移位过后的值,负责填充返回值的高3位
* @param wc
* workerCount移位过后的值,负责填充返回值的低29位
* @return 两者或运算过后的值
*/

private static int ctlOf(int rs, int wc) {
return rs | wc;
}

// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

重要方法

execute

       给线程池加入任务的方法为execute方法,该方法策略大概分三步:
       1)活动线程小于corePoolSize的时候创建新的线程;
       2)活动线程大于corePoolSize时都是先加入到任务队列当中;
       3)任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
       代码及注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 活动线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize
if (addWorker(command, true))
// 添加成功返回
return;
c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// double check
// 非RUNNING状态 则从workQueue中移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);// 采用线程池指定的策略拒绝任务
// 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
else if (workerCountOf(recheck) == 0)
// 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);

// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}

       注释比较清楚了就不再解释了,其中比较难理解的应该是addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。

addWorker

       addWorker即创建新线程。检查在当前线程池状态和限制下能否创建一个新线程,如果可以,会相应改变workerCount,每个worker都会运行他们的firstTask。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);// 当前线程池状态

// Check if queue empty only if necessary.
// 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty())
// 满足下列调价则直接返回false,线程创建失败:
// rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
// rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
// rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,
// false),任务为null && 队列为空
// 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
// 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
// 但是此时队列不为空,那么还得创建线程把任务给执行完才行。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

// 走到这的情形:
// 1.线程池状态为RUNNING
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// 原子操作递增workCount
break retry;// 操作成功跳出的重试的循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)// 如果线程池的状态发生变化则重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// wokerCount递增成功

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 并发的访问线程池workers对象必须加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);

// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除w并递减wokerCount
if (!workerStarted)
// 递减wokerCount会触发tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}

       内部类Worker是对任务的封装,所有submit的Runnable都被封装成了Worker,它本身也是一个Runnable, 然后利用AQS框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdown和shutdownNow方法的分析。Worker中获取和释放锁相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// state只有0和1,互斥
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;// 成功获得锁
}
// 线程进入等待队列
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

       之所以不用ReentrantLock是为了避免任务执行的代码中修改线程池的变量,如setCorePoolSize,因为ReentrantLock是可重入的。

runWorker

       任务添加成功后实际执行的是runWorker这个方法,这个方法非常重要,简单来说它做的就是:
       1)第一次启动会执行初始化传进来的任务firstTask;
       2)然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// Worker的构造函数中抑制了线程中断setState(-1),所以这里需要unlock从而允许中断
w.unlock();
// 用于标识是否异常终止,finally中processWorkerExit的方法会有不同逻辑
// 为true的情况:1.执行任务抛出异常;2.被中断。
boolean completedAbruptly = true;
try {
// 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前可以插入一些处理,子类重载该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 执行用户任务
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 和beforeExecute一样,留给子类去重载
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}

completedAbruptly = false;
} finally {
// 结束线程的一些清理工作
processWorkerExit(w, completedAbruptly);
}
}

getTask

       runWorker方法里有一段会调用getTask判断,主要用来取出队列中的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
// 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
// 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
// wokerCount
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 递减workerCount值
decrementWorkerCount();
return null;
}

// 标记从队列中取任务时是否设置超时时间
boolean timed; // Are workers subject to culling?

// 1.RUNING状态
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);

// 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
// 2.allowCoreThreadTimeOut == false && wc >
// corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
// 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
// poll出异常了重试
// 2.timeOut == true && timed ==
// false:看后面的代码workerQueue.poll超时时timeOut才为true,
// 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
// 所以超时不会继续执行而是return null结束线程。(重点:线程是如何超时的???)
if (wc <= maximumPoolSize && !(timedOut && timed))
break;

// workerCount递减,结束当前thread
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}

try {
// 1.以指定的超时时间从队列中取任务
// 2.core thread没有超时
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 超时
} catch (InterruptedException retry) {
timedOut = false;// 线程被中断重试
}
}
}

processWorkerExit

       线程退出会执行这个方法做一些清理工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 正常的话再runWorker的getTask方法workerCount已经被减一了
if (completedAbruptly)
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加线程的completedTasks
completedTaskCount += w.completedTasks;
// 从线程池中移除超时或者出现异常的线程
workers.remove(w);
} finally {
mainLock.unlock();
}

// 尝试停止线程池
tryTerminate();

int c = ctl.get();
// runState为RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 线程不是异常结束
if (!completedAbruptly) {
// 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池还不为空那就不用担心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.线程异常退出
// 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
addWorker(null, false);
}
}

tryTerminate

       processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下状态直接返回:
// 1.线程池还处于RUNNING状态
// 2.SHUTDOWN状态但是任务队列非空
// 3.runState >= TIDYING 线程池已经停止了或在停止了
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;

// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
// 2.STOP状态,当调用了shutdownNow方法

// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进入TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类重载:一些资源清理工作
terminated();
} finally {
// TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 继续awaitTermination
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

shutdown和shutdownNow

       shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
// tryTerminate方法中会保证队列中剩余的任务得到执行。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

       shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回队列中还没有被执行的任务。
tasks = drainQueue();
}
finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

       主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
// 因此保证了中断的肯定是空闲的线程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
}
finally {
mainLock.unlock();
}
}

void interruptIfStarted() {
Thread t;
// 初始化时state == -1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

       这就是前面提到的Woker类实现AQS的主要作用。AQS这个类在并发包下的地位还是举足轻重的,大家可以在并发编程网找找相关资料,或者这篇

结语

       因为注释比较多,所以看起来比较无聊。核心方法就那么些,实现细节就是那些double check的部分。还有状态和数量的计算。
结语

坚持技术分享,您的支持将鼓励我继续创作!