課題

Fluentd は GVL のある CRuby でしか(まだ)動かないので、マルチコアを有効に使うためには1つのホストで複数のプロセスを同時に立ち上げる必要がある。

現在の Fluentd では、データ受信側では、複数のプロセスを立ち上げて、それぞれで別のポートを listen(2) して待ち受け、データ送信側では、送信先にホスト名と複数のポートを指定して、クライアント側でラウンドロビンすることで負荷分散する。

イメージ図

                       host
                 +---------------------+
                 |    process          |
                 |   +-------------+   |
     +-------------> |  port:24224 |   |
     |           |   +-------------+   |
     |           |                     |
+----+----+      |   +-------------+   |
| client  +--------> |  port:24225 |   |
+----+----+      |   +-------------+   |
     |           |                     |
     |           |   +-------------+   |
     +-------------> |  port:24226 |   |
                 |   +-------------+   |
                 +---------------------+

目指す姿

受信側では Unicorn のように1つのポートだけ開いて複数プロセスでリクエストを捌きたい。

                         host            
                +----------------------------+
                |                 process    |
                |               +--------+   |
                |  +------------+        |   |
                |  |            +--------+   |
                |  |                         |
+----------+    |               +--------+   |
|  client  | ---> port:24224 -- |        |   |
+----------+    |               +--------+   |
                |  |                         |
                |  |            +--------+   |
                |  +------------+        |   |
                |               +--------+   |
                +----------------------------+

実現方法

Server::Starter というものがある。それを ruby に移植した ruby-server-starter というものがある(昨日リリースした)。これを使うと、Server::Starter が listen(2) して Socket を生成し、子プロセスとして Fluentd を fork && exec することで、その Socket のファイルディスクリプタを子プロセスである Fluentd に引き継いでくれる。

bundle exec start_server.rb --port=0.0.0.0:24224 -- \
  bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1

ここで fluent-plugin-multiprocess を利用して、複数の Fluentd プロセスを立ち上げる。

# fluent.conf
<source>
  type multiprocess
  keep_file_descriptors true
  <process>
    cmdline -c in_forward.conf
  </process>
  <process>
    cmdline -c in_forward.conf
  </process>
</source>

in_forward プラグインで、ファイルディスクリプタに対する Socket を生成し、その Socket に対して accept(2) することで複数の Fluentd プロセスから同じポートに対してリクエストを待ち受けることができる

# in_foward.conf
<source>
  type forward
  fd "#{ENV['SERVER_STARTER_PORT'].split(';').first.split('=').last}"
</source>
<match **>
  type stdout
</match

※ ちなみに、Fluentd の v1-config 形式では conf に ruby コードが書けるので、そこでごにょごにょして Server::Starter が環境変数越しに渡してくれる fd 番号を取り出すことができる。

実装

とういわけでこの戦略で試しに実装してみたブランチがこちらにある

(あとで気軽にブランチ消してしまいそうなので patch を gist にも残しておく)

in_forward プラグインに fd オプションを生やして、ファイルディスクリプタから Socket を生成できるようにしている。

--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -25,6 +25,7 @@ module Fluent

     config_param :bind, :string, :default => '0.0.0.0'
+    config_param :fd, :integer, :default => nil # fd for tcp socket
     config_param :backlog, :integer, :default => nil
@@ -65,18 +68,30 @@ module Fluent
     def listen
-      log.info "listening fluent socket on #{@bind}:#{@port}"
-      s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+      s = if @fd
+            sock = TCPServer.for_fd(@fd)
+            log.info "inherited addr=#{tcp_name(sock)} fd=#{@fd}"
+            Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
+          else
+            log.info "listening fluent socket on #{@bind}:#{@port}"
+            Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+          end
       s.listen(@backlog) unless @backlog.nil?
       s
     end

また、fluent-plugin-multiprocess を使って Fluentd のプロセスを立ち上げようとすると、Process#spawn の仕様により、デフォルトではファイルディスクリプタが閉じられてしまうので、keep_file_descriptors というオプションで閉じなくさせるように拡張している。

--- a/lib/fluent/plugin/in_multiprocess.rb
+++ b/lib/fluent/plugin/in_multiprocess.rb
@@ -65,7 +69,9 @@ module Fluent
         cmd = "#{Shellwords.shellescape(RbConfig.ruby)} #{Shellwords.shellescape(fluentd_rb)} #{pe.cmdline}"
         sleep pe.sleep_before_start if pe.sleep_before_start > 0
         $log.info "launching child fluentd #{pe.cmdline}"
-        pe.process_monitor = @pm.spawn(cmd)
+        options = {:close_others => !@keep_file_descriptors}
+        pe.process_monitor = @pm.spawn(cmd, options)
       end
     end

これで Server::Starter (& Fluentd)を起動し、

$ bundle exec start_server.rb --port=0.0.0.0:24224 -- \
  bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1

fluent-cat でデータを流しこんで見ると、

$ echo '{"message":"foo"}' | fluent-cat -p 24224 foo
連打

別のプロセスにデータが振り分けられてうまくいっていることが確認できた╭( ・ㅂ・)و ̑̑

32541 2015-03-17 01:54:23 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:30 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:34 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:35 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:36 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:37 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:38 +0900 foo: {"message":"foo"}

※ 1番左に PID を表示するように out_stdout を一時的にいじっている。

まとめ

Server::Starter を使って複数の Fluentd プロセスで1つのポートを待ち受けることができた。

ちょっとやってみたかっただけで、実際にこれが Fluentd に入るのかというと、たぶんそんなことはなく、Treasure Data の新人さん が、ServerEngine の導入や、SocketManager の実装を鋭意進行中らしいので、そちらをお待ちください mm