并发编程(一)——并发容器

Java多线程相关概要总结:#

  • 一致性
  • 线程相关:
    • 线程实现、Runnable、Callable、Future、CompletableFuture
    • golang纤程
  • 锁相关:
    • volatile实现
    • synchronized实现
    • AQS
    • ReentrantLock实现
    • CAS实现
  • 其他JUC同步工具:
    • CountDownAndLatch
    • Condition、await、signal: 同步队列
    • CyclicBarrier:
  • 同步容器:
    • Map、Set、List无锁
    • ConcurrentHashMap
    • ConcurrentSkipListSet
    • CopyOnWriteArrayList
    • CopyOnWriteArraySet
    • BlockingQueue
    • BlockingDeque
  • 线程池:
    • 俩接口Executors和ExecutorService
      • 基础实现ThreadPoolExecutor,和内置定义的newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool、ScheduledThreadPoolExecutor。所有线程都使用一个阻塞队列去排任务。
      • ForkJoinPool和newWorkStealingPool:每个线程一个队列。根据阈值拆分、RecursiveTask、RecursiveAction
  • 引用类型:
    • 强引用、软引用、弱引用、虚引用
  • Disruptor:
    • 目前单机性能最高的JVM内MQ(和BlockingQueue等比)
    • RingBuffer、EventFactory、Event、缓存行
  • JMH:
    • Java MicroBenchmark Harnes:微基准测试。
    • org.openjdk.jmh包,插件,@BenchMark @Warmup预热、@BenchmarkMode(Mode.Throughput)

多线程必要性#

  • 压榨机器资源,废话。

  • 服务隔离,高可用:首先一个JVM中的多个线程才能带来服务隔离,每一个线程完成自己的逻辑。如果一个线程从头走到尾,所有的业务都是自己,是做不到隔离的,崩溃就完蛋。多个可以做到降级,比如短信线程挂了,交易还能用。

并发容器介绍#

JDK的容器介绍#

大致分为CollectionMap两类,Collection下面又有传统的ListSet,以及更灵活的Queue

List和Set#

image-20200817195043660

更灵活的Queue#

image-20200817195442756

Queue有更多灵活的api,为多线程带来更友好的选择:

LinkedQueue是无界的,ArrayQueue是无界的。

  • offer满了返回false
  • put满了阻塞
  • add满了抛异常
  • clear
  • take阻塞

线程池#

Callable、Future等基础#

Callable#

Callable是一个和Runnable类似,只有一个带返回值的V callable()方法。而传统的Runnable的run()是没有返回值的。

Future#

一个获取线程返回值的接口

  • cancel
  • get
  • get(time)
  • isCancelled
  • isDone

FutureTask更灵活的选择#

可以理解为同时实现了Runnable和Future接口,既是一个运行者,也存储运行结果。

  • 既然实现了Runnable接口,就可以传入Thread中运行
  • 既然实现了Future接口,就可以获取运行返回值
  • 在后面的WorkStealingPool和ForkJoinPool中有大量应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* FutureTask既是一个提交任务的task
* 也是一个获取结果的future(执行结果也存在自己里面)
*
* 他实现了RunnableFuture接口
* RunnableFuture接口实现了Runnable和Future接口
*
* 所以就既可以run 也可以作为future
*/
public class _02_FutureTask {

public static void main(String[] args) throws ExecutionException, InterruptedException {

// new FutureTask需传入一个callable,有返回值
FutureTask<Integer> futureTask = new FutureTask<>(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
System.out.println("使用一个thread传入futuretask,将其视为Runnable运行...");
new Thread(futureTask).start();
System.out.println("异步获取返回值...");
System.out.println(futureTask.get());
}
}

CompletableFuture 非常灵活#

可以用来管理多个Future的运行过程和运行结果。提供了对一堆任务的管理,比如需要N个任务同时完成join才能继续。

可以避免各种if等。

1
2
3
4
5
6
7
8
9
10
11
12
// 声明三个CompletableFuture
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> task1());
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> task2());
CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> task3());

