倭マン's BLOG

くだらない日々の日記書いてます。 たまにプログラミング関連の記事書いてます。 書いてます。

小川のせせらぎもやがてはうねる奔流に Stream インターフェース (5) - intermediate operators part 2 -

Java の Stream インターフェースに定義されているメソッドを見ていくシリーズ(目次)。 今回は前回に引き続き intermediate operator (中間演算子)を見ていきます。 今回扱うのはストリームの要素はそのままで、その種類を変更する中間演算子です。 ストリームの種類とは逐次 (sequential)並行 (parallel) か、整列されているか (sorted) されていないか (unordered) です。 ただし、整列された Stream オブジェクトを明示的に返す sorted() メソッドは状態を保有する演算子のため、次回に扱います。

今回見ていくメソッドは以下のも:

S sequential()
S parallel()
S unordered()
boolean isParallel()

ここで S は Stream<T> クラスに対しては同じ Stream<T> (のはず)です。 この S は java.util.stream パッケージに定義されているパッケージ・プライベートなインターフェース BaseStream に型パラメータとして指定されています(こちら参照)。 Java8 で言語拡張されたのでないなら、単なるバグでしょうね。 プログラムは普通にコンパイル&実行できますが。

isParallel() メソッドはその Stream オブジェクトが並行かどうかを返すメソッドで、特に説明は必要ないでしょう。 その他のメソッドは Stream オブジェクトを返すので、その後に intermediate operator や terminal operator に対応するメソッドをメソッド・チェーンとしてつなげていけば OK です。

サンプルコード

逐次ストリームと並行ストリームはインターフェース的には違いがないので、逆にどのように処理が違うのか分かりにくいかと思います。 なので違いが分かるんじゃなかろうかというサンプルを書いてみました。 まずは逐次ストリーム(普通のストリーム):

Stream<String> stream = Stream.of("Java", "Groovy", "Scala", "Clojure", "Kotlin", "Jython", "JRuby");
StringBuffer sb = new StringBuffer();

stream.peek(s -> sb.append(s+"\n"))
      .map(s -> s.toUpperCase())    // 大文字に変換
      .peek(s -> sb.append("\t"+s+"\n"))
      .map(s -> insertSpace(s))    // 各文字間にスペースを挿入
      .forEach(s -> sb.append("\t\t"+s+"\n"));

System.out.println(sb);

/** 各文字の間にスペースを挿入 */
public static String insertSpace(String s){
    return IntStream.range(0, s.length())
                    .mapToObj(i -> s.substring(i, i+1))
    	            .collect(Collectors.toStringJoiner(" ")).toString();
}

文字列を要素に持つストリームに対して、「大文字に変換 → 各文字の間にスペースを挿入」の順で処理を行っています。 ただし、途中で peek() メソッドによって変換途中の文字列を StringBuffer オブジェクトに格納しています。 これを実行すると、以下のように出力されます:

Java
	JAVA
		J A V A
Groovy
	GROOVY
		G R O O V Y
Scala
	SCALA
		S C A L A
Clojure
	CLOJURE
		C L O J U R E
Kotlin
	KOTLIN
		K O T L I N
Jython
	JYTHON
		J Y T H O N
JRuby
	JRUBY
		J R U B Y

逐次ストリームでは、まず最初の要素の "Java" が最後まで変換され、次に "Groovy" が変換され、・・・という処理が順々に行われます。 全ての文字を大文字に変え、次に全ての要素にスペースを挿入する、という風には実行されていません*1。 さて、次は並行ストリームに対して同様のコードを実行してみます。 コード自体は stream オブジェクトに対する処理の最初の行を

stream.parallel().peek(s -> sb.append(s+"\n"))

のように変えるだけです。 これを実行すると

JRuby
Scala
	SCALA
	JRUBY
Kotlin
	KOTLIN
		S C A L A
		K O T L I N
Clojure
Groovy
	CLOJURE
	GROOVY
		G R O O V Y
		C L O J U R E
Java
	JAVA
Jython
	JYTHON
		J Y T H O N
		J A V A
		J R U B Y

のように、1つの要素(例えば "JRuby")に対する変換(大文字にする、スペースを挿入する)は順々に実行されていますが、どの要素が最初に実行されるかなどの順序は全くランダムに見えます。 もちろん、毎回結果は異なるでしょう。 こういった並行処理を自分で、Thread や java.util.concurrency パッケージのクラス群を使って書くのは結構大変なので、こんな簡単に並行処理が実装できるなんて便利スグル。

まぁ、以上の話で終わりでいいと思いますが、ちょっと各メソッドがどんなインスタンスを返すかを試してみました:

List<String> langs = Arrays.asList("Java", "Groovy", "Scala", "Clojure", "Kotlin", "Jython", "JRuby");

// sequential
Stream<String> stream1 = langs.stream();
assert stream1.sequential() == stream1;
assert stream1.parallel()   == stream1;
assert stream1.unordered()  != stream1;

// parallel
Stream<String> stream2 = langs.parallelStream();
assert stream2.sequential() == stream2;
assert stream2.parallel()   == stream2;
assert stream2.unordered()  != stream2;

Set<String> langSet = new HashSet<>(langs);

// unordered
Stream<String> stream3 = langSet.stream();
assert stream3.sequential() == stream3;
assert stream3.parallel()   == stream3;
assert stream3.unordered()  == stream3;

// unordered parallel
Stream<String> stream4 = langSet.parallelStream();
assert stream4.sequential() == stream4;
assert stream4.parallel()   == stream4;
assert stream4.unordered()  == stream4;

// sorted
Stream<String> stream5 = langSet.stream().sorted();
assert stream5.sequential() == stream5;
assert stream5.parallel()   == stream5;
assert stream5.unordered()  != stream5;

結構 parallel() メソッドが自分自身を返してたりするんですね。 単にフラグを保持してて処理を切り替えてる実装なんでしょう。 まぁ、あくまで実装の話ですが。

次回は stateful intermediate operator と short-circuiting stateful intermediate operator の予定。

*1:まぁ、実装に依るかも知れませんが。