Hexo

点滴积累 豁达处之

0%

线程池源码分析(java8)

java线程的创建、销毁和线程减切换是一件比较耗费计算机资源的事。

如果我们需要用多线程处理任务,并频繁的创建、销毁线程会造成计算机资源的无端浪费,因此出现了线程池技术。

1 线程池类图

executor类图

概括一下:

  • Executor是最基础的执行接口;
  • ExecutorService接口继承了Executor在其上做了一些shutdown()submit()的扩展可以说是真正的线程池接口;
  • AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;
  • TheadPoolExecutor继承了AbstractExecutorService,是线程池的具体实现;
  • ScheduledExecutorService接口继承了ExecutorService接口,提供了带”周期执行”功能ExecutorService;
  • ScheduledThreadPoolExecutor既继承了TheadPoolExecutor线程池也实现了ScheduledExecutorService接口,是带”周期执行”功能的线程池;
  • Executors是线程池的静态工厂,其提供了快捷创建线程池的静态方法。

从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1,execute(Runnable command):履行Ruannable类型的任务,

2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象

3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,

4,shutdownNow():停止所有正在履行的任务并封闭办事。

5,isTerminated():测试是否所有任务都履行完毕了。

6,isShutdown():测试是否该ExecutorService已被关闭。

线程池的使用

线程池的创建

线程池的创建可以通过创建 ThreadPoolExecutor 对象或者调用 Executors 的工厂方法来创建线程池。但是在阿里巴巴的java开发手册中提到:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM

ThreadPoolExecutor

因此先看一下怎么通过创建 ThreadPoolExecutor 对象来创建一个线程池。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

这是 ThreadPoolExecutor 的构造方法,其中的参数含义如下:

  • corePoolSize:核心线程池大小, 当新的任务到线程池后,线程池会创建新的线程(即使有空闲线程),直到核心线程池已满。
  • maximumPoolSize:最大线程池大小,顾名思义,线程池能创建的线程的最大数目
  • keepAliveTime:程池的工作线程空闲后,保持存活的时间
  • TimeUnit: 时间单位
  • BlockingQueue:用来储存等待执行任务的队列
  • threadFactory:线程工厂
  • RejectedExecutionHandler: 当队列和线程池都满了时拒绝任务的策略

重要参数的说明:

  • corePoolSize 和 maximumPoolSize

    默认情况下线程中的线程初始时为 0, 当有新的任务到来时才会创建新线程,当线程数目到达 corePoolSize 的数量时,新的任务会被缓存到 workQueue 队列中。如果不断有新的任务到来,队列也满了的话,线程池会再新建线程直到总的线程数目达到 maximumPoolSize。如果还有新的任务到来,则要根据 handler 对新的任务进行相应拒绝处理。

  • BlockingQueue

    一个阻塞队列,用来存储等待执行的任务,常用的有如下几种:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
  • RejectedExecutionHandler

    当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。有下面四种JDK提供的策略:

    1. AbortPolicy,表示无法处理新任务时抛出异常, 默认策略
    2. CallerRunsPolicy:用调用者所在线程来运行任务。
    3. DiscardOldestPolicy: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
    4. DiscardPolicy:不处理,丢弃掉

    除了这些JDK提供的策略外,还可以自己实现 RejectedExecutionHandler 接口定义策略。

创建线程池示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CreateThreadPool {
public static void main(String args[]) {
//不建议的做法
ExecutorService executorService = Executors.newFixedThreadPool(2);

//使用 guava 开源框架的 ThreadFactoryBuilder 给线程池的线程设置名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-thread-%d").build();

ExecutorService pool = new ThreadPoolExecutor(4, 10, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(256),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.shutdown();
}
}

//输出:
demo-thread-0
demo-thread-1
demo-thread-2

线程池状态

线程池重点属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

​ ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法

private static int runStateOf(int c) { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

  • runStateOf:获取运行状态;
  • workerCountOf:获取活动线程数;
  • ctlOf:获取运行状态和活动线程数的值。

线程池存在5种状态

RUNNING = -1 << COUNT_BITS; //高3位为111

SHUTDOWN = 0 << COUNT_BITS; //高3位为000

STOP = 1 << COUNT_BITS; //高3位为001

TIDYING = 2 << COUNT_BITS; //高3位为010

TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

2、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。

(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功

juc_executor04

线程池的种类

SingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

FixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。

线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
});
}
}
}

返回结果:
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....

CachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。

此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小, 极端情况下会因为创建过多线程而耗尽系统资源

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这里虽然指定 maximumPool 为 Integer.MAX_VALUE,但没什么意义,如果不能满足任务执行需求,CachedThreadPool 还会继续创建新的线程。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 10; i++) {
// 延迟10秒执行任务,如果想要执行周期性的任务可以用下面的方式,每秒执行一次
pool.schedule(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 10, TimeUnit.SECONDS);
}
}
}

ScheduledThreadPool

主要用来在给定的延迟之后运行任务,或者定期执行任务。

1
2
3
4
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

newWorkStealingPool