// 需要等三个CompletableFuture都完成才能继续
CompletableFuture.allOf(completableFuture1,completableFuture2,completableFuture3).join();
// 从三个completableFuture里面拿出来结果,继续运行
//System.out.println(completableFuture3.get());

// 或者任意一个任务运行完毕,就获取运行的结果
System.out.println(CompletableFuture.anyOf(completableFuture1,completableFuture2,completableFuture3).get());

线程池#

分为ForkJoinPoolThreadPoolExecutor两大类。

  • ThreadPoolExecutor普通线程池

  • ForkJoinPool是任务分解的线程池

    image-20200818191735618

Executors#

执行器接口,可以允许我们把线程的定义和运行分开。

  • Executors里面提供了很多创建线程池的方法,包括一下方法:

    • 以下方法都是有问题的,不建议直接使用!需要使用手动7个参数方式创建

    • newFixedThreadPool(int nThreads):最大最小线程数量相等,可以并行

      • new ThreadPoolExecutor(nThreads, nThreads,   // 线程数固定
                                              0L, TimeUnit.MILLISECONDS,
                                              new LinkedBlockingQueue<Runnable>());  // 任务太多,OOM
        <!--code2-->
        
        
    • ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) // 线程数可能巨多,OOM

    • new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                    new DelayedWorkQueue());  // 使用了一个DelayedWorkQueue来完成调度
      <!--code3-->
      
      
    • ScheduledExecutorService newSingleThreadScheduledExecutor()

    • newWorkStealingPool(int parallelism) --------------1.8新增的工作窃取线程池

上面的线程池在并发大的时候就会有问题,比如FixedThreadPool会堆积任务、CachedThreadPool会堆积线程

ExecutorService#

  • ExecutorService里面定义了线程池的生命周期:
    • submit(Callable task);
    • submit(Runnable task);
    • submit(Runnable task, T result);
    • shutdown();
    • boolean isShutdown();
    • List<Future> invokeAll(Collection<? extends Callable> tasks) 给一堆任务,返回一堆Future

默认线程数设置#

  • 经验值+压力测试。

  • 一个经验公式如下:

    • N是Cpu核心数量,
    • U是比如期望当前线程池程序占用CPU用到50%就是0.5(还有其他线程池在运行)
    • W/C是等待时间和计算时间的壁纸。在几乎无IO的场景这个值趋近于0,在IO很多wait很多的场景很大。这个值很难估算。
  • 还是需要压力测试,调整。

    image-20200819001649200

如果CPU密集型,可以CPU核数+1,期望最高100%.

何时用CachedPool何时用FixPool#

  • Cache是任务不平稳时候采用,可以波谷节省资源
  • FixPool是任务数量产生平稳,没什么太大波动的时候采用,节省创建开销

自定义线程池 new ThreadPoolExecutor#

自定义创建线程池的必要性#

不允许自行手动new Thread而是使用线程池:

  • 这样可以减少线程创建和销毁开销,以及资源不足开销。否则容易造成过度创建线程,切换过度的问题。

不能使用Executors去创建内置线程池,要使用ThreadPoolExecutor创建。

  • Executors 返回的线程池对象的弊端如下:

    • 1) FixedThreadPool 和 SingleThreadPool:
      • 使用了LinkedBlockingQueue,允许的请求任务队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    • 2) CachedThreadPool:
      • 允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM

自定义线程池的7个参数#

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,                          // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) // 拒绝策略

任务队列BlokingQueue#

  • 各种阻塞队列去做任务队列。
    • ArrayBlockingQueue是有界的,
    • LinkedBlockingQueue最大是Integer.MaxValue
    • TransferQueue 1长度
    • SynchousQueue 0长度

线程工厂ThreadFactory#

// 根据runnable去产生thread的工厂。
// 默认这个threadFactory里面产生线程指定了group、线程名(前缀+增量整数)、指定不是守护线程、普通优先级。
Executors.defaultThreadFactory(),

