課題
Fluentd は GVL のある CRuby でしか(まだ)動かないので、マルチコアを有効に使うためには1つのホストで複数のプロセスを同時に立ち上げる必要がある。
現在の Fluentd では、データ受信側では、複数のプロセスを立ち上げて、それぞれで別のポートを listen(2) して待ち受け、データ送信側では、送信先にホスト名と複数のポートを指定して、クライアント側でラウンドロビンすることで負荷分散する。
イメージ図
host
+---------------------+
| process |
| +-------------+ |
+-------------> | port:24224 | |
| | +-------------+ |
| | |
+----+----+ | +-------------+ |
| client +--------> | port:24225 | |
+----+----+ | +-------------+ |
| | |
| | +-------------+ |
+-------------> | port:24226 | |
| +-------------+ |
+---------------------+
目指す姿
受信側では Unicorn のように1つのポートだけ開いて複数プロセスでリクエストを捌きたい。
host
+----------------------------+
| process |
| +--------+ |
| +------------+ | |
| | +--------+ |
| | |
+----------+ | +--------+ |
| client | ---> port:24224 -- | | |
+----------+ | +--------+ |
| | |
| | +--------+ |
| +------------+ | |
| +--------+ |
+----------------------------+
実現方法
Server::Starter というものがある。それを ruby に移植した ruby-server-starter というものがある(昨日リリースした)。これを使うと、Server::Starter が listen(2) して Socket を生成し、子プロセスとして Fluentd を fork && exec することで、その Socket のファイルディスクリプタを子プロセスである Fluentd に引き継いでくれる。
bundle exec start_server.rb --port=0.0.0.0:24224 -- \
bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1
ここで fluent-plugin-multiprocess を利用して、複数の Fluentd プロセスを立ち上げる。
# fluent.conf
<source>
type multiprocess
keep_file_descriptors true
<process>
cmdline -c in_forward.conf
</process>
<process>
cmdline -c in_forward.conf
</process>
</source>in_forward プラグインで、ファイルディスクリプタに対する Socket を生成し、その Socket に対して accept(2) することで複数の Fluentd プロセスから同じポートに対してリクエストを待ち受けることができる
# in_foward.conf
<source>
type forward
fd "#{ENV['SERVER_STARTER_PORT'].split(';').first.split('=').last}"
</source>
<match **>
type stdout
</match
※ ちなみに、Fluentd の v1-config 形式では conf に ruby コードが書けるので、そこでごにょごにょして Server::Starter が環境変数越しに渡してくれる fd 番号を取り出すことができる。
実装
とういわけでこの戦略で試しに実装してみたブランチがこちらにある
- sonots/fluentd/tree/SERVER_STARTER_FD
- sonots/fluent-plugin-multiprocess/tree/keep_file_descriptors (Pull Request済み)
(あとで気軽にブランチ消してしまいそうなので patch を gist にも残しておく)
in_forward プラグインに fd オプションを生やして、ファイルディスクリプタから Socket を生成できるようにしている。
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -25,6 +25,7 @@ module Fluent
config_param :bind, :string, :default => '0.0.0.0'
+ config_param :fd, :integer, :default => nil # fd for tcp socket
config_param :backlog, :integer, :default => nil
@@ -65,18 +68,30 @@ module Fluent
def listen
- log.info "listening fluent socket on #{@bind}:#{@port}"
- s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+ s = if @fd
+ sock = TCPServer.for_fd(@fd)
+ log.info "inherited addr=#{tcp_name(sock)} fd=#{@fd}"
+ Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
+ else
+ log.info "listening fluent socket on #{@bind}:#{@port}"
+ Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+ end
s.listen(@backlog) unless @backlog.nil?
s
endまた、fluent-plugin-multiprocess を使って Fluentd のプロセスを立ち上げようとすると、Process#spawn の仕様により、デフォルトではファイルディスクリプタが閉じられてしまうので、keep_file_descriptors というオプションで閉じなくさせるように拡張している。
--- a/lib/fluent/plugin/in_multiprocess.rb
+++ b/lib/fluent/plugin/in_multiprocess.rb
@@ -65,7 +69,9 @@ module Fluent
cmd = "#{Shellwords.shellescape(RbConfig.ruby)} #{Shellwords.shellescape(fluentd_rb)} #{pe.cmdline}"
sleep pe.sleep_before_start if pe.sleep_before_start > 0
$log.info "launching child fluentd #{pe.cmdline}"
- pe.process_monitor = @pm.spawn(cmd)
+ options = {:close_others => !@keep_file_descriptors}
+ pe.process_monitor = @pm.spawn(cmd, options)
end
endこれで Server::Starter (& Fluentd)を起動し、
$ bundle exec start_server.rb --port=0.0.0.0:24224 -- \
bundle exec --keep-file-descriptors fluentd -c fluent.conf 2>&1
fluent-cat でデータを流しこんで見ると、
$ echo '{"message":"foo"}' | fluent-cat -p 24224 foo
連打
別のプロセスにデータが振り分けられてうまくいっていることが確認できた╭( ・ㅂ・)و ̑̑
32541 2015-03-17 01:54:23 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:30 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:34 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:35 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:36 +0900 foo: {"message":"foo"}
31123 2015-03-17 01:54:37 +0900 foo: {"message":"foo"}
32541 2015-03-17 01:54:38 +0900 foo: {"message":"foo"}
※ 1番左に PID を表示するように out_stdout を一時的にいじっている。
まとめ
Server::Starter を使って複数の Fluentd プロセスで1つのポートを待ち受けることができた。
ちょっとやってみたかっただけで、実際にこれが Fluentd に入るのかというと、たぶんそんなことはなく、Treasure Data の新人さん が、ServerEngine の導入や、SocketManager の実装を鋭意進行中らしいので、そちらをお待ちください mm
