Module: MmMq::Subscribable

Includes:
HashExt
Defined in:
lib/mm_mq/subscribable.rb

Instance Method Summary

Methods included from HashExt

symbolize_keys

Instance Method Details

#connect(options = {}, &block) ⇒ Object

Runs AMQP.start for each configured connection.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/mm_mq/subscribable.rb', line 32

# Starts AMQP once for every connection described in +config+.
#
# When a "connections" entry is configured, each element of its :hosts list
# is a "host" or "host:port" string (the port falls back to 5672) and the
# remaining entries of "connections" are merged over every per-host hash.
# Otherwise the single "connection" entry is used as-is.
#
# @param options [Hash] accepted but not used by this method
# @param block [Proc] forwarded unchanged to every AMQP.start call
# @return [Array] the list of connection settings that were started
def connect(options = {}, &block)
  shared = HashExt.symbolize_keys(config[:connections] || config['connections'])
  targets =
    if shared
      hosts = shared.delete(:hosts) || []
      hosts.map do |entry|
        # Split on the first colon only; everything after it is the port text.
        name, port_text = entry.split(/\:/, 2)
        # Shared settings are applied last, so they win over :host/:port.
        { :host => name, :port => (port_text || 5672).to_i }.update(shared)
      end
    else
      [HashExt.symbolize_keys(config[:connection] || config['connection'])]
    end
  targets.each { |settings| AMQP.start(settings, &block) }
end

#exchange ⇒ Object

exchange:

type       : direct, fanout, topic, headers, system, implementation_defined
name       : "<required>"
key        : nil
passive    : false
durable    : false
auto_delete: false
internal   : false
nowait     : false


71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/mm_mq/subscribable.rb', line 71

# Returns the exchange built from the "exchange" config entry, or nil when
# no such entry exists. The result (including nil) is cached in @exchange.
#
# Side effect: on first use with an exchange config, @routing_key is set to
# the config's :key value, which is removed before the remaining options are
# handed to MQ.
#
# @return [Object, nil] whatever MQ.<type>(name, opts) returned, or nil
def exchange
  return @exchange if defined?(@exchange)

  settings = symbolize_keys(config[:exchange] || config['exchange'])
  if settings
    settings[:type] ||= 'direct'
    @routing_key = settings.delete(:key)
    kind = settings.delete(:type)
    label = settings.delete(:name)
    # The exchange type doubles as the MQ method name to invoke.
    @exchange = MQ.send(kind, label, settings)
  else
    @exchange = nil
  end
  @exchange
end

#queue ⇒ Object

queue:

name       : "<required>"
passive    : false
durable    : false
auto_delete: false
exclusive  : false
nowait     : false


54
55
56
57
58
59
60
# File 'lib/mm_mq/subscribable.rb', line 54

# Returns the queue built from the "queue" config entry, cached in @queue.
# The :name entry is passed as the queue name and the remaining entries as
# its options; with no config the call is MQ.queue(nil, {}).
#
# @return [Object] whatever MQ.queue returned
def queue
  return @queue if defined?(@queue)

  settings = symbolize_keys(config[:queue] || config['queue']) || {}
  queue_name = settings.delete(:name)
  @queue = MQ.queue(queue_name, settings)
end

#routing_key ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/mm_mq/subscribable.rb', line 84

# Returns the :key value of the "exchange" config entry, or nil when there
# is no exchange config. The result (including nil) is cached in
# @routing_key, so a value already stored there is returned untouched.
#
# @return [Object, nil] the configured routing key
def routing_key
  return @routing_key if defined?(@routing_key)

  settings = symbolize_keys(config[:exchange] || config['exchange'])
  @routing_key = settings ? settings[:key] : nil
end

#subscribe(options = {}, &block) ⇒ Object

subscribe:

ack    : false
nowait : false
confirm: nil


107
108
109
110
111
112
113
# File 'lib/mm_mq/subscribable.rb', line 107

# Subscribes to the object returned by #subscribed and hands every message
# to the given block after decoding it with MessagePack.
#
# The effective options are +options+ overlaid with the "subscribe" config
# entry (config values win on conflicting keys). The caller's hash is left
# untouched, so frozen or shared option hashes are safe to pass.
#
# @param options [Hash] subscribe options such as :ack, :nowait, :confirm
# @yield [head, payload] the message header and the MessagePack-decoded body
# @return [Object] whatever the underlying subscribe call returns
def subscribe(options = {}, &block)
  configured = symbolize_keys(config[:subscribe] || config['subscribe'] || {})
  # merge (not update) so the hash passed in by the caller is never mutated.
  opts = options.merge(configured)
  subscribed.subscribe(opts) do |head, msg|
    block.call(head, MessagePack.unpack(msg))
    # Acknowledge only after the block finished without raising.
    head.ack if opts[:ack]
  end
end

#subscribed ⇒ Object



95
96
97
98
99
100
101
# File 'lib/mm_mq/subscribable.rb', line 95

# Returns the object to subscribe on: the bare queue when there is no
# exchange, otherwise the result of binding the queue to the exchange
# (with :key => routing_key when a routing key is present).
#
# @return [Object] the queue, or the result of queue.bind
def subscribed
  # Resolve the routing key first, before the exchange is looked up.
  key = routing_key
  return queue unless exchange
  return queue.bind(exchange) unless key

  queue.bind(exchange, :key => routing_key)
end