What is Rinda?
Ruby ships with a rich set of standard libraries, and a few of them are extremely powerful yet almost unknown. PStore is one example, but Rinda takes first place in my "Ruby libraries it is a waste not to use" championship.
Rinda is Ruby's implementation of a tuple space. The tuple space idea itself is best known from Linda, the coordination language it originated in; JavaSpaces is a well-known Java implementation.
What is a tuple space?
A tuple space is one method of inter-process communication. With a tuple space you can easily write programs that run in parallel. What is more, things that are normally hard, such as adding or removing any number of workers at any time, or distributing work across a network, become easy.
A tuple space mediates data: you write data into it, it holds that data, and you take data out of it.
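A tuple space supports the following five operations:
- out: write a value into the tuple space
- in: remove a value from the tuple space and return it to the process (blocking)
- rd: copy a value from the tuple space and return it to the process (blocking)
- inp: the non-blocking version of in
- rdp: the non-blocking version of rd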
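Inter-process communication is achieved through just two kinds of action: "write to the tuple space" and "take from the tuple space". You can attach conditions to the value you take out, which lets a process pick up the response addressed to itself, or lets several different kinds of exchange share one space.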
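As a rough illustration of taking values "with conditions", here is a minimal sketch using an in-process Rinda::TupleSpace. The tuple layouts (`:job`, `:reply`, the file names) are made up for this example; only `write`, `take` and `read_all` come from Rinda itself.

```ruby
require 'rinda/tuplespace'

# A local, in-process tuple space is enough to see the matching rules.
ts = Rinda::TupleSpace.new

# "out": write two different kinds of tuples into the same space.
ts.write([:job, 1, "a.mp4"])
ts.write([:job, 2, "b.mp4"])
ts.write([:reply, 2, "done"])

# "in" with a condition: nil is a wildcard, other fields must match.
# This picks up only the reply tagged with id 2.
reply = ts.take([:reply, 2, nil])

# Template fields are also compared with ===, so a class matches any
# instance of it. This takes some :job tuple (which one is not specified).
job = ts.take([:job, Integer, String])

# Whatever was not taken is still held by the tuple space.
remaining = ts.read_all([nil, nil, nil])
```

A template only matches tuples with the same number of fields, so different kinds of messages can also be told apart by their shape.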
Tuple spaces in Rinda
Rinda uses DRb to pass data to and from the tuple space.
DRb (distributed Ruby) is a facility for invoking methods remotely. With DRb you can use Ruby across multiple machines.
When a DRb method call goes over the network, the data is carried with Marshal.
Marshal is not a library but a built-in module, and it is another feature that is handy yet hardly used. Concretely,

    str = Marshal.dump(obj)

gives you the object serialized into a (binary) string,

    File.open("foo.marshal", "wb") {|f| Marshal.dump(obj, f) }

writes it out to a file, and

    obj = Marshal.load(File.binread("foo.marshal"))

turns it back into an object.
Marshal can handle almost all built-in classes, and if you write them properly, custom classes work too.
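To make "if you write them properly" a little more concrete, here is a small sketch of a custom class that survives a Marshal round trip. `Job` and its fields are hypothetical names invented for this example; `marshal_dump` and `marshal_load` are the hooks Marshal looks for when an object holds state that cannot be dumped as-is (an IO handle here).

```ruby
# Job is a hypothetical class used only for this illustration.
class Job
  attr_reader :source, :dest

  def initialize(source, dest)
    @source = source
    @dest = dest
    @log = $stderr # IO objects cannot be marshaled
  end

  # Tell Marshal which state to serialize (the IO handle is skipped).
  def marshal_dump
    [@source, @dest]
  end

  # Rebuild the object from the dumped state.
  def marshal_load(state)
    @source, @dest = state
    @log = $stderr
  end
end

data = Marshal.dump(Job.new("in.mp4", "out.webm")) # a binary String
copy = Marshal.load(data)                           # a new, equivalent Job
```

A plain class whose instance variables are all dumpable needs neither hook; they only matter for cases like this one.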
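Because DRb uses Marshal, Rinda naturally uses Marshal as well, so a wide range of objects can be exchanged through the tuple space. That makes it tremendously powerful.
Rinda keeps the interface very simple: on Rinda::TupleSpaceProxy, write is the out operation and take is the in operation. There are no separate inp and rdp methods; read corresponds to rd, and both take and read accept a timeout instead.
In addition, because each service is addressed by its DRb.uri, the other end of an exchange can be identified unambiguously.
DRb also uses TCP out of the box, which means that using Rinda makes distribution across a network possible.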
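For reference, this is how the classic rd / inp style of access looks with the Rinda API as I understand it: `read` leaves the tuple in place, and a timeout of 0 seconds turns `take` into a non-blocking call that raises `Rinda::RequestExpiredError` when nothing matches. The `:config` tuple is just sample data, and the sketch runs against a local TupleSpace rather than through a proxy.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
ts.write([:config, "crf", 38])

# read (rd): returns the matching tuple and leaves it in the space,
# so it can be read again.
first  = ts.read([:config, "crf", nil])
second = ts.read([:config, "crf", nil])

# take (in): returns the tuple and removes it.
taken = ts.take([:config, "crf", nil])

# A timeout of 0 makes the call non-blocking (inp-like): with no
# matching tuple it raises instead of waiting.
begin
  ts.take([:config, "crf", nil], 0)
  empty = false
rescue Rinda::RequestExpiredError
  empty = true
end
```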
Designing with Rinda
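A single Rinda server, that is, a single tuple space, is all you need. It is very simple:

    require 'drb/drb'
    require 'rinda/tuplespace'

    # Optional URI to listen on (DRb picks one if it is omitted)
    uri = ARGV.shift
    # Publish a TupleSpace as the DRb front object
    DRb.start_service(uri, Rinda::TupleSpace.new)
    puts DRb.uri
    # Keep the server running
    DRb.thread.join

This is the code for the tuple space itself: it accepts values, holds them, and hands them out.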
The client processes that write to it and take from it, on the other hand, can take any shape.
The simplest form is a pair of programs, one that writes and one that takes. But if program A writes and then takes, while program B takes and then writes, you get a setup where A requests some processing, B does it and returns a value, and A receives the result.
The important point with Rinda is that all of this is nothing more than writing to and taking from the tuple space. Because of that, it does not matter how many writing processes or taking processes there are. Asymmetric parallel execution with any number of processes is possible.
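The request/response shape described above can be sketched in a single process, with a thread standing in for program B. The tuple layout (`:request` / `:response` plus an id) and the "processing" (upcasing a string) are assumptions made for this example, not anything Rinda prescribes.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

# Program B (a thread here, for brevity): take a request, process it,
# and write the answer back tagged with the same request id.
worker = Thread.new do
  _, id, text = ts.take([:request, nil, nil])
  ts.write([:response, id, text.upcase])
end

# Program A: write a request, then block until *its own* response arrives.
request_id = 42 # any value unique to this request
ts.write([:request, request_id, "hello"])
_, _, result = ts.take([:response, request_id, nil])

worker.join
```

Because A matches on its own id, any number of A-like and B-like processes can share the same tuple space without picking up each other's answers.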
A simple sample
The following program writes the two file paths given as arguments to the tuple space.

    require 'drb/drb'
    require 'rinda/rinda'

    # DRb's TCP transport uses the druby:// scheme
    uri = "druby://fooserver.local:40121"
    sourcefile = ARGV.shift
    destfile = ARGV.shift

    DRb.start_service
    ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))
    # Queue one conversion job
    ts.write(["ffmpeg", {source: sourcefile, dest: destfile}])

This program only writes to the tuple space, so it finishes in an instant. There is little point in parallelizing it, but running several copies at the same time causes no trouble. In other words, the programs that call it can run in parallel.
The second program takes values from the tuple space and converts the files with ffmpeg.

    require 'drb/drb'
    require 'rinda/rinda'

    uri = "druby://fooserver.local:40121"

    DRb.start_service
    ts = Rinda::TupleSpaceProxy.new(DRbObject.new(nil, uri))
    # take blocks until a matching job is available
    while arg = ts.take(["ffmpeg", nil])
      job = arg[1]   # the Hash written by the first program
      system("ffmpeg", "-i", job[:source], "-c:v", "libvpx-vp9", "-crf", "38", "-c:a", "libopus", "-b:a", "128k", job[:dest])
    end

Conversion with libvpx is heavy and takes a considerable amount of time. On top of that, a single encode does not parallelize especially well, so on a many-core machine you can run several of these processes side by side. Running them on several machines is fine too.
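As already mentioned, you can start as many copies of a program that uses the tuple space as you like, and Rinda can be used over the network. The work is distributed across however many processes you start.
Note that to distribute this particular example over a network, you have to arrange separately for every machine to reach the files under the same file paths.
I used ffmpeg here, but if you think of what is handed over as arguments, the system call can be replaced with any time-consuming processing.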
Rinda in use
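Among the programs I have written, Pluto (which is not open source) uses Rinda.
Internally, Pluto does typesetting with Cairo/Pango and PDF conversion with Poppler, and these are quite heavy. Handing work over at each stage lets every phase be processed in parallel, orthogonally to the others.
The main reasons for choosing Rinda were that the degree of parallelism is easy to control and that added server capacity is easy to take advantage of. If one machine is not enough, clustering is possible too. Admittedly, file I/O then also goes over the network, and since the file I/O is quite heavy (the files get large), the 100 Mbps ConoHa VPS LAN did not inspire confidence and I would rather not have gone over the network.
I also used Rinda in a script that converts gameplay videos, which is almost identical to the example above. As in the example, conversion to VP9 with libvpx is heavy, and since I own several powerful machines I wanted them to process videos in parallel. At first I considered implementing a plain TCP server queue, but part of handling the targets that way proved difficult, and the code turns out simpler with Rinda, so I went with Rinda.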
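In the simplest case, code like the following passes values from standard input to processes in round-robin fashion.

    COMMAND = "cat -n"
    ps = []
    nproc = 5
    # Start nproc child processes, each reading from a pipe
    nproc.times { ps.push IO.popen(COMMAND, "w") }
    i = 0
    STDIN.each do |line|
      ps[i].puts line
      i = (i + 1) % ps.length
    end
    # Close the pipes so the children see EOF and finish
    ps.each(&:close)

This does parallelize the work, but if the execution time of the individual commands is uneven, it can hardly be called optimal. Switching to a pull model improves on that.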
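    # Server: hands out one argument per connection
    require 'socket'

    args = ARGF.each_line.map(&:chomp)
    serv = UNIXServer.new("/run/para1.sock")
    until args.empty?
      s = serv.accept
      s.puts args.shift
      s.close
    end
    serv.close
    File.unlink("/run/para1.sock")

    # Worker: reconnects for each new argument until the server is gone
    require 'socket'

    loop do
      begin
        s = UNIXSocket.new("/run/para1.sock")
      rescue Errno::ENOENT, Errno::ECONNREFUSED
        break   # the server has handed out everything and exited
      end
      arg = s.gets
      s.close
      break unless arg
      system("command", arg.chomp)
    end

Of course, there is also the option of using ZeroMQ, which makes distribution over a network possible as well.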
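    # Server: answers each request with the next argument, or ":exit" when none remain
    require 'ffi-rzmq'

    args = ARGF.each_line.map(&:chomp)
    cx = ZMQ::Context.new
    sock = cx.socket(ZMQ::REP)
    sock.bind("tcp://127.0.0.1:40000")   # bind to a reachable address to serve other machines
    loop do
      request = ""
      sock.recv_string(request)          # the request body itself is ignored
      sock.send_string(args.empty? ? ":exit" : args.shift)
    end

    # Worker: asks for work every 10 seconds until told to exit
    require 'ffi-rzmq'

    cx = ZMQ::Context.new
    sock = cx.socket(ZMQ::REQ)
    sock.connect("tcp://127.0.0.1:40000")
    while sleep 10                       # sleep returns a truthy value, so this keeps looping
      sock.send_string("PLZ")
      s = ""
      sock.recv_string(s)
      break if s == ":exit"
      puts "WORKER RECEIVED: #{s}"
    end

That said, Rinda has several advantages over ZeroMQ:
- Rinda is a library that ships with Ruby, so setup is easy
- Unless your model happens to be a perfect fit for ZeroMQ, the Rinda version is usually shorter
- Rinda is more flexible, and parallel processing where the number of workers can grow and shrink freely is easy to write. With ZeroMQ this takes a fair amount of design, and flexible scaling requires adding another layer to the architecture, which makes the code long and complex
- With ZeroMQ you have to serialize and deserialize by hand. With Rinda you can pass objects as they are
- When several kinds of messages are exchanged, ZeroMQ needs correspondingly more servers. Rinda handles all of them with a single tuple space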
Caveats with Rinda
The biggest caveat with Rinda is that it is not FIFO.
With Rinda you cannot predict which of the objects matching a condition will be returned. What you take out is "some object that matches the condition".
Consequently, when parallel processing has to preserve the order of its input, you need a rather difficult approach (and one that costs some parallelism or flexibility).
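One possible way to keep input order, sketched here under my own assumptions rather than as a recommended recipe, is to tag every job with its position and have the collector take results by index. The tuple layouts and the thread-based workers are illustrative only.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
inputs = %w[a b c d e]

# Tag each job with its position in the input.
inputs.each_with_index { |item, i| ts.write([:job, i, item]) }

# Three workers take jobs in whatever order the tuple space hands them out.
workers = 3.times.map do
  Thread.new do
    loop do
      begin
        _, i, item = ts.take([:job, nil, nil], 0)
      rescue Rinda::RequestExpiredError
        break # no jobs left
      end
      ts.write([:done, i, item.upcase])
    end
  end
end
workers.each(&:join)

# Take results by index, which restores the input order.
results = inputs.each_index.map { |i| ts.take([:done, i, nil])[2] }
```

The cost is visible in the last line: the collector always waits for the next index, so one slow item holds up everything behind it, and something has to know the full range of indexes.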
When "use Rinda" becomes an option
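Rinda enters the picture when, as a premise, inter-process communication is required.
The obvious candidate for a convenient inter-process communication library is ZeroMQ. ZeroMQ is convenient, but it is not particularly well suited to flexible parallel execution, especially many-to-many models.
When work is handed from multiple workers to multiple workers, or when you simply want parallel execution to be easier and more flexible, adopting Rinda often works out well.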
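A many-to-many hand-off can be sketched with two stages of workers sharing one tuple space. The stage names (`:typeset`, `:to_pdf`, `:finished`), the worker counts and the `:stop` marker are hypothetical choices for this example, and threads stand in for what would normally be separate processes.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
pages = (1..6).to_a

# Stage 1 (two workers): take a page number and hand it on to stage 2.
stage1 = 2.times.map do
  Thread.new do
    loop do
      _, n = ts.take([:typeset, nil])
      break if n == :stop
      ts.write([:to_pdf, "page-#{n}"])
    end
  end
end

# Stage 2 (three workers): take stage 1 output and report the finished file.
stage2 = 3.times.map do
  Thread.new do
    loop do
      _, name = ts.take([:to_pdf, nil])
      break if name == :stop
      ts.write([:finished, "#{name}.pdf"])
    end
  end
end

# Feed the pipeline and wait for one result per page.
pages.each { |n| ts.write([:typeset, n]) }
finished = pages.map { ts.take([:finished, nil])[1] }

# Shut the workers down: one stop marker per worker.
stage1.size.times { ts.write([:typeset, :stop]) }
stage2.size.times { ts.write([:to_pdf, :stop]) }
(stage1 + stage2).each(&:join)
```

Neither stage knows how many workers the other has; changing `2.times` or `3.times` is the only edit needed to rebalance them.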
Closing
Rinda's great strength is that simply writing the hand-off is enough to get flexible parallel execution. Perhaps because few people expect a parallel-processing library like this to exist in the standard distribution, it is badly underused, but in practice it is extremely powerful.
The cost of handing work over through a tuple space is very low. If you write a script that simply "receives a target to process, processes it, and notifies the tuple space when done", that alone gives you a script whose throughput can be expected to grow linearly with the number of processors. And it is very easy to do.
A design like this, in which the stages are orthogonal and do not wait for one another, not only delivers high performance in parallel processing but also goes well with Orbital design.
Being able to keep programs small is another benefit. It also suits the very Linux-like approach of starting small scripts with systemd as needed.
Nor is this something that benefits only Ruby programmers.
As the ffmpeg example showed, by using system, IO.popen, or UNIX domain sockets, it is also useful for parallelizing programs written in other languages.
Rinda is a powerful library. Please give it a try.