倭マン's BLOG

くだらない日々の日記書いてます。 たまにプログラミング関連の記事書いてます。 書いてます。

蒐集してやんよ java.util.stream.Collectors クラス (4) - parallel ストリーム と concurrent コレクタ -

Java の Collectors クラスに定義されているメソッドを見ていくシリーズ(目次)。 Stream や Collector に関する記事をそこそこ書いてきて今さらいうのもアレですが、parallel ストリームと concurrent コレクタに関してちょっと(というか根本的に?)勘違いしてた気がするので、ちょっとここいらに関する記事を書いておくことに。

java.util.stream.Collectors クラスの static メソッドから取得できるコレクタのなかで特性 (characteristics) が CONCURRENT なのは toConcurrentMap() と groupingByConcurrent() の2つでした()。 これらのメソッドで返されるコレクタは CONCURRENT 特性を持ちますが、これらのコレクタを単に使うだけで並行処理をしてくれるワケではないようです(よく考えてみれば当たり前かな・・・)。

CONCURRENT 特性を持つコレクタは並行実行しても、したがって parallel ストリームに対して用いても返り値となるコンテナオブジェクトには同期化して処理が行われるってことだと思いますが、そうなると例えばストリームの要素をグループ化する際に sequential / parallel ストリームと groupingBy() / groupingByConcurrent() の組合せ、計4通りの実行方法があることになります。 以下でこれらを実際に試してみます。 ただし、以下のサンプルで次の DebugList クラスを用いています:

class DebugList<E> extends LinkedList<E>{
		
    @Override
    public boolean add(E e){
        // 「JRuby --> [Java, Jython]」のように --> で追加情報を表示
        System.out.printf("%20s --> %s\n", e, this);
        return super.add(e);
    }

    @Override
    public boolean addAll(Collection<? extends E> c){
        // 「[JRuby] ->> [Java, Jython]」のように ->> で追加情報を表示
        System.out.printf("%20s ->> %s\n", c, this);
        return super.addAll(c);
    }
}

add() と addAll() が呼ばれた時に、それぞれ「-->」, 「->>」で表示するようにしています。

Sequential ストリーム × groupingBy()
まずは完全に非並行な場合。 groupingBy() メソッドで、文字列の頭文字によるグループ化をしています。 ストリームの要素は頭文字が重なるように適当に選んでるだけです。

Stream<String> stream =
    Stream.of("Java", "Groovy", "Scala", "Clojure", "Jython", "JRuby",
              "Fantom", "Smalltalk", "C", "C++", "C#", "JavaScript",
              "Go", "F#", "Scheme", "COBOL", "FORTRAN");

Map<Character, List<String>> dic0 = stream.collect(
    groupingBy(s -> s.charAt(0), LinkedHashMap::new, toCollection(DebugList::new)));

System.out.println();

dic0.forEach((c, list) -> println(c + " : "+list));

