LL脳がscalaの勉強を始めたよ その104


Scalaコップ本の30章をやっていきますよー、30章はアクターと並行プログラミングです。

前回は基本的なアクターのやりとりをみていきましたが、今回はネイティブスレッドでアクターを利用する方法なんかをやっていきますね

ネイティブスレッドをアクターとして扱う

アクターサブシステムは、自分で使うために1つ以上のネイティブスレッドを管理しているそうです。なので、定義したアクターを操作している限りではアクターがすれっどにどのようにマッピングされているのかについて考える必要は無いみたいです。また逆方向もサポートされているらしく、全てのネイティブスレッドはアクターとしても使える様になっているみたいです。

ただし、カレントスレッドをアクターとして直接使うためのメソッドをThread.currentが持っていないので、アクターとしてカレントスレッドを見たい場合はActor.selfを使う必要があるみたいです。例えばこんな感じで使うみたいですね

scala> import scala.actors.Actor._
import scala.actors.Actor._

// 自分自身に向けてメッセージを送信
scala> self ! "Hello"             

// 自分自身で受け取ります
scala> self.receive { case x => x }
res3: Any = Hello

この機能は対話型シェルを使ってデバッグするのに役に立つ反面、対話型シェルでreceiveを使った場合、receiveメソッドがメッセージを受け取るまでシェルを占有してしまって操作が効かなくなってしまうのでタイムアウト時間を指定する次のメソッドを使うのが良いみたいですね。

scala> self.receiveWithin(1000) { case x => x}
res0: Any = TIMEOUT

スレッドの再利用によるパフォーマンスの向上

アクターは全てのactメソッドに出番を与えるために専用のJavaスレッドが与えられているそうです。Javaのスレッドはかなりのメモリを消費する&スレッドを頻繁に切り替えるとプロセッサーサイクルををかなり消費するらしいので、処理効率を上げるためにはスレッドの作成や切り替えは控えめにしたほうが良さそうです。

reactメソッドでスレッドを再利用

…と、(Java系?)スレッドプログラミングではスレッドの節約を意識する必要があるみたいですが、Scalaではスレッドの節約を支援するための機能としてreceiveの代替となるreactメソッドを提供しているそうです。

reactはreceiveと同様に引数として部分関数をとるものの、結果型がNothingでメッセージハンドラを評価するだけらしくメッセージを見つけて処理しても制御を返さないみたいです。ちなみにreactは水面で処理終了後に例外を投げるみたいです(´・ω・`)

reactメソッドを使うと制御を返さなくていいので、reactを使った実装ではカレントスレッドを保存せずに次に起動するアクターのためにカレントスレッドを再利用することが出来るとのことです。これは極論的には一つのスレッドで全てのアクターをホスト出来るということみたいです。実際は複数プロセッサーコアを持つコンピュータなんかだとアクターサブシステムがゴニョゴニョするらしいので、いくつかのスレッドを使うみたいですが(´・ω・`)

reactメソッドのサンプル

そんな具合にスレッドの節約に活躍するreactメソッドですが、reactは制御を返さないのでreactの引数を受け取るメッセージハンドラは、メッセージ処理するのと同時にアクターの残りの仕事の調整をする必要があるみたいです。この作業によく使われるアプローチとしてはトップレベル作業メソッドを決めておくことみたいで、サンプルは次のようになるみたいです。

サンプルとしてホスト名の文字列を渡すと対応するIPアドレスを返すアクターを実装してみます。

import scala.actors._
// 名前引きアクターです
object NameResolver extends Actor {
  // 利用するJavaモジュールをimportします
  import java.net.{InetAddress, UnknownHostException}
  // アクター部分です
  def act(){
    // reactでメッセージを受け取ります
    react{
      // ドメイン名が渡ってきたとき
      case (name:String, actor:Actor) =>
        actor ! getIp(name)
        // 処理を継続します
        act()
      // 処理を終了します
      case "EXIT" =>
        println("Name resolver existing.")
      // ハンドルしていないメッセージが渡ってきたときの処理
      case msg => 
        println("Unhandled message:" + msg)
        // 処理を継続します
        act()
    }
  }
  // 実際に名前引きを行う処理です
  def getIp(name:String):Option[InetAddress] = {
    try{
      // 対応するIPアドレスを返します
      Some(InetAddress.getByName(name))
    }catch{
      // 存在しないホストの場合はNoneを返します 
      case _:UnknownHostException => None
    }
  }
}

んじゃ、実行してみますよー

// アクターを1スレッドで実行しますよ
scala> NameResolver.start()
res1: scala.actors.Actor = NameResolver$@4ca42b

// メッセージを送信します
scala> NameResolver ! ("www.scala-lang.org", self)
// 受け取ります
scala> self.receiveWithin(100) { case x => x }
res3: Any = Some(www.scala-lang.org/128.178.154.159)

// 同じスレッドでメッセージを再度送ります
scala> NameResolver ! ("wwwwwwwww.scala-lang.org", self)
// 受け取ります…が、存在しないのでNoneが返ります
scala> self.receiveWithin(100) { case x => x }          
res5: Any = None

上記サンプルではトップレベルの作業メソッドとしてact自体を利用して、処理が就床したときに再度呼び出すような動きになっております。また、同様の処理はEXITによる処理の終了を行わないという前提のもとで、下記のようにActor.loopを利用して書き換えることが出来るみたいです。このコードだとアクターは永遠にメッセージを処理し続けることになるそうです。

def act(){
  // 永遠に処理を続けます(`・ω・´)
  loop{
    react{
      case (name:String, actor:Actor) =>
        actor ! getIp(name)
      case msg => 
        println("Unhandled message: " + msg)
    }
  }
}
reactの仕組み

reactは次のような流れで処理を進めていくみたいですね(`・ω・´)

  • startメソッドは、何らかのスレッドが最終的にそのアクターのactを呼び出すように調整する
  • actメソッドがreactを呼び出すとアクターのメールボックスに部分関数が処理できるメッセージがあるかどうかをチェック
  • 処理できるメッセージを見つけるとreactはあとでメッセージを処理できるようにスケジューリングして例外を投げる
  • メッセージが見つからないとアクターは「冷蔵庫」に入れられて、メールボックスに他のメッセージが送られてきたときに目を覚まして例外を投げる
  • reactとactは例外を投げて異常終了
  • actを呼び出したスレッドは例外をキャッチして他の仕事に進む

基本的に例外を発生させて異常終了するので、最初のメッセージ意外を処理させたい場合は部分関数の中でactを呼だす、などのreactをもう一度呼び出すような手段が必要になる…とc⌒っ゚д゚)っφ メモメモ...処理の節約はできるけど若干手がかかるわけですな(´・ω・`)

いじょうー

時間切れでここまでデス。次回はアクターコードのスタイルをやります(`・ω・´)