Module: MmMq::Publishable

Includes:
HashExt
Defined in:
lib/mm_mq/publishable.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from HashExt

symbolize_keys

Instance Attribute Details

#channel(connection = nil) ⇒ Object



46
47
48
# File 'lib/mm_mq/publishable.rb', line 46

# Returns the memoized MQ channel, creating it on first use.
#
# +connection+ is handed to MQ.new only when no channel has been created yet;
# on later calls the argument is ignored and the existing channel is returned.
def channel(connection = nil)
  return @channel if @channel

  @channel = MQ.new(connection)
end

Instance Method Details

#connect(options = {}, &block) ⇒ Object

EM.run の中で AMQP.start(connections 設定がある場合は AMQP.vstart)を実行します。



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/mm_mq/publishable.rb', line 32

# Runs EM.run and, inside it, opens an AMQP connection built from the
# configuration, passes that connection to #channel, and then invokes the
# optional block.
#
# When the configuration has a +connections+ entry (symbol or string key) the
# connection is opened with AMQP.vstart; otherwise AMQP.start is used with the
# +connection+ entry, or with +options+ alone when that entry is absent.
# Configured values win over +options+ when both define the same key.
#
# @param options [Hash] base connection options; the hash itself is left
#   untouched (a merged copy is passed on)
# @param block [Proc] optional; called with no arguments after #channel
# @return [Object] the value returned by EM.run
def connect(options = {}, &block)
  EM.run do
    multi = symbolize_keys(config[:connections] || config['connections'])
    conn =
      if multi
        AMQP.vstart(options.merge(multi))
      else
        single = symbolize_keys(config[:connection] || config['connection']) || {}
        AMQP.start(options.merge(single))
      end
    channel(conn)
    block.call if block
  end
end

#publish(msg, options = {}) ⇒ Object

publish:

key       : nil
mandatory : false
immediate : false
persistent: false


95
96
97
98
# File 'lib/mm_mq/publishable.rb', line 95

# Serializes +msg+ with #to_msgpack and publishes it through #queue_or_exchange.
#
# The publish options are +options+ merged with the +publish+ entry of the
# configuration (symbol or string key); configured values win when both define
# the same key. The caller's hash is not modified, so it can be frozen or
# reused across calls.
#
# @param msg [#to_msgpack] the message to send
# @param options [Hash] per-call publish options
# @return [Object] whatever the target's #publish returns
def publish(msg, options = {})
  configured = symbolize_keys(config[:publish] || config['publish']) || {}
  queue_or_exchange.publish(msg.to_msgpack, options.merge(configured))
end

#queue_or_exchange ⇒ Object

MQ.queueやMQ.fanoutなどを実行します。



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/mm_mq/publishable.rb', line 68

# Returns the memoized publish target, building it from the configuration on
# first use.
#
# Settings come from the +queue+ entry of the configuration, falling back to
# the +exchange+ entry (symbol or string keys). A missing +:type+ defaults to
# 'queue' when a queue entry is present and to 'direct' when an exchange entry
# is present. A type of 'queue' yields an MQ::Queue; anything else yields an
# MQ::Exchange of that type. +:type+ and +:name+ are removed from the settings
# before the remainder is passed on as options.
def queue_or_exchange
  return @queue_or_exchange if @queue_or_exchange

  queue_conf    = config[:queue] || config['queue']
  exchange_conf = config[:exchange] || config['exchange']

  settings = symbolize_keys(queue_conf || exchange_conf) || {}
  settings[:type] ||= 'queue' if queue_conf
  settings[:type] ||= 'direct' if exchange_conf

  # MQ::Queue and MQ::Exchange are instantiated directly (rather than going
  # through MQ.send) so that the channel can be specified explicitly.
  kind = settings.delete(:type)
  @queue_or_exchange =
    if kind == 'queue'
      MQ::Queue.new(channel, settings.delete(:name), settings)
    else
      MQ::Exchange.new(channel, kind, settings.delete(:name), settings)
    end
end