Ruby: ZeroMQ の Binary Star パターン

ZeroMQ の The Guide に載っている Binary Star の Python 版サンプルコードを、Ruby に移植してみた。
移植というか、Rubyffi-rzmq の書き方にあわせて直したつもり程度なので、真の意味で Ruby 的かどうかは自信がない。

Binary Star の概要

ZeroMQ を使うにあたっては、どの種類のソケットをどう組み合わせ、どんなメッセージを送り合うかを入念に検討する必要がある。この作業は実質的にプロトコルの設計そのものなので、敷居は高い。
そこで The Guide には、既存の設計パターンの例がいくつか紹介されており、Binary Star もその中に含まれている。このパターンでは、同じプログラムを 2 つ起動させていわゆるホットスタンバイ構成にし、可用性を高めることを目的にしている。

サンプルコードについて

The Guide には Binary Star の実装例として bstarsrv および bstarcli が掲載されている。
bstarsrv は受信したメッセージをそのまま返すだけの、いわゆる echo サーバ的な処理をするサンプルだが、2 つ起動してホットスタンバイ構成にすることができる。
bstarcli は bstarsrv に接続するクライアントのサンプルで、1 秒に 1 回、短いメッセージを bstarsrv に送信するが、bstarsrv のフェールオーバーが発生すると自動的に再接続して処理を続行する。

以下に、The Guide のサイトにある Python 版を Ruby に移植したものを貼る。
元サイトが cc-by-sa 3.0 なので、この Ruby 版も同じということで。

[2014-08-20 追記] The Guide から参照されているサンプルのライセンスは、実は GitHubLICENSE ファイルとして置いてあったらしい。MIT License。ということで、著作権に関する表示を追加しておく。


Copyright (c) 2010-2013 iMatix Corporation and Contributors

[2014-08-20 追記ここまで]

bstarsrv.rb

サーバ側。オプション -p をつけて起動するとプライマリ、-b をつけて起動するとバックアップになる。

#!/usr/bin/env ruby
# coding: utf-8
# Binary Star Server
#
# Original Author (Python): Dan Colish <dcolish@gmail.com>

require 'optparse'
require 'rubygems'
require 'ffi-rzmq'

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000

class BStarState
  attr_accessor :state, :event, :peer_expiry

  def initialize(state, event, peer_expiry)
  @state = state
  @event = event
  @peer_expiry = peer_expiry
  end
end

class BStarException < Exception
end

$fsm_states = {
  STATE_PRIMARY => {
    PEER_BACKUP => ["I: connected to backup (slave), ready as master",
            STATE_ACTIVE],
    PEER_ACTIVE => ["I: connected to backup (master), ready as slave",
            STATE_PASSIVE]
    },
  STATE_BACKUP => {
    PEER_ACTIVE => ["I: connected to primary (master), ready as slave",
            STATE_PASSIVE],
    CLIENT_REQUEST => ["", false]
    },
  STATE_ACTIVE => {
    PEER_ACTIVE => ["E: fatal error - dual masters, aborting", false]
    },
  STATE_PASSIVE => {
    PEER_PRIMARY => ["I: primary (slave) is restarting, ready as master",
             STATE_ACTIVE],
    PEER_BACKUP => ["I: backup (slave) is restarting, ready as master",
            STATE_ACTIVE],
    PEER_PASSIVE => ["E: fatal error - dual slaves, aborting", false],
    CLIENT_REQUEST => [CLIENT_REQUEST, true]  # Say true, check peer later
    }
  }

def run_fsm(fsm)
  # There are some transitional states we do not want to handle
  state_dict = $fsm_states.fetch(fsm.state, {})
  res = state_dict[fsm.event]
  if not res.nil?
    msg = res[0]
    state = res[1]
  else
    return
  end
  if state == false
    raise BStarException.new(msg)
  elsif msg == CLIENT_REQUEST
    #assert fsm.peer_expiry > 0
    if Time.now > fsm.peer_expiry
      fsm.state = STATE_ACTIVE
    else
      raise BStarException.new
    end
  else
    puts msg
    fsm.state = state
  end