阿里推荐类似这样创建线程工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
// 定义线程组名称,在 jstack 问题排查时,非常有帮助
UserThreadFactory(String whatFeaturOfGroup) {
// 指定一个线程工厂
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}


// 创建线程
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
// group runnable name 新线程所需的堆栈大小或零以指示该参数是被忽略
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}

拒绝策略#

当线程池到达最大线程数,且任务队列满的时候执行的操作(或者线程池已经关闭也会执行)。默认有四种,但是生产都不用。

  • 接口 RejectedExecutionHandler 决绝执行拦截器,只有一个方法void rejectedExecution(Runnable r, ThreadPoolExecutor executor),被ThreadPoolExecutor在任务队列也满了的时候执行。

  • DiscardPolicy() 啥也没干的空实现,悄悄丢掉当前任务

  • ThreadPoolExecutor.DiscardOldestPolicy(); ------丢弃掉最老的,游戏服务丢掉最老的位置信息。

  • AbortPolicy() 抛了一个异常,调用者去处理异常。-----------------------默认策略,抛出RejectedExecutionException

  • CallerRunsPolicy(),当前调用者来运行(谁提交的任务,就谁来运行)

  • 一般生产上面的一个都不用,自定义,将在RejectedExecutionHandler的实现策略中将任务保存在kafka、redis、mq、数据库中,并打印日志,以备后续再来读取执行。

  • 然后考虑调节线程大小、任务队列大小。再不行的话就要加机器。

ThreadPoolExecutor源码#

  • 内部使用了一个原子的int来保存控制状态 AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    • 前3位是状态,后面29位是线程数量。
    • 使用一个int去存放两个值可以减少加锁次数。

主要基础成员变量#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecutor extends AbstractExecutorService {
// 前3位是状态,后面29位是线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 减去三位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1左移29位再-1 就是 最大2^29-1个线程,是全1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程状态,高3位存放,默认RUNNING
private static final int RUNNING = -1 << COUNT_BITS; // 正常运行
private static final int SHUTDOWN = 0 << COUNT_BITS; // 调用shutdown方法后
private static final int STOP = 1 << COUNT_BITS; // shutDownNow后
private static final int TIDYING = 2 << COUNT_BITS; // 正在结束
private static final int TERMINATED = 3 << COUNT_BITS; // 线程全都结束了


// Packing and unpacking ctl
// 获取state,按位与,与全0计算,低位全0. 只剩高位了。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量,按位与,高3位全变0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 合并数量和状态
private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法#

七个参数,见上一节

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

执行任务execute方法#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 获取状态,看看低位worker数量是否够用,小于核心线程数直接添加创建
if (workerCountOf(c) < corePoolSize) { // 判断没超核心线程数
if (addWorker(command, true)) // 1. 添加核心线程(true),并执行这个任务,成功程返回
return;
c = ctl.get(); // 没成功,说明ctl并发的时候变化了,重新获取一次
}
if (isRunning(c) && workQueue.offer(command)) { // 2. 再次判断运行中(并发) ,并尝试丢到队列里成功
int recheck = ctl.get(); // 再次检查(并发)
if (! isRunning(recheck) && remove(command)) // 如果停止了,就删除任务,拒绝策略处理
reject(command);
else if (workerCountOf(recheck) == 0) // 3.如果worker为空就创建非核心线程(false)
addWorker(null, false);
}
// 3.未运行,或者队列没有位置。尝试启动非核心线程(false)。添加失败就拒绝策略处理
// 此时若线程池不是运行状态时候,addWorker也会判断,必然会失败
else if (!addWorker(command, false))
reject(command);
}

添加任务addWorker源码#

使用了两层自旋锁,外层去把worker的数量+1

内层添加到worker中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 外层自旋,worker数量+1
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) { // 内层自旋,worker添加
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // 添加worker数量
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}


