ZeroMQ の The Guide に載っている Binary Star の Python 版サンプルコードを、Ruby に移植してみた。
移植というか、Ruby と ffi-rzmq の書き方にあわせて直したつもり程度なので、真の意味で Ruby 的かどうかは自信がない。
Binary Star の概要
ZeroMQ を使うにあたっては、どの種類のソケットをどう組み合わせ、どんなメッセージを送り合うかを入念に検討する必要がある。この作業は実質的にプロトコルの設計そのものなので、敷居は高い。
そこで The Guide には、既存の設計パターンの例がいくつか紹介されており、Binary Star もその中に含まれている。このパターンでは、同じプログラムを 2 つ起動させていわゆるホットスタンバイ構成にし、可用性を高めることを目的にしている。
サンプルコードについて
The Guide には Binary Star の実装例として bstarsrv および bstarcli が掲載されている。
bstarsrv は受信したメッセージをそのまま返すだけの、いわゆる echo サーバ的な処理をするサンプルだが、2 つ起動してホットスタンバイ構成にすることができる。
bstarcli は bstarsrv に接続するクライアントのサンプルで、1 秒に 1 回、短いメッセージを bstarsrv に送信するが、bstarsrv のフェールオーバーが発生すると自動的に再接続して処理を続行する。
以下に、The Guide のサイトにある Python 版を Ruby に移植したものを貼る。
元サイトが cc-by-sa 3.0 なので、この Ruby 版も同じということで。
[2014-08-20 追記] The Guide から参照されているサンプルのライセンスは、実は GitHub に LICENSE ファイルとして置いてあったらしい。MIT License。ということで、著作権に関する表示を追加しておく。
Copyright (c) 2010-2013 iMatix Corporation and Contributors
[2014-08-20 追記ここまで]
bstarsrv.rb
サーバ側。オプション -p をつけて起動するとプライマリ、-b をつけて起動するとバックアップになる。
#!/usr/bin/env ruby # coding: utf-8 # Binary Star Server # # Original Author (Python): Dan Colish <dcolish@gmail.com> require 'optparse' require 'rubygems' require 'ffi-rzmq' STATE_PRIMARY = 1 STATE_BACKUP = 2 STATE_ACTIVE = 3 STATE_PASSIVE = 4 PEER_PRIMARY = 1 PEER_BACKUP = 2 PEER_ACTIVE = 3 PEER_PASSIVE = 4 CLIENT_REQUEST = 5 HEARTBEAT = 1000 class BStarState attr_accessor :state, :event, :peer_expiry def initialize(state, event, peer_expiry) @state = state @event = event @peer_expiry = peer_expiry end end class BStarException < Exception end $fsm_states = { STATE_PRIMARY => { PEER_BACKUP => ["I: connected to backup (slave), ready as master", STATE_ACTIVE], PEER_ACTIVE => ["I: connected to backup (master), ready as slave", STATE_PASSIVE] }, STATE_BACKUP => { PEER_ACTIVE => ["I: connected to primary (master), ready as slave", STATE_PASSIVE], CLIENT_REQUEST => ["", false] }, STATE_ACTIVE => { PEER_ACTIVE => ["E: fatal error - dual masters, aborting", false] }, STATE_PASSIVE => { PEER_PRIMARY => ["I: primary (slave) is restarting, ready as master", STATE_ACTIVE], PEER_BACKUP => ["I: backup (slave) is restarting, ready as master", STATE_ACTIVE], PEER_PASSIVE => ["E: fatal error - dual slaves, aborting", false], CLIENT_REQUEST => [CLIENT_REQUEST, true] # Say true, check peer later } } def run_fsm(fsm) # There are some transitional states we do not want to handle state_dict = $fsm_states.fetch(fsm.state, {}) res = state_dict[fsm.event] if not res.nil? msg = res[0] state = res[1] else return end if state == false raise BStarException.new(msg) elsif msg == CLIENT_REQUEST #assert fsm.peer_expiry > 0 if Time.now > fsm.peer_expiry fsm.state = STATE_ACTIVE else raise BStarException.new end else puts msg fsm.state = state end end def main args = {primary: false, backup: false} opt = OptionParser.new opt.on("-p", "--primary") {|p| args[:primary] = true } opt.on("-b", "--backup") {|p| args[:backup] = true } opt.parse!(ARGV) ctx = ZMQ::Context.new(1) statepub = ctx.socket(ZMQ::PUB) statesub = ctx.socket(ZMQ::SUB) statesub.setsockopt(ZMQ::SUBSCRIBE, "") frontend = ctx.socket(ZMQ::ROUTER) fsm = BStarState.new(0, 0, Time.at(0)) if args[:primary] puts "I: Primary master, waiting for backup (slave)" frontend.bind("tcp://*:5001") statepub.bind("tcp://*:5003") statesub.connect("tcp://localhost:5004") fsm.state = STATE_PRIMARY elsif args[:backup] puts "I: Backup slave, waiting for primary (master)" frontend.bind("tcp://*:5002") statepub.bind("tcp://*:5004") statesub.connect("tcp://localhost:5003") statesub.setsockopt(ZMQ::SUBSCRIBE, "") fsm.state = STATE_BACKUP end send_state_at = Time.now + (HEARTBEAT / 1000) poller = ZMQ::Poller.new() poller.register(frontend, ZMQ::POLLIN) poller.register(statesub, ZMQ::POLLIN) loop do time_left = send_state_at - Time.now if time_left < 0 time_left = 0 end poller.poll(time_left) poller.readables.each do |socket| if socket === frontend frontend.recv_strings(msg = []) fsm.event = CLIENT_REQUEST begin run_fsm(fsm) frontend.send_strings(msg) rescue BStarException end end if socket === statesub statesub.recv_string(msg = "") fsm.event = msg.to_i begin run_fsm(fsm) fsm.peer_expiry = Time.now + (2 * HEARTBEAT / 1000) rescue BStarException break end end end if Time.now >= send_state_at statepub.send_string("%d" % fsm.state) send_state_at = Time.now + HEARTBEAT / 1000 end end end if __FILE__ == $0 main end
bstarcli.rb
クライアント側。オプションは特になし。
#!/usr/bin/env ruby # coding: utf-8 require 'rubygems' require 'ffi-rzmq' REQUEST_TIMEOUT = 1000 # msecs SETTLE_DELAY = 2000 # before failing over def main server = ['tcp://localhost:5001', 'tcp://localhost:5002'] server_nbr = 0 ctx = ZMQ::Context.new(1) client = ctx.socket(ZMQ::REQ) client.connect(server[server_nbr]) poller = ZMQ::Poller.new poller.register(client, ZMQ::POLLIN) sequence = 0 loop do client.send_string("%s" % sequence) expect_reply = true while expect_reply received = false poller.poll(REQUEST_TIMEOUT) poller.readables.each do |socket| received = true if socket === client client.recv_string(reply = "") if reply.to_i == sequence puts "I: server replied OK (%s)" % reply expect_reply = false sequence += 1 sleep(1) else puts "E: malformed reply from server: %s" % reply end end end if not received puts "W: no response from server, failing over" sleep(SETTLE_DELAY / 1000) poller.delete(client) client.close() server_nbr = (server_nbr + 1) % 2 puts "I: connecting to server at %s.." % server[server_nbr] client = ctx.socket(ZMQ::REQ) poller.register(client, ZMQ::POLLIN) # reconnect and resend request client.connect(server[server_nbr]) client.send_string("%s" % sequence) end end end end if __FILE__ == $0 main end