JavaSE8 Goldへの道(Upgrade to Java SE 8 Programmer 1Z0-810 試験対策)11回目です。
一連の記事は「JavaSE8Gold」ラベルを付けていきます。
今回は並列ストリームです。
一連の記事は「JavaSE8Gold」ラベルを付けていきます。
今回は並列ストリームです。
並列ストリーム
ストリームには直列(sequential)と並列(parallel)があります。
これまで紹介してきたのは直列の方でした。(その5で少しだけ並列の方にも触れました)
直列ストリームは1つのスレッドで処理されます。対して並列ストリームではマルチスレッドで処理されます。
スレッドの管理は内部で自動的にやってくれるため、プログラマが意識する必要はありません。
ストリームAPIではデフォルトでは直列ストリームが生成されます。並列ストリームを使うには、生成後に
※
逆に
並列かどうかというのは内部的な状態で、型は
文末に載せている試験の参考サイトでは、以下のような例が載っています。
直列ストリームで1~10までの数値が入ったストリームを出力すると、当然ながら順番通りに出てきます。
これまで紹介してきたのは直列の方でした。(その5で少しだけ並列の方にも触れました)
直列ストリームは1つのスレッドで処理されます。対して並列ストリームではマルチスレッドで処理されます。
スレッドの管理は内部で自動的にやってくれるため、プログラマが意識する必要はありません。
ストリームAPIではデフォルトでは直列ストリームが生成されます。並列ストリームを使うには、生成後に
BaseStream#parallel
メソッドを呼ぶか、ストリーム生成時にCollection#parallelStream
メソッドを呼ぶことで生成できます。※
BaseStream
はStream
のスーパーインタフェースです。逆に
BaseStream#sequential
メソッドを呼ぶと並列ストリームから直列ストリームに変換できます。並列かどうかというのは内部的な状態で、型は
Stream
で変わらないのでコーディングは全く同じにできます。文末に載せている試験の参考サイトでは、以下のような例が載っています。
直列ストリームで1~10までの数値が入ったストリームを出力すると、当然ながら順番通りに出てきます。
System.out.print("sequential:"); IntStream.rangeClosed(1, 10) .forEach(i -> System.out.print(i + " "));
実行結果
sequential:1 2 3 4 5 6 7 8 9 10
parallel
メソッドで並列ストリームにすると、複数スレッドで処理されるため順序が不定になります。System.out.print("parallel:"); IntStream.rangeClosed(1, 10) .parallel() .forEach(i -> System.out.print(i + " "));
実行結果
parallel:7 6 8 3 9 10 5 4 2 1
スレッド名を出力させてみると、今使っている2コア2スレッドのPCでは4スレッドが動いているのがわかります。
IntStream.rangeClosed(1, 10) .parallel() .forEach(i -> System.out.println(Thread.currentThread().getName() + ":" + i));
実行結果
main:7
ForkJoinPool.commonPool-worker-2:3
ForkJoinPool.commonPool-worker-1:9
ForkJoinPool.commonPool-worker-2:5
main:6
ForkJoinPool.commonPool-worker-2:4
main:2
ForkJoinPool.commonPool-worker-1:10
ForkJoinPool.commonPool-worker-2:1
ForkJoinPool.commonPool-worker-3:8
ForkJoinPool.commonPool-worker-2:3
ForkJoinPool.commonPool-worker-1:9
ForkJoinPool.commonPool-worker-2:5
main:6
ForkJoinPool.commonPool-worker-2:4
main:2
ForkJoinPool.commonPool-worker-1:10
ForkJoinPool.commonPool-worker-2:1
ForkJoinPool.commonPool-worker-3:8
試験範囲としてはこれくらいのようですが、もう少し考察してみます。
どういうときに使うべきか
ストリームが内部で持つフラグ
当然ながら、全てのストリームを並列にすればいいというのは間違いです。CPUコアが1つの場合並列化しても意味ないですし、要素数が少ない・中間操作の処理負荷が低いと、並列化でスレッドを管理するオーバーヘッドの方が高くなってしまいます。
要素数がある程度多い、または中間操作が複雑で時間がかかるほど並列化の効果は上がると思いますが、実際のところ一度測ってみるのが確実です。
「この中間操作・終端操作は並列ストリームの方が良い」みたいなのがあればいいのですが、APIドキュメントに一部書いてあるものの網羅しきってはいません。
例えば
findFirst
とfindAny
という終端操作があります。どちらも基本は中間操作filter
と組み合わせて、条件を満たす最初の要素/いずれかの要素が見つかった時点で返します。「いずれか」ってなんだよという話ですが、
findFirst
の方は要素が現れる順番を意識していて、仮に並列ストリームであっても出てきた順番の中で最初の要素を返します。findAny
の方は順番を意識しないため、並列ストリームだと結果が不定になる代わりに、パフォーマンスが上がります。long st = System.currentTimeMillis(); OptionalDouble o = LongStream.range(1, 10_000_000_000L) .parallel() .mapToDouble(Math::sqrt) .filter(l -> l > 30000) .findAny(); // .findFirst(); System.out.println(System.currentTimeMillis() - st); System.out.println(o);
実行結果(findFirst)
3240
OptionalDouble[30000.000016666665]
→常に同じ結果
OptionalDouble[30000.000016666665]
→常に同じ結果
1056
OptionalDouble[37238.67277441558]
933
OptionalDouble[58356.28965244449]
918
OptionalDouble[49632.43804207083]
など
OptionalDouble[37238.67277441558]
933
OptionalDouble[58356.28965244449]
918
OptionalDouble[49632.43804207083]
など
APIドキュメントを眺めてみると、ストリームの性質として直列/並列以外に*順序付けされているか*があるのがわかります。
また速度も
上のコードを
また速度も
findAny
の方が早く終わっています。BaseStream#unorderd
メソッドを呼ぶと順序付けを解除することができるとあるほか、IntStream#range
は「順序付けされた」、IntStream#generate
は「順序付けされていない」ストリームを返すとあります。上のコードを
IntStream#generate
に置き換えてみると、long st = System.currentTimeMillis(); AtomicLong al = new AtomicLong(1); OptionalDouble o = LongStream.generate(() -> al.getAndAdd(1)) .parallel() .mapToDouble(Math::sqrt) .filter(l -> l > 30000) .findFirst(); System.out.println(o); System.out.println(System.currentTimeMillis() - st);
実行結果(
findFirst
、findAny
どちらもほぼ同じ)
18150
OptionalDouble[30000.0001]
15054
OptionalDouble[30000.000133333335]
15444
OptionalDouble[30000.00015]
OptionalDouble[30000.0001]
15054
OptionalDouble[30000.000133333335]
15444
OptionalDouble[30000.00015]
となり、
※ラムダ式から外部変数を参照する場合、finalもしくは実質的final(finalは付いていないけど変更されていない)である必要があります。またこの例ではマルチスレッドで
しかし、
findFirst
でも結果が不定でfindAny
を使っても速くなりませんでした。※ラムダ式から外部変数を参照する場合、finalもしくは実質的final(finalは付いていないけど変更されていない)である必要があります。またこの例ではマルチスレッドで
generate
に渡しているSupplier
が実行されるため、生成される値がおかしくならないようにjava.util.concurrent.atomic.AtomicLong
を使っています。しかし、
IntStream#range
と同じく「順序付けされた」ストリームを生成するIntStream#iterate
を使ってみると、OptionalDouble o = LongStream.iterate(1, l -> l + 1) //あとは同じ
実行結果(
findFirst
、findAny
どちらもほぼ同じ)
3147
OptionalDouble[30000.000016666665]
OptionalDouble[30000.000016666665]
と、常に同じ結果になりました。
ストリームAPIの実装はとても複雑で全部把握できていませんが、内部でいくつかの状態を持っているようです。
非公開で
ストリームAPIの実装はとても複雑で全部把握できていませんが、内部でいくつかの状態を持っているようです。
非公開で
StreamOpFlag
というenumがあり、これには以下の値が定義されています。- DISTINCT
- SORTED
- ORDERED
- SIZED
- SHORT_CIRCUIT
LongStream#range
とLongStream#iterate
を比べたとき、おそらく前者はサイズが決まっているからSIZEDが立っていそうです。peek
メソッドでスレッド名と値を出力してみると、LongStream#range
の方はスレッドに割り当てられる値が突然大きくなったりしており、複数スレッドに割り当てる要素をサイズに応じて変えてるように見えます。LongStream#iterate
の方はサイズがわからない無限ストリームなので、出力を見ると1024個ずつ割り振ってるようでした。そこで
LongStream#iterate
を使ったコードでfilter
の条件をもっと小さくしてみると、findAny
で結果がばらつくようになりました。終端操作の並列性
前回その10で紹介したCollectors#toMap
、Collectors#groupingBy
には並列化バージョンがあり、それぞれCollectors#toConcurrentMap
とCollectors#groupingByConcurrent
です。Map
の代わりにjava.util.concurrent.ConcurrentMap
を生成します。しかし、APIドキュメントを見ると「並列ストリームの場合Concurrentが付く方を使わなければならない」とは書いていません。
これは通常並列ストリームでは各スレッドごとにコンテナが用意され最後に集約されるのですが、
Map
を集約するには他のコレクションよりコストが高くなる可能性があるため、Concurrent付きの方ではConcurrentMap
を1つだけ生成して全スレッドが値を入れていくということのようです。現に
toList
やtoSet
にはConcurrentバージョンがありません。プログラマは、ストリームのソースの特性(
List
なのかSet
なのか、サイズが決まっているか、順序付けされているか)を考慮した上で、適切な終端操作を選んで並列ストリームを使う必要がありそうです。というわけで
後半は試験から外れましたが、並列ストリーム奥が深いです。いろいろ試してみて経験値を積む必要がありそうです。
今回も以下のサイトとGoldの通常試験の参考書を参考にしています。