2015年03月

Effective Ruby を読んだ

Effective Ruby を読んだメモをこっそり。メモなのでてきとうなことを言っている場合があります。

Effective Ruby
Effective Ruby
posted with amazlet at 15.03.29
Peter J. Jones
翔泳社
売り上げランキング: 114,152
 

感想

良い本だった。知ってることもそりゃあったけれど、それも含めて勉強になった。 ただ、MiniTest と ri, gem, bundler の章はツールの使い方って感じでちょっと読み飛ばしてしまったかな。他の章はよかった。

警告

ruby -w で警告を出せるが、rspec などのように ruby コマンドにオプションを渡せないことがある。そういう場合は RUBYOPT 環境変数を使える。

RUBYOPT=-w

グローバル変数 $VERVOSE を true に設定しても警告を有効にできる。false だと警告が減り、nil だと消える。 しかし、こちらでは $VERBOSE の設定が解釈される前の、コンパイル時の警告が出ない。

コンパイル時警告だけを出したい場合は、RUBYOPT=-w かつ $VERBOSE=nil として、実行時計画だけを出したい場合は $VERBOSE=true とし、両方出したい場合は RUBYOPT=-w とすればよい

※ しばらく、.bashrc に export RUBYOPT=-w と書いておいて警告をつぶしまくる業をしたくなったが、警告出まくって辛い

定数

Ruby の定数は変更可能

argv = ARGV

のように小文字の変数に代人してしまえば、argv[0] = 'foo' のように警告さえもなしに変更可能

clone と dup

clone は freeze 状態と特異メソッドもコピー。dup はやらない

Hash の初期値

Hash.new({}) と書いた場合、Hash の要素は全て同じ object_id を持つ {} で初期化される。これは使い物にならない。

Hash.new { {} } と書けば、要素にアクセスした時に始めてブロックが評価されるので、別の object_id を持つ {} で初期化される。

fields = {}
chunk.msgpack_each do |tag, time, record|
  channel = build_channel(record)
  fields[channel]    ||= {}
  fields[channel][tag] = build_message(record)
end

みたいに書いていたコードは

fields = Hash.new {|hash, key| hash[key] = {} }
chunk.msgpack_each do |tag, time, record|
  channel = build_channel(record)
  fields[channel][tag] = build_message(record)
end

と書けるようだ。ただし、fields[key] がデフォルトで {} になってしまうので、if fields[key] みたいなものは書けなくなる。if fields.has_key?(key) ならイケるようだ。

※ しかし、このコードは他の人に伝わるのかどうか微妙だ

rescue の中での例外

rescue の中でまた例外が起きる可能性も考えてコードを書く

その際、オリジナル例外情報が消えないように、例外インスタンスにオリジナルの例外も含めるように、と書いてあるが、Ruby 2.1 以降なら e.cause もあるのでそれを使っても良いのでは、と思った。

引数のデフォルト値

def_instance_delegator の所で変なところに食いついてしまった。シグネチャが

def def_instance_delegator(accessor, method, ali = method)

のようになっていて、第三引数のデフォルト値を、第二引数の値に設定していた。そんなことできたのか。

method_missing ではなく define_method を使おう

これは自分もやっている。method_missing でメソッドを定義したつもりになっていると、respond_to? が true を返してくれないし、そもそも全部 hook しはじめるので遅いし、メソッド typo してもわからなくなる。

respond_to? の代わりに respond_to_missing? というメソッドで調べることができるようだ。

instance_eval と class_eval

再整理

instance_eval でメソッド定義したら、特異メソッド。 Foo.instance_eval はクラスの特異メソッドなので、クラスメソッドになる。 インスタンスメソッド定義用に用意されたのがFoo.class_eval。クラスでしか使えない。

効率的なテスト

  • ファズテスト (クラッシュしないかどうかのテスト)
    • FuzzBert gem
    • 数日流す
  • プロパティテスト (仕様にそったランダムテスト)
    • MrProperty gem
    • XML Schema とか JSON Schema で入力データのスキーマを定義して、境界値テストするみたいなやつ

