Module: MmMq::Subscribable
- Includes:
- HashExt
- Defined in:
- lib/mm_mq/subscribable.rb
Instance Method Summary collapse
-
#connect(options = {}, &block) ⇒ Object
MQ.startを実行します。
-
#exchange ⇒ Object
exchange: type : direct, fanout, topic, headers, system, implementation_defined name : “<required>” key : nil passive : false durable : false auto_delete: false internal : false nowait : false.
-
#queue ⇒ Object
queue: name : “<required>” passive : false durable : false auto_delete: false exclusive : false nowait : false.
- #routing_key ⇒ Object
-
#subscribe(options = {}, &block) ⇒ Object
subscribe: ack : false nowait: false confirm: nil.
- #subscribed ⇒ Object
Methods included from HashExt
Instance Method Details
#connect(options = {}, &block) ⇒ Object
MQ.startを実行します。
32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/mm_mq/subscribable.rb', line 32
#
# Opens one AMQP session per configured broker by calling AMQP.start.
#
# Settings are read from +config+ under either symbol or string keys:
# * +connections+ - shared settings plus a +hosts+ list of "host[:port]"
#   strings. Every host entry becomes its own connection hash; the port
#   falls back to 5672 when the entry carries none.
# * +connection+  - a single settings hash, used when +connections+ is
#   absent (assumes HashExt.symbolize_keys returns nil for nil input --
#   TODO confirm against HashExt).
#
# @param options [Hash] accepted for interface compatibility; this method
#   does not read it
# @param block [Proc] forwarded unchanged to every AMQP.start call
# @return [Array<Hash>] the connection hashes that were passed to AMQP.start
def connect(options = {}, &block)
  if (conn_opts = HashExt.symbolize_keys(config[:connections] || config['connections']))
    connections = (conn_opts.delete(:hosts) || []).map do |host_port|
      host, port = host_port.split(/\:/, 2)
      port ||= 5672
      # Shared settings are merged last, so they win over the parsed host/port.
      { :host => host, :port => port.to_i }.update(conn_opts)
    end
  else
    connections = [HashExt.symbolize_keys(config[:connection] || config['connection'])]
  end

  connections.each do |conn|
    AMQP.start(conn, &block)
  end
end
#exchange ⇒ Object
exchange:
type : direct, fanout, topic, headers, system, implementation_defined
name : "<required>"
key : nil
passive : false
durable : false
auto_delete: false
internal : false
nowait : false
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/mm_mq/subscribable.rb', line 71
#
# Returns the exchange described by the +exchange+ entry of +config+
# (symbol or string key), building it on first use.
#
# The exchange type defaults to 'direct'. The +key+ setting is removed from
# the options and stored in @routing_key; +type+ and +name+ are removed and
# used to dispatch to the MQ method of that name, with the remaining
# settings passed along as options. The result, including nil when no
# exchange is configured, is kept in @exchange for later calls.
#
# @return [Object, nil] the value returned by MQ, or nil without config
def exchange
  return @exchange if defined?(@exchange)

  settings = symbolize_keys(config[:exchange] || config['exchange'])
  return @exchange = nil unless settings

  settings[:type] ||= 'direct'
  @routing_key = settings.delete(:key)
  kind = settings.delete(:type)
  exchange_name = settings.delete(:name)
  @exchange = MQ.send(kind, exchange_name, settings)
end
#queue ⇒ Object
queue:
name : "<required>"
passive : false
durable : false
auto_delete: false
exclusive : false
nowait : false
54 55 56 57 58 59 60 |
# File 'lib/mm_mq/subscribable.rb', line 54
#
# Returns the queue described by the +queue+ entry of +config+ (symbol or
# string key), creating it through MQ.queue on first use.
#
# The +name+ setting is removed and passed as the queue name; whatever
# settings remain are passed as options. The result is kept in @queue.
#
# @return [Object] the value returned by MQ.queue
def queue
  return @queue if defined?(@queue)

  settings = symbolize_keys(config[:queue] || config['queue'])
  settings = {} unless settings
  queue_name = settings.delete(:name)
  @queue = MQ.queue(queue_name, settings)
end
#routing_key ⇒ Object
84 85 86 87 88 89 90 91 92 93 |
# File 'lib/mm_mq/subscribable.rb', line 84
#
# Returns the +key+ setting of the +exchange+ entry of +config+, or nil
# when no exchange is configured. The value is kept in @routing_key, so a
# value already stored there is returned as-is.
#
# @return [Object, nil] the configured routing key
def routing_key
  return @routing_key if defined?(@routing_key)

  settings = symbolize_keys(config[:exchange] || config['exchange'])
  @routing_key = settings ? settings[:key] : nil
end
#subscribe(options = {}, &block) ⇒ Object
subscribe:
ack : false
nowait: false
confirm: nil
107 108 109 110 111 112 113 |
# File 'lib/mm_mq/subscribable.rb', line 107
#
# Subscribes to the object returned by #subscribed and hands every message
# to the given block.
#
# The effective options are the caller's +options+ overlaid with the
# +subscribe+ entry of +config+ (symbol or string key), so configured
# values take precedence. The caller's hash is left untouched.
#
# Each raw message is decoded with MessagePack.unpack before the block is
# called with (header, decoded_message). When the effective +:ack+ option
# is truthy the header is acknowledged after the block returns.
#
# @param options [Hash] subscribe options supplied by the caller
# @param block [Proc] called with the header and the decoded message
# @return [Object] whatever the underlying subscribe call returns
def subscribe(options = {}, &block)
  configured = symbolize_keys(config[:subscribe] || config['subscribe'] || {})
  # merge (not update) so the hash passed in by the caller is not mutated.
  opts = options.merge(configured)

  subscribed.subscribe(opts) do |head, msg|
    block.call(head, MessagePack.unpack(msg))
    head.ack if opts[:ack]
  end
end
#subscribed ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/mm_mq/subscribable.rb', line 95
#
# Returns the object to subscribe on: the plain queue when no exchange is
# configured, otherwise the result of binding the queue to the exchange.
# The binding carries a :key option only when a routing key is present.
#
# @return [Object] the queue, or the result of queue.bind
def subscribed
  key = routing_key
  target = exchange
  return queue unless target

  key ? queue.bind(target, :key => key) : queue.bind(target)
end