end

def main
  args = {primary: false, backup: false}
  opt = OptionParser.new
  opt.on("-p", "--primary") {|p| args[:primary] = true }
  opt.on("-b", "--backup") {|p| args[:backup] = true }
  opt.parse!(ARGV)

  ctx = ZMQ::Context.new(1)
  statepub = ctx.socket(ZMQ::PUB)
  statesub = ctx.socket(ZMQ::SUB)
  statesub.setsockopt(ZMQ::SUBSCRIBE, "")
  frontend = ctx.socket(ZMQ::ROUTER)

  fsm = BStarState.new(0, 0, Time.at(0))

  if args[:primary]
    puts "I: Primary master, waiting for backup (slave)"
    frontend.bind("tcp://*:5001")
    statepub.bind("tcp://*:5003")
    statesub.connect("tcp://localhost:5004")
    fsm.state = STATE_PRIMARY
  elsif args[:backup]
    puts "I: Backup slave, waiting for primary (master)"
    frontend.bind("tcp://*:5002")
    statepub.bind("tcp://*:5004")
    statesub.connect("tcp://localhost:5003")
    statesub.setsockopt(ZMQ::SUBSCRIBE, "")
    fsm.state = STATE_BACKUP
  end

  send_state_at = Time.now + (HEARTBEAT / 1000)
  poller = ZMQ::Poller.new()
  poller.register(frontend, ZMQ::POLLIN)
  poller.register(statesub, ZMQ::POLLIN)

  loop do
    time_left = send_state_at - Time.now
    if time_left < 0
      time_left = 0
    end
    poller.poll(time_left)
    poller.readables.each do |socket|
      if socket === frontend
        frontend.recv_strings(msg = [])
        fsm.event = CLIENT_REQUEST
        begin
          run_fsm(fsm)
          frontend.send_strings(msg)
        rescue BStarException
        end
      end

      if socket === statesub
        statesub.recv_string(msg = "")
        fsm.event = msg.to_i
        begin
          run_fsm(fsm)
          fsm.peer_expiry = Time.now + (2 * HEARTBEAT / 1000)
        rescue BStarException
          break
        end
      end
    end

    if Time.now >= send_state_at
      statepub.send_string("%d" % fsm.state)
      send_state_at = Time.now + HEARTBEAT / 1000
    end
  end
end

if __FILE__ == $0
  main
end

bstarcli.rb

クライアント側。オプションは特になし。

#!/usr/bin/env ruby
# coding: utf-8

require 'rubygems'
require 'ffi-rzmq'

REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over

def main
  server = ['tcp://localhost:5001', 'tcp://localhost:5002']
  server_nbr = 0
  ctx = ZMQ::Context.new(1)
  client = ctx.socket(ZMQ::REQ)
  client.connect(server[server_nbr])
  poller = ZMQ::Poller.new
  poller.register(client, ZMQ::POLLIN)

  sequence = 0
  loop do
    client.send_string("%s" % sequence)

    expect_reply = true
    while expect_reply
      received = false
      poller.poll(REQUEST_TIMEOUT)
      poller.readables.each do |socket|
        received = true
        if socket === client
          client.recv_string(reply = "")
          if reply.to_i == sequence
            puts "I: server replied OK (%s)" % reply
            expect_reply = false
            sequence += 1
            sleep(1)
          else
            puts "E: malformed reply from server: %s" % reply
          end
        end
      end

      if not received
        puts "W: no response from server, failing over"
        sleep(SETTLE_DELAY / 1000)
        poller.delete(client)
        client.close()
        server_nbr = (server_nbr + 1) % 2
        puts "I: connecting to server at %s.." % server[server_nbr]
        client = ctx.socket(ZMQ::REQ)
        poller.register(client, ZMQ::POLLIN)
        # reconnect and resend request
        client.connect(server[server_nbr])
        client.send_string("%s" % sequence)
      end
    end
  end
end

if __FILE__ == $0
  main
end