// 真正添加线程的地方
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建一个Worker,内部有一个线程
final Thread t = w.thread;
if (t != null) {
// 加锁,因为下面要往hashset里面塞新的worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 再次判断线程池状态(并发),此时加锁了是可靠的

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable 线程是正常可启动的
throw new IllegalThreadStateException();
workers.add(w); // 给hashset里面添加这个新的worker
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; // 标志位
}
} finally {
mainLock.unlock(); // 解锁
}
if (workerAdded) { // 标志位添加成功,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 启动失败,要去hashset里面remove
}
return workerStarted;
}

运行任务,Worker的run方法委托来的#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 自旋,拿一个firstTask初始任务,或者从任务队列里阻塞地拿一个
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁,CAS改状态操作
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前的拓展点操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 执行runable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行完毕的拓展点
afterExecute(task, thrown);
}
} finally {
task = null; // gc
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 加数量,从HashSet里面删除worker
processWorkerExit(w, completedAbruptly);
}
}

内部类Worker的源码(继承了AQS、实现Runnable)#

  • Worker继承了 AQS,说明 Worker 本身是个锁,而且在tryAcquire以及其他对AQS方法的实现,都说明了它不支持重入。因为参数都写死为1,如果是重入功能的锁的话,会支持累加。(而且实现都是很简单的CAS修改一个state)
  • 因为很多个任务都要等着往Worker提交,所以需要AQS来支持同步,自身变成一个锁。
  • 实现了Runnable,把run方法委托给外面线程池ExecutorService的runWorker(w)去执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
// 包了一个线程
final Thread thread;


// 一个初始任务
Runnable firstTask;
// 运行数量
volatile long completedTasks;

// ThreadFactory 创建线程,通过给定的任务(有可能为空)来创建初始化,初始化时会创建一条线程进行绑定
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

// 委派给线程池的runWorker方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

// 尝试CAS获取一次锁,很简单一个CAS操作
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 这里是1,不支持重入
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

WorkStealingPool/ForkJoinPool#

先看一眼类图的关系:

image-20200820013537597

常规的ThreadPoolExe

创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。cutor是所有线程共享一个任务队列,WorkStealingPool底层是ForkJoinPool。它每个线程都有独自的一个任务队列。

当任务完成后会去其他线程的任务队列中窃取任务回来执行。

  • ForkJoinPool是适合把大任务切分为小任务的线程池。可以支持把一个任务拆分到很多线程中执行,执行完还可以汇总结果。

push和pop不用锁,poll偷的时候需要锁。

image-20200819171421891

Fork/Join线程池#

  • 抽象类ForkJoinTask是核心类,这个类实现了Future,可以获取值
  • RecursiveTask<V>ForkJoinTask的一个子类,带返回参数。
  • 还有一个类似的RecursiveAction是没有返回值的,只划分子任务运行,不返回RecursiveAction
  • 都需要定义一个构造方法(setter也行),去设置当前任务所需的参数
  • 都是需要重写compute()方法,并分解当前任务的参数到子任务中,然后获取子任务的值。
  • 创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。前者异步future,后者同步。

两个主要方法:

  • fork()

fork()方法类似于线程的Thread.start()方法,但是它不是真的启动一个线程,而是将任务放入到工作队列中。

  • join()

join()方法类似于线程的Thread.join()方法,但是它不是简单地阻塞线程,而是利用工作线程运行其它任务。当一个工作线程中调用了join()方法,它将处理其它任务,直到注意到目标子任务已经完成了。

详细分析:

  • 它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
  • ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。
  • 这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。
  • 比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
  • 那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
  • 所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
  • 以上程序的关键是fork()和join()方法。在ForkJoinPool使用的线程中,会使用一个内部队列来对需要执行的任务以及子任务进行操作来保证它们的执行顺序。

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

  • 首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。
  • 但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

ps:ForkJoinPool在执行过程中,会创建大量的子任务,导致GC进行垃圾回收,这些是需要注意的。

第一个例子,Fibonacci数列#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
  /**
* RecursiveTask 是ForJoinTask的一个子类,带返回参数
* 还有一个类似的RecursiveAction是没有返回值的,只划分子任务运行,不返回RecursiveAction
*
* 都是需要重写execute方法
*/

