Subscribed unsubscribe Subscribe Subscribe

Rubyのスレッドプールとenumerator

Rubyスクリプトでスレッドプールを使いたかったので調べてみた。

スレッドプールの実装

Rubyクックブックに載っているmutexを使ったコードより、d:id:gioext:20081218:1229617888にあるblocking queueの方がスマートに見えたので、これをベースにshutdownを明示しなくてすむように、ちょっと弄ったコードを使うことにした。

require 'thread'

class Thread
  class Pool
    def initialize(size, &session)
      @size    = size
      @queue   = Queue.new
      @threads = []
      session.call(self)
    ensure
      shutdown
    end

    def run(&job)
      @queue.push(job)
      @threads << create_thread if @threads.size < @size
    end

    protected
    def shutdown
      until @queue.num_waiting == @threads.size
        sleep(0.01)
      end
      @threads.each { |th| th.kill }
    end

    protected
    def create_thread
      Thread.start(@queue) {|q|
        loop { job = q.pop; job.call }
      }
    end
  end
end

if __FILE__ == $0
  Thread::Pool.new(10) {|pool|
    100.times {|n|
      pool.run {
        wait = rand(100).to_f/100
        puts "#{n}: #{Thread.current} (#{wait} sec)"
        sleep(wait)
      }
    }
  }
end

Thread::Pool#newの引数を増減させると、スクリプト終了までの時間が顕著に変わるのがわかる。

enumeratorを使ってEnumerable#eachを並行版eachに差し替える

実際にこのスレッドプールを使う場合、シーケンシャルなEnumerable#eachを並行版のeachに差し替えられると嬉しそう。クックブックを見てみると、enumeratorとかいうモジュールを使えばできるらしい。

こんなスクリプトで試してみた。

require 'enumerator'
require 'enumerable/concurrent'

range = Range.new(0, 10)

conc_range = range.to_enum(:concurrent_each, 5)

puts "sequencial each"
range.each {|i|
  puts "#{i}: #{Thread.current}"
  sleep(0.5)
}

puts "concurrent each"
conc_range.each {|i|
  puts "#{i}: #{Thread.current}"
  sleep(0.5)
}

実行してみると、ちゃんと動いているのがわかる。並行度を上げると実行時間も短くなる。

sequencial each
0: #<Thread:0x7ffc7e7cfdf8>
1: #<Thread:0x7ffc7e7cfdf8>
2: #<Thread:0x7ffc7e7cfdf8>
3: #<Thread:0x7ffc7e7cfdf8>
4: #<Thread:0x7ffc7e7cfdf8>
5: #<Thread:0x7ffc7e7cfdf8>
6: #<Thread:0x7ffc7e7cfdf8>
7: #<Thread:0x7ffc7e7cfdf8>
8: #<Thread:0x7ffc7e7cfdf8>
9: #<Thread:0x7ffc7e7cfdf8>
10: #<Thread:0x7ffc7e7cfdf8>
concurrent each
0: #<Thread:0x7ffc7c303bf0>
1: #<Thread:0x7ffc7c3038a8>
2: #<Thread:0x7ffc7c303560>
3: #<Thread:0x7ffc7c303218>
4: #<Thread:0x7ffc7c302ed0>
5: #<Thread:0x7ffc7c3038a8>
6: #<Thread:0x7ffc7c303bf0>
7: #<Thread:0x7ffc7c303218>
8: #<Thread:0x7ffc7c303560>
9: #<Thread:0x7ffc7c302ed0>
10: #<Thread:0x7ffc7c303218>

並行版のeachはenumerable/concurrentというモジュールで下のように定義している。

require 'thread/pool'

module Enumerable
  def concurrent_each(n)
    Thread::Pool.new(n) {|pool|
      self.each {|x|
        pool.run { yield x } 
      }
    }
  end
end

本当にちゃんと動いている?騙されないぞ!

ずいぶん簡単に並行eachができてしまった。しかし、ドキュメントをよくよく見ていると、Enumerableのメソッドはすべてeachで書かれていると書いてある。並行なプログラムは非決定的な動作になるけど、大丈夫なのかと心配になって調べてみた。

RushCheckというHaskellのランダムテストライブラリQuickCheckをRubyに移植したものがあるので、これを使ってみる。

まずはmapをチェック。

require 'enumerator'
require 'enumerable/concurrent'
require 'rushcheck'

prop_map_id_equality = RushCheck::Assertion.new(Integer, Integer) {|max, n|
  RushCheck::guard { 0 < max }
  RushCheck::guard { 0 < n }
  range = Range.new(0, max)
  conc_range = range.to_enum(:concurrent_each, n)
  range.map {|x| x } == conc_range.map {|x| x }
}

prop_map_id_equality.check

実行!

$ ruby tests/concurrent_each.rb
OK, passed 100 tests.

続いてinjectをチェック。

prop_inject_equality = RushCheck::Assertion.new(Integer, Integer) {|max, n|
  RushCheck::guard { 0 < max }
  RushCheck::guard { 0 < n }
  range = Range.new(0, max)
  conc_range = range.to_enum(:concurrent_each, n)
  range.inject(0) {|acc, x| acc + x } == conc_range.inject(0) {|acc, x| acc + x }
}

prop_inject_equality.check

実行!

$ ruby tests/concurrent_each.rb
Falsifiable, after 1 tests:
[2, 2]

おかしい。

下のirbセッションにあるように、injectは正しい値を返さないし、findは固まる。

irb(main):001:0> require 'enumerator'
=> true
irb(main):002:0> require 'enumerable/concurrent'
=> true
irb(main):003:0> range = Range.new(0, 100)
=> 0..100
irb(main):004:0> conc_range = range.to_enum(:concurrent_each, 10)
=> #<Enumerable::Enumerator:0x7f0a161e1168>
irb(main):005:0> range.map {|x| x } == conc_range.map {|x| x }
=> true
irb(main):006:0> range.inject(0) {|acc, x| acc + x }
=> 5050
irb(main):007:0> conc_range.inject(0) {|acc, x| acc + x }
=> 0
irb(main):008:0> range.find {|x| x == 3 }
=> 3
irb(main):009:0> conc_range.find {|x| x == 3 }
^CIRB::Abort: abort then interrupt!!
        from /usr/lib64/ruby/1.8/irb.rb:81:in `irb_abort'
        from /usr/lib64/ruby/1.8/irb.rb:247:in `signal_handle'
        from /usr/lib64/ruby/1.8/irb.rb:66:in `start'
        from ./thread/pool.rb:22:in `call'
        from ./thread/pool.rb:22:in `sleep'
        from ./thread/pool.rb:22:in `shutdown'
        from ./thread/pool.rb:11:in `initialize'
        from ./enumerable/concurrent.rb:5:in `new'
        from ./enumerable/concurrent.rb:5:in `concurrent_each'
        from (irb):9:in `each'
        from (irb):9:in `find'
        from (irb):9

固まっている間のirbをstraceするとselectがtimeoutしてループしていた。

結論

  • 上記のスレッドプール自体は普通に使える。
  • RushCheck良い。
  • スレッドプールを使ったeachを、enumeratorでオリジナルのeachと差し替えるのは止めた方がよい。
  • だれかいけてる解決策を教えてください。