LL脳がscalaの勉強を始めたよ その105
Scalaコップ本の30章の続きをやっていきますよー。30章はアクターを使った並行プログラミングですが、今回はアクターコードのスタイルからやっていきますねー
アクターコードスタイル
デバッグしやすくかつデッドロックや競合が起こりにくいアクタースタイルでのプログラミングについて、コップ本的ガイドラインを見ていきましょうかねー
アクターはブロックしてはならない
アクターが(処理待ちなどの理由で)メッセージをブロックしてしまうと、ブロック後に他のアクターから送られてきたリクエストに気づかなかったり、また複数のアクター同士が相互の処理待ちのためにメッセージをブロックすることでデッドロックに陥る可能性があるとのことです。
なので、メッセージをブロックするのではなく、アクションをすぐに実行しても良いことを知らせる何らかのメッセージが届くように調整するべきみたいです。また、この調整には例えば次のようにThread.sleepを直接呼び出してカレントアクターをスリープすることで十分な時間が軽kした後にメッセージを送信するヘルパーアクターを作成するなど、他のアクターの助けが必要になる場合が多いみたいです。
actor { // 特定の時間が経過した後にメッセージを送ります Thread.sleep(time) mainActor ! "WAKEUP" }
上記のアクターは実際のところThread.sleepによってブロックしているのですが、メッセージの送信のみを行いメッセージの受信は行わないため(デッドロックを引き起こすなどの)問題はないとのことです。
上記のようなヘルパーアクターを使ったサンプルとしては次のようなものが挙げられるみたいですネ(´・ω・`)
val sillyActor2 = actor { // ヘルプアクター呼び出しメソッドです def emoteLater(){ // 正しいアクターにEmoteメッセージを送信するために // メインアクターのselfを評価しマス val mainActor = self // これがヘルプアクターです actor { Thread.sleep(1000) mainActor ! "Emote" } } var emoted = 0 emoteLater() // メインのアクター処理です loop { react { // ヘルパーアクターからメッセージが来た場合 case "Emote" => println("I'm acting!") emoted += 1 if(emoted < 5) // ヘルパーアクターに遅延メッセージを送信させます emoteLater() // ソレ以外の場合は単純にメッセージを表示します case msg => println("Received: " + msg) } } }
ちょっと分かりにくいですが実行結果ですー。メインアクターの仕事をしつつ、emoteLaterの処理を行っておりますねー。結果的にブロックせずに次の仕事の入力待ち→実行が行われているようでデス。
scala> sillyActor2 ! "Hello actor"
Received: Hello actor
scala> I'm acting!
I'm acting!
I'm acting!
I'm acting!
I'm acting!
アクターとはメッセージだけを使って通信する
アクターモデルでは、actメソッドとして逐次的にモノを考えられる安全地帯を提供することで共有データ・ロックモデルでの難点であったデッドロックや競合への対処を行っているのだけども、これはアクター間の通信手段がメッセージだけに制限されているからこそ出来ることみたいです。
例えば2つのアクターが!メソッド以外を使って相互にインスタンスのデータを参照・書き込みし始めた場合、各データの同期をとる必要が出てきてしまうため、結局のところロック機構によって制御する共有データを扱っているのと同じになってしまうみたいデス(´・ω・`)なので、メッセージ交換以外のことをするのはコップ本的にオススメ出来ないそうです。
しかしながら、例えば複数のアクターにミュータブルマップを共有させるような処理を行う場合には、Java並行処理ユーティリティのConcurrentHashMapのようなスレッドセーフなマップを渡して各アクターに操作させたりって方法もあるように、Scala的折衷主義でアクターモデルと共有データ・ロックモデルを併用できるようです(´・ω・`)
なお、上記のConcurrentHashMapを使わない純粋アクターアプローチとしては、ミュータブルマップを所有するアクターを作って、他のアクターがそのマップにアクセスするための次のようなメッセージを定義することになるみたいです。
- マップに対する各種操作のメッセージ
- キーと値のペアを共有マップに追加するメッセージ
- 指定したキーに対応する値を取得するメッセージ
- 他…
- マップに問い合わせを行ったアクターに対して非同期に応答を送り返すメッセージ
…といくつかの定義が必要になったり、共有マップを使えば同期的に処理できるのに対して純粋アクターアプローチだと応答を全て非同期にする必要があるなど、けっこうな癖があるので、Scalaでは両方使えるようにしていますよー、とのことです。
あと、コップ本の中でアクターモデルと共有モデルの検討イメージにダーティーハリーのセリフが出てくるんだけど…あまり意味がワカラナイデス(´・ω・`)確実なのか、威力があるのかどっちがいい?ってことかしら?
イミュータブルなメッセージを使うようにする
Scalaのアクターモデルでは個々のactメソッドが実質的に1つスレッドに制限されているみたいなので、actメソッドの中心部では同期をとっていないミュータブルなオブジェクトでも構わないみたいです。このようにアクターモデルは一つのスレッドからしかアクセスされないので「共有なしモデル」とも呼べるみたいです。
ただし、アクター間でメッセージを送信するのに使われるオブジェクトの内部データは複数のアクターによって共有されるので、メッセージオブジェクトがスレッドセーフかどうかは注意しなければいけないそうです(´・ω・`)つか、一般的にメッセージオブジェクトはスレッドセーフであるべき!とのこと
ここで、一番良いスレッドセーフかどうかの保証はイミュータブルオブジェクトを使うこと!みたいなので、メッセージオブジェクトの中ではいミュータブルな要素を使うのが良さそうです。
具体的にはケースクラス等でイミュータブルな型によるvalフィールドのみを利用することで、目的となるイミュータブルメッセージオブジェクトを使うことが出来るみたいです。なお、ケースクラスだと部分関数内でパターンマッチ出来たりするので便利だYO!とのことですが、ケースクラスでないイミュータブルクラスのインスタンスであれば問題なく使えるようです。またScala APIの提供する次のようなイミュータブル要素達でもOKのようです
- タプル
- 文字列
- リスト
- イミュータブル集合・マップ
仮にメッセージがミュータブルだと、何処かしこで書き換えられている可能性がでてくるため、並行処理上で不具合が出やすくなるとのことです。
なお、一般的に全ての動機されていないミュータブルオブジェクトは「オーナー」を持って一つのアクターだけがアクセスするようにデータを構成するといいみたいです。これによってオーナーとしてそのオブジェクトにアクセスできるアクターがどれかをはっきりすることができるので、ミュータブルなメモリのどの部分がどのアクターに割り当てられているかはっきり出来るみたいです。
また、仮に内部で取り扱っているミュータブルなオブジェクトを他のアクターへのメッセージで送りたい場合は、いミュータブルなコピーを作成してそいつを使いましょう、とのことです。例えば内部的に配列を使っている場合であればarr.toListを利用してイミュータブルなメッセージオブジェクトを作成する必要があるみたいです(´・ω・`)
そんな感じで並列システムでのメッセージのイミュータブル性(による実装の簡易さ&リスクの低下)を確保するために、コップ本ではイミュータブルな設計にこだわっているみたいです。
メッセージを自己完結的にする
メソッドからの値呼び出しであれば、呼び出し元はメソッドを呼び出す前の処理をきちんと覚えているので、メソッド呼び出しの結果を受けたうえでそれまでの処理を続行するtことが出来ます…が、アクターを使った場合はそんな単純にはいかないです(´・ω・`)
アクター間でやり取りする場合は、他のアクタにメッセージを送ったあとで長い期間返答が帰って来ない場合もあるため、呼び出し元のアクターは他の処理を進めていたりします。なのでメッセージ送信先のアクターから返答が帰ってきた場合に何をしていたか小野恵ていないことが多々ありマス(´・ω・`)
このような問題を防いでアクタープログラムのロジックを簡単にするには、パラメータを増やしたりすることでメッセージに多めの情報を付与するといいみたいです。また、メッセージ用のケースクラスを作ることでも情報量を増やすことができるみたいです。ケースクラスを使うことでパターンマッチにも使えるし、情報を明示的にまとめることも出来ますね(`・ω・´)
例えばメッセージ情報量を増やしたサンプルは次のようになりますね。前回作成した名前引きアクターを拡張してやりますデス
import scala.actors._ import scala.actors.Actor._ import java.net.{InetAddress, UnknownHostException} // メッセージ情報用のケースクラスを定義します。 case class LookupIP(name:String, respondTo:Actor) case class LookupResult( name:String, address:Option[InetAddress] ) // アクターを定義します object NameResolver2 extends Actor { def act(){ loop{ // メッセージを受け取ります react{ // 戻り値の中に前回送ったメッセージの参照(ドメイン名)があるので // それを元に処理を実行 case LookupIP(name, actor) => actor ! LookupResult(name, getIp(name)) } } } // ここは前回と同じ処理です def getIp(name:String):Option[InetAddress] = { try{ // 対応するIPアドレスを返します Some(InetAddress.getByName(name)) }catch{ // 存在しないホストの場合はNoneを返します case _:UnknownHostException => None } } }
それじゃー実行してみますよー
// アクターを実行します scala> NameResolver2.start res11: scala.actors.Actor = NameResolver2$@402e11 // アクターにメッセージを送信しますよ // メッセージの送信もケースクラスを使ってまとめてます scala> NameResolver2 ! LookupIP("www.google.co.jp", self) // 自分宛に返ってきたメッセージを受け取ります scala> self.receiveWithin(1000) {case x => x} // 送信したメッセージも一緒に帰ってきました(`・ω・´) res13: Any = LookupResult(www.google.co.jp,Some(www.google.co.jp/66.249.89.104))