static class MyFibonacciForJoinTask extends RecursiveTask<Integer>{

int n;

public MyFibonacciForJoinTask(int n){
this.n = n;
}

@Override
protected Integer compute() {
if(n > 2){
// 拆分子任务
// f(n-1)
MyFibonacciForJoinTask joinTask1 = new MyFibonacciForJoinTask(n - 1);
// f(n-2)
MyFibonacciForJoinTask joinTask2 = new MyFibonacciForJoinTask(n - 2);
//获取子任务结果
//return f(n-1) + f(n-2)

// 此处下面直接fork是错误的,会浪费当前线程,去委派。而不是先用当前线程运行一个任务,剩余的再委派
// 这两行打开,下面的invokeAll注释,会耗时飙升
// https://www.liaoxuefeng.com/article/1146802219354112
// joinTask1.fork();
// joinTask2.fork();
// invokeAll 同步执行所有的task,下面可以直接get,和join等价的
invokeAll(joinTask1,joinTask2);


//也可以如下方式节省线程调度,使用当前线程计算
// joinTask1.fork();
// // 第二个任务使用当先线程完成
// Integer compute2 = joinTask2.compute();
// Integer compute1 = joinTask1.join();
// return compute1 + compute2;


return joinTask1.join() + joinTask2.join();
}else{
return n-1; //f(1)= 0 f(2) = 1, f(3) = 1
}
}
}

/**
* 传统递归模式
* @param n
* @return
*/
private int getFibonacciRec(int n){
if(n > 2){
return getFibonacciRec(n-1) + getFibonacciRec(n-2);
}else{
return n-1;
}
}

/**
* ForkJoinPool,适合可以进行任务拆分合并的场合
* @throws IOException
*/
@Test
public void testForkJoinPool() {

int n = 40;

long t1 = System.currentTimeMillis();
System.out.println(getFibonacciRec(n));
long t2 = System.currentTimeMillis();
// forkjoin耗时更长。。。。
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()-1);
ForkJoinTask<Integer> submit = forkJoinPool.submit(new MyFibonacciForJoinTask(n)); // submit异步提交
System.out.println(submit.join());
long t3 = System.currentTimeMillis();

System.out.println(t2-t1);

System.out.println(t3-t2);
}

另一个例子,求[a,b]区间所有整数的sum#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 计算[a,b]之间所有整数和的任务
*/
static class MySumForkJoin extends RecursiveTask<Integer>{

private final int start;
private final int end;

public MySumForkJoin(int start, int end){
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
// 这里的阈值也可以大一点,比如相隔100个数就直接加,更大就继续拆。具体看业务
if(end - start > 1){
int middle = start+(end-start)/2;
System.out.println(String.format("【%d,%d】 【%d,%d】", start, middle, middle + 1, end));
MySumForkJoin sumForkJoin1 = new MySumForkJoin(start, middle);
MySumForkJoin sumForkJoin2 = new MySumForkJoin(middle + 1, end);
//执行子任务
// sumForkJoin1.fork();
// sumForkJoin2.fork();
invokeAll(sumForkJoin1,sumForkJoin2);
//获取子任务结果
return sumForkJoin1.join()+sumForkJoin2.join();
}else if(end > start){
return end + start;
}else{
// ==
return end;
}
}
}

@Test
public void testForkoinPool2() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(new MySumForkJoin(2, 10));
System.out.println(forkJoinTask.join());
}

经典面试题QA#

  • 加入提供了一个闹钟服务,比如7:00叫醒,订阅的人很多如10亿。如何优化?

  • 并行parallel和并发concurrent:

    • 并发是任务提交,(如很多任务同时涌过来)
    • 并行是任务执行。(如多个任务真的在多核同时处理)
    • 并行是并发的子集。

基准测试工具JMH#

Java Micro-Benchmark

注意事项#

  • ThreadLocal 变量必须在finally中remove。