Class: MessageDriver::Adapters::BunnyAdapter::QueueDestination

Inherits:
Destination show all
Defined in:
lib/message_driver/adapters/bunny_adapter.rb

Instance Attribute Summary

Attributes inherited from Destination::Base

#adapter, #dest_options, #message_props, #name

Instance Method Summary collapse

Methods inherited from Destination

#publish_params

Methods inherited from Destination::Base

#consumer_count, #initialize, #message_count, #middleware, #pop_message, #publish

Constructor Details

This class inherits a constructor from MessageDriver::Destination::Base

Instance Method Details

#after_initialize(adapter_context) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 59

# Post-construction hook: either declares the queue on the broker or, when
# the destination was configured with :no_declare, validates that skipping
# the declaration is actually possible.
#
# @param adapter_context [#with_channel] context used to obtain a channel
#   for the declaration (the meaning of the +false+ argument passed to
#   +with_channel+ is defined by the adapter context, not here)
# @return [Object, nil] whatever +with_channel+ returns when the queue is
#   declared; +nil+ when :no_declare is set and validation passes
# @raise [MessageDriver::Error] if :no_declare is set but the queue name is
#   empty or :bindings were supplied
def after_initialize(adapter_context)
  unless @dest_options[:no_declare]
    # Normal path: declare the queue and run the init hook (init: true).
    return adapter_context.with_channel(false) { |channel| bunny_queue(channel, init: true) }
  end

  # :no_declare path: the name check is performed before the bindings check.
  problem =
    if @name.empty?
      'server-named queues must be declared, but you provided :no_declare => true'
    elsif @dest_options[:bindings]
      'queues with bindings must be declared, but you provided :no_declare => true'
    end

  raise MessageDriver::Error, problem if problem
end

#bunny_queue(channel, options = {}) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 74

# Looks up / declares this destination's queue on the given channel.
#
# The destination options are copied before use, so @dest_options itself is
# never modified by a per-call :passive override.
#
# @param channel [#queue] channel on which +queue(name, opts)+ is called
# @param options [Hash] per-call settings
# @option options [Object] :passive when the key is present, its value
#   overrides :passive in the options handed to +channel.queue+
# @option options [Boolean] :init (false) when truthy, the resulting queue
#   is passed to +handle_queue_init+
# @return [Object] the queue returned by +channel.queue+
def bunny_queue(channel, options = {})
  declare_opts = @dest_options.dup
  declare_opts[:passive] = options[:passive] if options.key?(:passive)

  channel.queue(@name, declare_opts).tap do |declared|
    handle_queue_init(declared) if options.fetch(:init, false)
  end
end

#exchange_name ⇒ Object



92
93
94
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 92

# Exchange name associated with this queue destination: always the empty
# string.
# NOTE(review): in AMQP the empty exchange name denotes the default
# exchange, which routes by queue name — presumably why +routing_key+
# returns the queue name; confirm against the publish path.
#
# @return [String] the empty string
def exchange_name
  ''
end

#handle_consumer_count ⇒ Object



110
111
112
113
114
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 110

# Reads the consumer count of this destination's queue.
#
# The queue is looked up with passive: true on a channel obtained from the
# current adapter context.
#
# @return [Object] the value of +consumer_count+ on the queue, as passed
#   back through +with_channel+
def handle_consumer_count
  context = current_adapter_context
  context.with_channel(false) do |channel|
    queue = bunny_queue(channel, passive: true)
    queue.consumer_count
  end
end

#handle_message_count ⇒ Object



100
101
102
103
104
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 100

# Reads the message count of this destination's queue.
#
# The queue is looked up with passive: true on a channel obtained from the
# current adapter context.
#
# @return [Object] the value of +message_count+ on the queue, as passed
#   back through +with_channel+
def handle_message_count
  context = current_adapter_context
  context.with_channel(false) do |channel|
    queue = bunny_queue(channel, passive: true)
    queue.message_count
  end
end

#handle_queue_init(queue) ⇒ Object



82
83
84
85
86
87
88
89
90
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 82

# Init hook run after the queue has been obtained from the channel.
#
# Stores the queue's name in @name, then applies every entry of the
# :bindings destination option (if any) to the queue, in order.
#
# @param queue [#name, #bind] the queue that was just obtained
# @return [Object, nil] the :bindings collection when one is configured,
#   otherwise +nil+
# @raise [MessageDriver::Error] if a binding has no :source; bindings
#   listed before the offending one have already been applied
def handle_queue_init(queue)
  @name = queue.name

  binding_specs = @dest_options[:bindings]
  return unless binding_specs

  binding_specs.each do |spec|
    source = spec[:source]
    raise MessageDriver::Error, "binding #{spec.inspect} must provide a source!" unless source

    # A binding without :args is bound with an empty options hash.
    queue.bind(source, spec[:args] || {})
  end
end

#purge ⇒ Object



116
117
118
119
120
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 116

# Purges this destination's queue.
#
# The queue is obtained through +bunny_queue+ with no per-call options on a
# channel from the current adapter context, and +purge+ is called on it.
#
# @return [Object] the result of the queue's +purge+, as passed back
#   through +with_channel+
def purge
  context = current_adapter_context
  context.with_channel(false) do |channel|
    queue = bunny_queue(channel)
    queue.purge
  end
end

#routing_key(_properties) ⇒ Object



96
97
98
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 96

# Routing key for this destination: the queue's current name (@name).
#
# @param _properties [Object] ignored by this implementation
# @return [Object] the current value of @name (presumably a String queue
#   name — verify against the constructor)
def routing_key(_properties)
  @name
end

#subscribe(options = {}, &consumer) ⇒ Object



106
107
108
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 106

# Subscribes a consumer block to this destination by delegating to the
# current adapter context.
#
# @param options [Hash] passed through to the adapter context unchanged
# @yield consumer block, forwarded unchanged to the adapter context
# @return [Object] whatever the adapter context's +subscribe+ returns
def subscribe(options = {}, &consumer)
  context = current_adapter_context
  context.subscribe(self, options, &consumer)
end