ri

ri Array

モンキーパッチあてているやつのドキュメントも見れるとのこと => あれ、Hash#except とかでてこないけどな

ri File::open
ri Array#pop
ri pop
ri bundler:README.md

irb

irb> irb 'foo'
irb#1(foo)> size

のようにしてそのオブジェクトのセッションを開ける (pry みたいなことが irb だけでもできたのか)

jobsfgkill でセッションを扱う

gem の上限指定

最新バージョンで動く保証がないのだから、自作 gem が依存している gem のバージョンは上限指定をしよう、と言っているが、反対したい。

gem.add_dependency 'fluentd', '>= 0.10.56', '< 0.12.0'
# gem.add_dependency 'fluentd', '~ 0.10.56'

のように書いてしまうと、v0.12 でも動くものが動かなくなってしまう。

gem.add_dependency 'fluentd'

とだけ書いてあれば、仮に v0.12 で動かないにしても、ユーザ側で Gemfile.lock をいじって回避しようがあるが、 gemspec でバージョン指定されてしまうと、ユーザの手で回避しようがなくなる。

「最新バージョンで動く保証がない」とか言ってないで「最新で動かないことがわかったら速やかに修正します」という姿勢をもったほうが良いと思ってる。

GC

近所の Ruby コミッタ氏に以前聞いたことが書いてあった。

  • Ruby はメモリプールを確保しているので、OSにメモリを返されないことがある
  • プール > ページ > スロット
  • ページが全部解放されない限り、OS にメモリを返却しない. ps や top でみたメモリ使用量が減らない
  • 2.1 では 1ページ 408 スロット

詰め詰め処理して、できるだけ空きページ作るような処理はやらないんだっけ?Java とかはやってた気がするが。

GC.stat の heap_length がページ数. その他 => https://gist.github.com/sonots/71277ef3f9b53fa87862

GC のタイミング

malloc_increase が malloc_limit を超えるとマイナーGC、olmalloc_increase が oldmalloc_limit を超えるとメジャーGC が走る。

  • malloc_limit: これを越えて malloc すると GC が発生する閾値
  • malloc_increase: GC 発生までに malloc/realloc したバイト総数 (GC 起動でリセット)
(1..1000).each do |i|
  a = [i] * 10000
  s = GC.stat
  puts "#{s[:malloc_limit]} #{s[:malloc_increase]}"
end

超えると Minor GC が走ってその度に malloc_limit が増えていることが確認できた。

16777216 16765560
23584579 672
...
23584579 23562384
33100043 672
...
33100043 33067888
33554432 672
...
33554432 672

malloc_limit が増え続けて、ヒープサイズも大きくなるかと思っていたが、33554432 よりは増えなくなった。はて。=> RUBY_GC_MALLOC_LIMIT_MAX (32MB) にあたったからだ。けっこう小さいような気もするし、1回のGCと考えれば大きいような気もするし、チューニング難しいな。

ObjectSpace.define_finalizer

GC発動時にリソース解放処理をさせる。そんなことができるのか。ただし、けっこう嵌りそう

freeze

"foo".freeze は定数と同じ扱いになる

"foo" でオブジェクトを生成してから freeze しそうに見えるが、そうはならないようだ。以前 f リテラルを導入するかどうか議論していたのはこの辺の挙動が不思議に見えるからかな、と邪推

Q. いつ、GC されるんだ?されなくなる? Symbol と同じ扱い?であれば、Symbol GC と同様のタイミングで解放される?


ループの中でオブジェクトリテラルを避けよう

(1..100).each do |i|
  %w[a b c].include?('a')
end

とかやると %w[a b c] の object が作られては GC されるので無駄処理になる。あらかじめローカル変数に保存しておくべき。これは意識してるな。

メモ化

複数行にわたるときは

@a ||= begin
end

と書ける。今まで

return @a if @a
...
...
@a = ...

と書いてたな。 ナルホド

