Module: MmMq::Publishable
- Includes:
- HashExt
- Defined in:
- lib/mm_mq/publishable.rb
Instance Attribute Summary collapse
Instance Method Summary collapse
-
#connect(options = {}, &block) ⇒ Object
MQ.startを実行します。.
-
#publish(msg, options = {}) ⇒ Object
publish: key : nil mandatory : false immediate : false persistent: false.
-
#queue_or_exchange ⇒ Object
MQ.queueやMQ.fanoutなどを実行します。.
Methods included from HashExt
Instance Attribute Details
#channel(connection = nil) ⇒ Object
46 47 48 |
# File 'lib/mm_mq/publishable.rb', line 46 def channel(connection = nil) @channel ||= MQ.new(connection) end |
Instance Method Details
#connect(options = {}, &block) ⇒ Object
MQ.startを実行します。
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/mm_mq/publishable.rb', line 32 def connect( = {}, &block) conn = nil EM.run do if conn_opts = HashExt.symbolize_keys(config[:connections] || config['connections']) conn = AMQP.vstart(.update(conn_opts)) else conn = AMQP.start(.update(HashExt.symbolize_keys(config[:connection] || config['connection']) || {})) end channel(conn) block.call if block end end |
#publish(msg, options = {}) ⇒ Object
publish:
key : nil
mandatory : false
immediate : false
persistent: false
95 96 97 98 |
# File 'lib/mm_mq/publishable.rb', line 95 def publish(msg, = {}) opts = .update(symbolize_keys(config[:publish] || config['publish']) || {}) queue_or_exchange.publish(msg.to_msgpack, opts) end |
#queue_or_exchange ⇒ Object
MQ.queueやMQ.fanoutなどを実行します。
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/mm_mq/publishable.rb', line 68 def queue_or_exchange unless @queue_or_exchange opts = symbolize_keys(config[:queue] || config['queue']|| config[:exchange] || config['exchange'] ) || {} opts[:type] ||= 'queue' if config[:queue] || config['queue'] opts[:type] ||= 'direct' if config[:exchange] || config['exchange'] # channel を明示的に指定するため MQ.send を使わず、MQ::Queue と MQ::Exchange を使用しています if (type = opts.delete(:type)) == 'queue' @queue_or_exchange = MQ::Queue.new( channel, opts.delete(:name), opts) else @queue_or_exchange = MQ::Exchange.new( channel, type, opts.delete(:name), opts) end end @queue_or_exchange end |