倭マン's BLOG

くだらない日々の日記書いてます。 たまにプログラミング関連の記事書いてます。 書いてます。

Groovy で Single Thread Execution パターン その2

以前Single Thread Execution パターンを GPars の Actor を使って書き換えてみました。 そこでは門の役割をする Gate クラス自体を Actor のサブクラスである DefaultActor を使ってスレッドセーフな実装にしていました。 ただし、クラスをスレッドセーフ化するのに、常に実装(ソースコード)に手を加えることができるとは限りません。 今回は既にスレッドデンジャラスな門の実装 UnsafeGate があるとして、それを GPars のライブラリでスレッドセーフかする方法を見ていきます。 今回試す方法は

  • Actor のサブクラスを作成するスレッドセーフ化
  • Actors を使って Actor オブジェクトを作成するスレッドセーフ化
  • Agent を使ったスレッドセーフ化

です。 ちなみに、UnsafeGate の実装はこんなの:

class UnsafeGate {

    private int counter = 0;
    private String name = 'Nobody'
    private String address = 'Nowhere'

    void pass(String name, String address){
        this.counter++;
        this.name = name
        this.address = address
        check();
    }

    private void check(){
        if(name.charAt(0) != address.charAt(0)){    // 名前と住所の頭文字が一致してなければ「BROKEN」
            println "***** BROKEN ***** No. $counter: $name, $address"
        }
    }
}

Actor のサブクラスを作成するスレッドセーフ化


まずは、前回行った Actor のサブクラスを作成するのと基本的に同じ方法。 DefaultActor を拡張して、UnsafeGate をラップした Actor のサブクラスを作成します:

import groovyx.gpars.actor.DefaultActor

class ActorGate extends DefaultActor {

    private UnsafeGate gate = new UnsafeGate()

    @Override
    void act() {
        loop {
            react { args ->
                gate.pass(args[0], args[1])
            }
        }
    }
}

これらを使用するスクリプトはこんなの:

import groovyx.gpars.actor.Actor

println 'Testing Gate, hit CTRL+C to exit.'

Actor gate = new ActorGate().start()
Thread.start passenger(gate, 'Alice', 'Alaska')
Thread.start passenger(gate, 'Bobby', 'Brazil')
Thread.start  passenger(gate, 'Chris', 'Canada')

def passenger(Actor gate, String name, String address){
    return {
        println "$name BEGIN"
        while(true){
            gate << [name, address]
        }
    }
}

ActorGate は start() メソッドを呼び出す必要がありました。

Actors を使って Actor オブジェクトを作成するスレッドセーフ化


次は、Actor のサブクラスを作成せずに Actor オブジェクトを作る方法。 簡単な Actor を作る場合はこの方法がお手軽。 Actor オブジェクトを作成するには、Actors クラス(Actor のユーティリティ・クラス)の actor() メソッドに、Actor の処理を記述したクロージャを渡します。

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

println 'Testing Gate, hit CTRL+C to exit.'

// Actor オブジェクトの作成
Actor actorGate = Actors.actor{
    def gate = new UnsafeGate()
    loop {
        react { args ->
            gate.pass(args[0], args[1])
        }
    }
}

Thread.start passenger(actorGate, 'Alice', 'Alaska')
Thread.start passenger(actorGate, 'Bobby', 'Brazil')
Thread.start  passenger(actorGate, 'Chris', 'Canada')

def passenger(Actor gate, String name, String address){
    return {
        println "$name BEGIN"
        while(true){
            gate << [name, address]
        }
    }
}

Actor を作成するクロージャの loopreact は、DefaultActor を使って Actor のサブクラスを作成する際に act() メソッドの実装に書いたものと同じ(ような)ものです。 まぁ、気持ち的には DefaultActor を使って無名クラスを作っているようなものでしょうか。 ただし、start() メソッドを呼び出す必要はありません。

Agent を使ったスレッドセーフ化


最後は、今まで見てきた Actor を使用する方法とはちょっと違って Agent を使用する方法。 Agent は具象クラスでそれ自体インスタンス化し、コンストラクタに処理をスレッドセーフ化したいオブジェクトを渡します。 また、このオブジェクトに対して行いたい処理はクロージャとして渡します:

import groovyx.gpars.agent.Agent

println 'Testing Gate, hit CTRL+C to exit.'

// 処理をスレッドセーフ化したい UnsafeGate オブジェクトをコンストラクタの引数にして
// Agent オブジェクトを生成
Agent<UnsafeGate> gate = new Agent<UnsafeGate>(new UnsafeGate())

Thread.start passenger(gate, 'Alice', 'Alaska')
Thread.start passenger(gate, 'Bobby', 'Brazil')
Thread.start passenger(gate, 'Chris', 'Canada')

def passenger(Agent gate, String name, String address){
    return {
        println "$name BEGIN"
        while(true){
            // UsafeGate オブジェクトへの処理はクロージャで
            gate << { UnsafeGate gate -> gate.pass(name, address) }
        }
    }
}

スレッドセーフ化というより非同期処理化と言った方がいいのかも知れませんが。

正直、GPars のライブラリがいまいち分かってなくて、Actor と Agent の違いが Scala 起源か? Clojure 起源か?ってくらいの違いにしか認識できていないんですが、今回のサンプルから言えるのは

  • Actor も Agent も1つのオブジェクトに対して処理をシングル・スレッドで実行する
  • Actor はスレッドセーフなオブジェクトを作成するのに対して、Agent はスレッドデンジャラスなオブジェクトをスレッドセーフにする「入れ物」として使用する

みたいな感じでしょうか。 まぁ、Actor を「入れ物」として実装することもできますが。

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編


Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―