Daioikachan - 複数のバックエンド(IRC, Slack, etc)に対応する Ikachan 互換サーバを書いた

最近、弊社でも Slack が使われはじめている。 弊社のアラート通知は今まで IRC (とメール)にだけ流していたが、Slack にも流せるように Ikachan 互換 API を持ち、IRC, Slack 両方に流せるサーバとしてDaioikachan というものを作った。

要件

以下のような要件を満たす

  • Ikachan 互換 API
  • 複数バックエンド対応
    • Slack だけではなく、IRCにも流す(移行期間中用。いずれは Slack に完全移行したい)
  • ルーティング機能
    • どのチャンネルをどのSlackチームに流すかルーティング設定できる
  • プラグイン機構
    • IRC, Slack 以外のバックエンドにも対応出来る(せっかくなので)

特に、弊社では草の根活動的に、Slack 利用が広まったという背景もあり、チーム毎に別の Slack チーム(ドメイン) を使っていることがある。 そのため、「このチャンネルは、あのSlackチームのあのチャンネル」という風にルーティングできる機能が欲しかった。

設計

以下のような設計にした

  • Ruby 1プロセスで捌く。ミドルウェアに依存しない。
    • アプリの特徴として IRC, Slack といった外部リソースに対して I/O ブロックされるが、CPU リソースを多く使うようなものではないので1プロセスで十分そう
  • バックエンドへの post は非同期に捌く
    • Web API は、enqueue だけして即座にレスポンスを返す
    • IRC, Slack といったバックエンドへの書き込みは、非同期に処理する(non-blocking I/O or スレッド)
    • 投稿制限があるので sleep をいれてゆっくり処理する
  • データの validation は(そんなに)やらない
    • バックエンドへの post が失敗した場合にログに warn を吐いて、ログを別途監視して対応
  • 設定ファイルでルーティングを記述できる

使い方

README にも書いているけど、いちおう書いておくと、こんなかんじ

$ gem install daioikachan
$ daioikachan -g daioikachan.conf # サンプル設定生成
$ vim .env
IRC_SERVER=XX.XX.XX.XX
SLACK_API_TOKEN=XXX-XXXXX-XXXXXX-XXXXX
$ daioikachan -c daioikachan.conf # 起動

で、あとは

$ curl -d "channel=#channel&message=test message" http://localhost:4979/notice

とかするだけ。デフォルト daioikachan.conf は IRC と Slack 両方に投げるけど、Slack だけに投げたかったら設定ファイルをいじってIRC側を消してもらえればいい。

daioikachan.conf

IRCと2つのSlackチームに流すサンプルはこんなかんじになる。あれ、なんか Fluentd っぽい。

<source>
  type daioikachan
  bind 0.0.0.0
  port 4979
  min_threads 0
  max_threads 4
  backlog 1024
  @label @raw
</source>

<label @raw>
  <match **>
    type copy
    <store>
      type stdout
    </store>
    <store>
      type relabel
      @label @slack
    </store>
    <store>
      type relabel
      @label @irc
    </store>
  </match>
</label>

<label @irc>
  <match **>
    type irc
    host "#{ENV['IRC_SERVER']}"
    port 6667
    nick daioikachan
    user daioikachan
    real daioikachan
    command %s
    command_keys command
    channel %s
    channel_keys channel
    message %s
    out_keys message
    send_interval 2s # IRC would return Excess Flood, so sleep
  </match>
</label>

<label @slack>
  # #{notice,privmsg}.team1_warn => team1.slack.com#general
  <match *.team1_warn>
    type slack
    token "#{ENV['TEAM1_TOKEN']}"
    username daioikachan
    channel general
    color good
    icon_emoji :ghost:
    flush_interval 2s # slack API has limit as a post / sec
  </match>
  # other channels => team2.slack.com#${channel}
  <match **>
    type slack
    token "#{ENV['TEAM2_TOKEN']}"
    username daioikachan
    channel %s
    channel_keys channel
    color good
    icon_emoji :ghost:
    flush_interval 2s # slack API has limit as a post / sec
  </match>