Map の実装として LinkedHashMap を使っているのは要素を追加した順序を保つためです。 これを実行すると以下のように表示されるはずです:

                Java --> []
              Groovy --> []
               Scala --> []
             Clojure --> []
              Jython --> [Java]
               JRuby --> [Java, Jython]
              Fantom --> []
           Smalltalk --> [Scala]
                   C --> [Clojure]
                 C++ --> [Clojure, C]
                  C# --> [Clojure, C, C++]
          JavaScript --> [Java, Jython, JRuby]
                  Go --> [Groovy]
                  F# --> [Fantom]
              Scheme --> [Scala, Smalltalk]
               COBOL --> [Clojure, C, C++, C#]
             FORTRAN --> [Fantom, F#]

J : [Java, Jython, JRuby, JavaScript]
G : [Groovy, Go]
S : [Scala, Smalltalk, Scheme]
C : [Clojure, C, C++, C#, COBOL]
F : [Fantom, F#, FORTRAN]

ストリームの各要素に対する処理はストリームの順序に従って行われています("Java" で始まり "FORTRAN" で終わる)。 また、要素はすべて add() によって(コレクタで言うと accumulator によって)追加されています。 これはまぁ、そのままの処理って感じですね。

parallel ストリーム × groupingBy()
次は parallel ストリームに対して groupingBy() を使う場合。 これって何となくマズそうな組合せの気がするけど・・・

Stream<String> stream =
    Stream.of("Java", "Groovy", "Scala", "Clojure", "Jython", "JRuby",
              "Fantom", "Smalltalk", "C", "C++", "C#", "JavaScript",
              "Go", "F#", "Scheme", "COBOL", "FORTRAN");

Map<Character, List<String>> dic1 = stream.parallel().collect(
    groupingBy(s -> s.charAt(0), LinkedHashMap::new, toCollection(DebugList::new)));

System.out.println();

dic1.forEach((c, list) -> println(c + " : "+list));

stream.paralell() によって parallel ストリームを生成している部分以外は上記のサンプルと同じです。 これを実行すると

               COBOL --> []
              Fantom --> []
             FORTRAN --> []
               Scala --> []
              Scheme --> []
           Smalltalk --> []
             Clojure --> []
              Jython --> []
               JRuby --> [Jython]
                  C# --> []
                Java --> []
          JavaScript --> []
                   C --> []
              Groovy --> []
                 C++ --> [C]
     [Jython, JRuby] ->> [Java]
                [C#] ->> [C, C++]
         [Smalltalk] ->> [Scala]
                  Go --> []
                  F# --> []
           [FORTRAN] ->> [F#]
             [COBOL] ->> [C, C++, C#]
 [C, C++, C#, COBOL] ->> [Clojure]
        [JavaScript] ->> [Java, Jython, JRuby]
                [Go] ->> [Groovy]
       [F#, FORTRAN] ->> [Fantom]
            [Scheme] ->> [Scala, Smalltalk]

J : [Java, Jython, JRuby, JavaScript]
G : [Groovy, Go]
S : [Scala, Smalltalk, Scheme]
C : [Clojure, C, C++, C#, COBOL]
F : [Fantom, F#, FORTRAN]

少なくとも拙者の環境ではうまく動いてますね。 先ほどのサンプルと違って addAll() による(combiner による)要素の追加も行われています。 結果はキチンと先ほどのものと同じになっているので、これはこれで大丈夫っぽいですね。 マルチコアプロセッサでやったらダメだった、みたいな人いればご報告を。

sequential ストリーム × groupingByConcurrent()
次は sequential ストリームに CONCURRENT コレクタである groupingByConcurrent() を使う場合。 なんか無駄な同期があるだけで結果は上手くいくんじゃないかな?と予想。

Stream<String> stream =
    Stream.of("Java", "Groovy", "Scala", "Clojure", "Jython", "JRuby",
              "Fantom", "Smalltalk", "C", "C++", "C#", "JavaScript",
              "Go", "F#", "Scheme", "COBOL", "FORTRAN");

Map<Character, List<String>> dic2 = stream.collect(
    groupingByConcurrent(s -> s.charAt(0), toCollection(DebugList::new)));

System.out.println();

dic2.forEach((c, list) -> println(c + " : "+list));

ほとんど、最初のサンプルで groupingBy() の代わりに groupingByConcurrent() を使っているだけです。 返される ConcurrentMap の実装は指定していません。 実行結果は

                Java --> []
              Groovy --> []
               Scala --> []
             Clojure --> []
              Jython --> [Java]
               JRuby --> [Java, Jython]
              Fantom --> []
           Smalltalk --> [Scala]
                   C --> [Clojure]
                 C++ --> [Clojure, C]
                  C# --> [Clojure, C, C++]
          JavaScript --> [Java, Jython, JRuby]
                  Go --> [Groovy]
                  F# --> [Fantom]
              Scheme --> [Scala, Smalltalk]
               COBOL --> [Clojure, C, C++, C#]
             FORTRAN --> [Fantom, F#]

S : [Scala, Smalltalk, Scheme]
J : [Java, Jython, JRuby, JavaScript]
G : [Groovy, Go]
F : [Fantom, F#, FORTRAN]
C : [Clojure, C, C++, C#, COBOL]

まぁ sequential ストリームに対して処理を行っているので、処理される要素の順序は "Java" で始まり "FORTRAN" に終わる通常の順序。 ConcurrentMap の実装クラスで(LinkedHashMap のように)追加順序を保存するものはなさそうなので結果の Map はエントリの順序が整列されていませんが、各エントリの値であるリストの内部では順序が保たれてますね。

parallel ストリーム × groupingByConcurrent()
最後は parallel ストリームと groupingByConcurrent() の合わせ技。 サンプル・コード自体は問題ないかと:

Stream<String> stream =
    Stream.of("Java", "Groovy", "Scala", "Clojure", "Jython", "JRuby",
              "Fantom", "Smalltalk", "C", "C++", "C#", "JavaScript",
              "Go", "F#", "Scheme", "COBOL", "FORTRAN");

Map<Character, List<String>> dic3 = stream.parallel().collect(
    groupingByConcurrent(s -> s.charAt(0), toCollection(DebugList::new)));

System.out.println();

dic3.forEach((c, list) -> println(c + " : "+list));

これを実行すると

               COBOL --> []
              Fantom --> []
             FORTRAN --> [Fantom]
                  C# --> [COBOL]
           Smalltalk --> []
              Scheme --> [Smalltalk]
          JavaScript --> []
              Jython --> [JavaScript]
                   C --> [COBOL, C#]
               JRuby --> [JavaScript, Jython]
                 C++ --> [COBOL, C#, C]
               Scala --> [Smalltalk, Scheme]
             Clojure --> [COBOL, C#, C, C++]
                Java --> [JavaScript, Jython, JRuby]
                  Go --> []
              Groovy --> [Go]
                  F# --> [Fantom, FORTRAN]

C : [COBOL, C#, C, C++, Clojure]
G : [Go, Groovy]
S : [Smalltalk, Scheme, Scala]
J : [JavaScript, Jython, JRuby, Java]
F : [Fantom, FORTRAN, F#]

要素の処理順序は完全に変わってしまってますね。 ただしすべて add() による(accumulator による)追加だけしか行われていません。

さて、これら4つのサンプルをまとめると

  • 順序を保存したいなら sequential ストリーム
  • combiner を使いたくないなら groupingByConcurrent()

あと、(マルチコア環境での)パフォーマンスも重要なファクターになってくると思いますが、今日が終わってしまうんでこの辺で。 誰か試して~。

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―