Javaのforkjoinとparallel()について

  • このエントリーをはてなブックマークに追加

こんなコードで実験。
普通に考えると、Parallel()はJVM内で共通のPOOLを使い、使えない場合はcaller threadを使うようになる。
現に、ExecutorServiceを使う場合、そのような(commonのforkjoinとcaller threadを使う)挙動になる。

なのでparallel().foreachの中で重い処理を書くと全体に影響が出る。(もちろんcaller threadで実行されので処理が止まってしまうわけでは無い)

しかしながら、ForkJoinPoolを使った場合、全てのスレッドがForkJoinPoolで実行されるのである。なぜ共通のPoolが使われないのか?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//ExecutorService forkJoinPool = Executors.newFixedThreadPool(2);
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.execute(() -> {
Arrays.stream(new int[]{ 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20 })
.parallel()
.forEach(ii -> {
int r = ThreadLocalRandom.current().nextInt(5) + 2;
try {
TimeUnit.SECONDS.sleep(r);
} catch (Exception e) {}
System.out.printf("[%d]%s\t%s%n", r, new Date(), Thread.currentThread().getName());
});
});
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

ThreadPoolExecutorの場合

1
2
3
4
5
6
7
8
9
[3]Wed Oct 18 02:04:18 JST 2017	ForkJoinPool.commonPool-worker-3
[4]Wed Oct 18 02:04:19 JST 2017 ForkJoinPool.commonPool-worker-2
[5]Wed Oct 18 02:04:20 JST 2017 pool-1-thread-1
[5]Wed Oct 18 02:04:20 JST 2017 ForkJoinPool.commonPool-worker-1
[2]Wed Oct 18 02:04:20 JST 2017 ForkJoinPool.commonPool-worker-3
[3]Wed Oct 18 02:04:22 JST 2017 ForkJoinPool.commonPool-worker-2
[4]Wed Oct 18 02:04:24 JST 2017 pool-1-thread-1
[4]Wed Oct 18 02:04:24 JST 2017 ForkJoinPool.commonPool-worker-1
[3]Wed Oct 18 02:04:25 JST 2017 ForkJoinPool.commonPool-worker-2

ForkJoinPoolの場合

1
2
3
4
5
6
7
8
9
10
11
[2]Wed Oct 18 02:03:06 JST 2017	ForkJoinPool-1-worker-0
[2]Wed Oct 18 02:03:06 JST 2017 ForkJoinPool-1-worker-1
[3]Wed Oct 18 02:03:09 JST 2017 ForkJoinPool-1-worker-1
[4]Wed Oct 18 02:03:10 JST 2017 ForkJoinPool-1-worker-0
[2]Wed Oct 18 02:03:11 JST 2017 ForkJoinPool-1-worker-1
[2]Wed Oct 18 02:03:12 JST 2017 ForkJoinPool-1-worker-0
[3]Wed Oct 18 02:03:15 JST 2017 ForkJoinPool-1-worker-0
[4]Wed Oct 18 02:03:15 JST 2017 ForkJoinPool-1-worker-1
[2]Wed Oct 18 02:03:17 JST 2017 ForkJoinPool-1-worker-1
[3]Wed Oct 18 02:03:18 JST 2017 ForkJoinPool-1-worker-0
[4]Wed Oct 18 02:03:22 JST 2017 ForkJoinPool-1-worker-0

WHY..?

なぜだろうと思いソースを追いかけると、ForkJoinTaskで次のコードが。

1
2
3
4
5
6
7
8
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}

parallel()で使われるForkJoinPool.commonPoolもクラスはForkJoinWorkerThreadなので、ちょっと特別扱いされてるのかー。
ていうか、こんなん分かるわけないよ。

でも、これって実行した後の話だよね。。

“commonで実行しようと思ったけど、callerがforkjoinだからそっちで実行しよう” みたいな処理じゃないと辻褄が合わない気がする。他の場所にあるのかな?

ForkJoinTaskのこれか。

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
  • ForkJoinTaskは木構造を作っている(これを実行したらこれを実行するみたいなグラフ)
  • ForkJoinスレッドの中でForkJoin(parallel()も同様)を実行すると、この構造に連結されていく
  • ForkJoinPool.submit(stream.parallel().forEach) はこの構造を一気に構築する。
  • この構造上、途中に遅いヤツが居ると後続の処理は全て引きずられる
    • 待機列を一気に構築して、それぞれ処理する。並び替えは不可。というイメージ。
    • 時間が経過すると、一番遅いヤツに引きずられて空きWorkerだらけになる。
    • (スーパーのレジのイメージで、かつ100人の客が居たらいっきに100人分の列と順番を決めてしまう)
  • これに対してExecutorServiceは1つの行列(Queue)にタスクを積んで、空きWorkerができ次第順処理される
    • (コンビニのレジの列のイメージ)

この記事 の現象はまさにそれかなと思いました。

参考