</label>

daioikachan の実装

勘の良い人は気づいたかもしれないが、daioikachan は Fluentd の仕組みに乗っかって実装されている。

  1. 非同期処理は BufferedOutput プラグインという仕組みがすでにある
  2. プラグイン機構がすでにある
  3. tag や label といった設定ファイルでのルーティング機能がすでにある
  4. IRC, Slack プラグインもすでにあるし、
  5. 他にも hipchat、twitter、mail、 twilio プラグインなどもすでにある

あとは Ikachan 互換の HTTP を受ける口だけあれば良い、ということで、in_daioikachan というプラグインを書いた。 プラグイン内部で http サーバ puma を動かしつつ、生 rack なコードを書いて実現している。

in_daioikachan にこんなかんじで post すると、

curl -d "channel=#channel&message=test message" http://localhost:4979/notice

内部的には以下のようなメッセージが emit されるので、

notice.channel {"command":"notice","channel":"channel","message":"test message"}

それを output プラグインで処理すれば良いというわけだ。ちなみに、パラメタを増やすと、emit メッセージのフィールドも増えるので、out_mail 用に subject を増やすとかもできるかもしれない。

fluent-plugin-irc, fluent-plugin-slack

この2つは機能拡張、性能検証もしっかりやって、daioikachan にデフォルトで bundle している。

まだ、マージされていないものがあるので、その辺はあとで更新しますが、お使いいただける状態です。=> 全部マージしてもらいました :D Thanks!! (2015.03.25)

まとめ

複数のバックエンド(IRC, Slack, etc)に対応する Ikachan 互換サーバ、Daioikachan を書いた。 バックエンドを増やしたい時には、Fluentd のプラグインとして書けば良いので、既存の資産に乗っかれて大変便利。

どうぞご利用ください。

Server::Starter を使って複数の Fluentd で1つのポートを待ち受ける


課題

Fluentd は GVL のある CRuby でしか(まだ)動かないので、マルチコアを有効に使うためには1つのホストで複数のプロセスを同時に立ち上げる必要がある。

現在の Fluentd では、データ受信側では、複数のプロセスを立ち上げて、それぞれで別のポートを listen(2) して待ち受け、データ送信側では、送信先にホスト名と複数のポートを指定して、クライアント側でラウンドロビンすることで負荷分散する。

イメージ図

                       host
                 +---------------------+
                 |    process          |
                 |   +-------------+   |
     +-------------> |  port:24224 |   |
     |           |   +-------------+   |
     |           |                     |
+----+----+      |   +-------------+   |
| client  +--------> |  port:24225 |   |
+----+----+      |   +-------------+   |
     |           |                     |
     |           |   +-------------+   |
     +-------------> |  port:24226 |   |
                 |   +-------------+   |
                 +---------------------+

目指す姿

受信側では Unicorn のように1つのポートだけ開いて複数プロセスでリクエストを捌きたい。

                         host            
                +----------------------------+
                |                 process    |
                |               +--------+   |
                |  +------------+        |   |
                |  |            +--------+   |
                |  |                         |
+----------+    |               +--------+   |
|  client  | ---> port:24224 -- |        |   |
+----------+    |               +--------+   |
                |  |                         |
                |  |            +--------+   |
                |  +------------+        |   |
                |               +--------+   |
                +----------------------------+

実現方法

Server::Starter というものがある。それを ruby に移植した ruby-server-starter というものがある(昨日リリースした)。これを使うと、Server::Starter が listen(2) して Socket を生成し、子プロセスとして Fluentd を fork && exec することで、その Socket のファイルディスクリプタを子プロセスである Fluentd に引き継いでくれる。

bundle exec start_server.rb --port=0.0.0.0:24224 -- \
  bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1

ここで fluent-plugin-multiprocess を利用して、複数の Fluentd プロセスを立ち上げる。