newWorkStealingPool 是jdk1.8才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的 ForkJoinPool来实现的。ForkJoinPool 的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) throws Exception {
// 设置并行级别为2,即默认每时每刻只有2个线程同时执行
ExecutorService m = Executors.newWorkStealingPool(2);
for (int i = 1; i <= 10; i++) {
final int count=i;
m.submit(new Runnable() {
@Override
public void run() {
Date now=new Date();
System.out.println("线程" + Thread.currentThread() + "完成任务:"
+ count+" 时间为:"+ now.getSeconds());
try {
Thread.sleep(1000);//此任务耗时1s
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});

}
while(true){
//主线程陷入死循环,来观察结果,否则是看不到结果的
}
}
}

返回结果:
线程Thread[ForkJoinPool-1-worker-1,5,main]完成任务:1 时间为:7
线程Thread[ForkJoinPool-1-worker-0,5,main]完成任务:2 时间为:7
线程Thread[ForkJoinPool-1-worker-1,5,main]完成任务:3 时间为:8
线程Thread[ForkJoinPool-1-worker-0,5,main]完成任务:4 时间为:8
线程Thread[ForkJoinPool-1-worker-1,5,main]完成任务:5 时间为:9
线程Thread[ForkJoinPool-1-worker-0,5,main]完成任务:6 时间为:9
线程Thread[ForkJoinPool-1-worker-1,5,main]完成任务:7 时间为:10
线程Thread[ForkJoinPool-1-worker-0,5,main]完成任务:8 时间为:10
线程Thread[ForkJoinPool-1-worker-1,5,main]完成任务:9 时间为:11
线程Thread[ForkJoinPool-1-worker-0,5,main]完成任务:10 时间为:11

线程池的拒绝策略

当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

  • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

    1
    2
    3
    4
    5
    6
    7
    8
    public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
    }
    }
  • CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

    1
    2
    3
    4
    5
    6
    7
    8
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    r.run();
    }
    }
    }
  • DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
    }
    }
    }
  • DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。

    1
    2
    3
    4
    5
    public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    }

除了JDK默认提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.springframework.integration.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class CallerBlocksPolicy implements RejectedExecutionHandler {
private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
private final long maxWait;
/**
* @param maxWait The maximum time to wait for a queue slot to be
* available, in milliseconds.
*/
public CallerBlocksPolicy(long maxWait) {
this.maxWait = maxWait;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
BlockingQueue<Runnable> queue = executor.getQueue();
if (logger.isDebugEnabled()) {
logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
}
if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Max wait time expired to queue task");
}
if (logger.isDebugEnabled()) {
logger.debug("Task execution queued");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted", e);
}
}
else {
throw new RejectedExecutionException("Executor has been shut down");
}
}
}
1
2
3
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());

execute和submit的区别

在前面的讲解中,我们执行任务是用的execute方法,除了execute方法,还有一个submit方法也可以执行我们提交的任务。

execute

execute适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了

1
2
3
4
5
6
7
8
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
});
}
}

submit

submit方法适用于需要关注返回值的场景,submit方法的定义如下:

1
2
3
4
5
6
7
public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  ...
}

其子类AbstractExecutorService实现了submit方法,可以看到无论参数是Callable还是Runnable,最终都会被封装成RunnableFuture,然后再调用execute执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

下面我们来看看这三个方法分别如何去使用:

  • submit(Callable task);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class ThreadPool {
    public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    Future<String> future = pool.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
    return "Hello";
    }
    });
    String result = future.get();
    System.out.println(result);
    }
    }
  • submit(Runnable task, T result);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class ThreadPool {
    public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    Data data = new Data();
    Future<Data> future = pool.submit(new MyRunnable(data), data);
    String result = future.get().getName();
    System.out.println(result);
    }
    }
    class Data {
    String name;
    public String getName() {
    return name;
    }
    public void setName(String name) {
    this.name = name;
    }
    }
    class MyRunnable implements Runnable {
    private Data data;
    public MyRunnable(Data data) {
    this.data = data;
    }
    @Override
    public void run() {
    data.setName("yinjihuan");
    }
    }
  • Future submit(Runnable task);

    1
    直接submit一个Runnable是拿不到返回值的,返回值就是null.

线程池的关闭

关闭线程池可以调用shutdownNow和shutdown两个方法来实现

shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
List<Runnable> runs = pool.shutdownNow();
}
}

上面的代码模拟了立即取消的场景,往线程池里添加5个线程任务,然后sleep一段时间,线程池只有一个线程,如果此时调用shutdownNow后应该需要中断一个正在执行的任务和返回4个还未执行的任务,控制台输出下面的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
0
1
2
3
4
[fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20]
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at fs.ThreadPool.lambda$0(ThreadPool.java:15)
at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
pool.shutdown();
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

上面的代码模拟了正在运行的状态,然后调用shutdown,接着再往里面添加任务,肯定是拒绝添加的,请看输出结果:

1
2
3
4
5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at fs.ThreadPool.main(ThreadPool.java:24)

还有一些业务场景下需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,isShutdown只是返回你是否调用过shutdown的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(3000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
pool.shutdown();
while(true){
if(pool.isTerminated()){
System.out.println("所有的子线程都结束了!");
break;
}
Thread.sleep(1000);
}
}
}

线程池监控

public long getTaskCount() //线程池已执行与未执行的任务总数

public long getCompletedTaskCount() //已完成的任务数

public int getPoolSize() //线程池当前的线程数

public int getActiveCount() //线程池中正在执行任务的线程数量

juc_executor01

源码分析

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取ctl值
*/
c = ctl.get();
}
/*
* 如果当前线程池是运行状态并且任务添加到队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
// 这时需要移除该command
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}

简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

  • 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  • 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  • 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

execute方法执行流程如下:

juc_executor02

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见JDK源码。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  • lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  • 如果正在执行任务,则不应该中断线程;
  • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  • 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  • 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

1
2
3
4
5
6
7
8
protected boolean tryAcquire(int unused) {
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。

正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。

runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空,则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这里说明一下第一个if判断,目的是:

  • 如果线程池正在停止,那么要保证当前线程是中断状态;
  • 如果不是的话,则要保证当前线程不是中断状态;

这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:

不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。

STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。

总结一下runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

getTask方法

getTask方法用来从阻塞队列中取任务,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

​ getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

processWorkerExit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

juc_executor03

总结

  • 分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
  • 这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
  • 介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
  • 在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。

参考链接