# fluent.conf
<source>
  type multiprocess
  keep_file_descriptors true
  <process>
    cmdline -c in_forward.conf
  </process>
  <process>
    cmdline -c in_forward.conf
  </process>
</source>

in_forward プラグインで、ファイルディスクリプタに対する Socket を生成し、その Socket に対して accept(2) することで複数の Fluentd プロセスから同じポートに対してリクエストを待ち受けることができる

# in_foward.conf
<source>
  type forward
  fd "#{ENV['SERVER_STARTER_PORT'].split(';').first.split('=').last}"
</source>
<match **>
  type stdout
</match

※ ちなみに、Fluentd の v1-config 形式では conf に ruby コードが書けるので、そこでごにょごにょして Server::Starter が環境変数越しに渡してくれる fd 番号を取り出すことができる。

実装

とういわけでこの戦略で試しに実装してみたブランチがこちらにある

(あとで気軽にブランチ消してしまいそうなので patch を gist にも残しておく)

in_forward プラグインに fd オプションを生やして、ファイルディスクリプタから Socket を生成できるようにしている。

--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -25,6 +25,7 @@ module Fluent

     config_param :bind, :string, :default => '0.0.0.0'
+    config_param :fd, :integer, :default => nil # fd for tcp socket
     config_param :backlog, :integer, :default => nil
@@ -65,18 +68,30 @@ module Fluent
     def listen
-      log.info "listening fluent socket on #{@bind}:#{@port}"
-      s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+      s = if @fd
+            sock = TCPServer.for_fd(@fd)
+            log.info "inherited addr=#{tcp_name(sock)} fd=#{@fd}"
+            Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
+          else
+            log.info "listening fluent socket on #{@bind}:#{@port}"
+            Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+          end
       s.listen(@backlog) unless @backlog.nil?
       s
     end

また、fluent-plugin-multiprocess を使って Fluentd のプロセスを立ち上げようとすると、Process#spawn の仕様により、デフォルトではファイルディスクリプタが閉じられてしまうので、keep_file_descriptors というオプションで閉じなくさせるように拡張している。

--- a/lib/fluent/plugin/in_multiprocess.rb
+++ b/lib/fluent/plugin/in_multiprocess.rb
@@ -65,7 +69,9 @@ module Fluent
         cmd = "#{Shellwords.shellescape(RbConfig.ruby)} #{Shellwords.shellescape(fluentd_rb)} #{pe.cmdline}"
         sleep pe.sleep_before_start if pe.sleep_before_start > 0
         $log.info "launching child fluentd #{pe.cmdline}"
-        pe.process_monitor = @pm.spawn(cmd)
+        options = {:close_others => !@keep_file_descriptors}
+        pe.process_monitor = @pm.spawn(cmd, options)
       end
     end

これで Server::Starter (& Fluentd)を起動し、

$ bundle exec start_server.rb --port=0.0.0.0:24224 -- \
  bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1

fluent-cat でデータを流しこんで見ると、

$ echo '{"message":"foo"}' | fluent-cat -p 24224 foo
連打

別のプロセスにデータが振り分けられてうまくいっていることが確認できた╭( ・ㅂ・)و ̑̑

32541 2015-03-17 01:54:23 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:30 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:34 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:35 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:36 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:37 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:38 +0900 foo: {"message":"foo"}

※ 1番左に PID を表示するように out_stdout を一時的にいじっている。

まとめ

Server::Starter を使って複数の Fluentd プロセスで1つのポートを待ち受けることができた。

ちょっとやってみたかっただけで、実際にこれが Fluentd に入るのかというと、たぶんそんなことはなく、Treasure Data の新人さん が、ServerEngine の導入や、SocketManager の実装を鋭意進行中らしいので、そちらをお待ちください mm

A Ruby and Fluentd committer working at DeNA. 記事本文および記事中のコード片は引用および特記あるものを除いてすべて修正BSDライセンスとします。 #ruby #fluentd #growthforecast #haikanko #yohoushi #specinfra #serverspec #focuslight
はてぶ人気エントリー