Class: MessageDriver::Adapters::BunnyAdapter::QueueDestination
Instance Attribute Summary
#adapter, #dest_options, #message_props, #name
Instance Method Summary
collapse
Methods inherited from Destination
#publish_params
#consumer_count, #initialize, #message_count, #middleware, #pop_message, #publish
Instance Method Details
#after_initialize(adapter_context) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 59
# Validates the +:no_declare+ configuration or declares the queue.
#
# Without +:no_declare+ in the destination options, a channel is obtained
# from +adapter_context+ and the queue is set up through #bunny_queue with
# +init: true+. With +:no_declare+ set, nothing is declared and the options
# are only checked for combinations that require a declaration.
#
# @param adapter_context [#with_channel] object that yields a channel
# @return [Object, nil] the result of +with_channel+ when declaring,
#   otherwise nil
# @raise [MessageDriver::Error] when +:no_declare+ is combined with an
#   empty queue name or with +:bindings+
def after_initialize(adapter_context)
  unless @dest_options[:no_declare]
    return adapter_context.with_channel(false) { |channel| bunny_queue(channel, init: true) }
  end

  # An empty name asks the server to generate one, which needs a declaration.
  raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' if @name.empty?
  raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' if @dest_options[:bindings]
end
|
#bunny_queue(channel, options = {}) ⇒ Object
74
75
76
77
78
79
80
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 74
# Looks up (or declares) this destination's queue on the given channel.
#
# The queue is requested with a copy of the destination options, so the
# stored options are never modified. A +:passive+ key in +options+, when
# present, overrides the copied value.
#
# @param channel [#queue] channel that receives +queue(name, opts)+
# @param options [Hash] per-call switches
# @option options [Object] :passive forwarded to the channel when the key
#   is present (even if its value is nil)
# @option options [Boolean] :init (false) run #handle_queue_init on the
#   returned queue
# @return [Object] whatever +channel.queue+ returned
def bunny_queue(channel, options = {})
  queue_opts = @dest_options.dup.tap do |copy|
    copy[:passive] = options[:passive] if options.key?(:passive)
  end

  channel.queue(@name, queue_opts).tap do |declared|
    handle_queue_init(declared) if options.fetch(:init, false)
  end
end
|
#exchange_name ⇒ Object
92
93
94
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 92
# Exchange name used for this queue destination.
#
# NOTE(review): the empty string presumably selects the broker's default
# exchange, which routes by queue name — confirm against the publish path.
#
# @return [String] always the empty string
def exchange_name
''
end
|
#handle_consumer_count ⇒ Object
110
111
112
113
114
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 110
# Reads the consumer count of this destination's queue.
#
# The queue is fetched through #bunny_queue with +passive: true+ on a
# channel supplied by the current adapter context.
#
# @return [Object] the value of +consumer_count+ on the queue, as passed
#   back through +with_channel+
def handle_consumer_count
  current_adapter_context.with_channel(false) do |channel|
    queue = bunny_queue(channel, passive: true)
    queue.consumer_count
  end
end
|
#handle_message_count ⇒ Object
100
101
102
103
104
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 100
# Reads the message count of this destination's queue.
#
# The queue is fetched through #bunny_queue with +passive: true+ on a
# channel supplied by the current adapter context.
#
# @return [Object] the value of +message_count+ on the queue, as passed
#   back through +with_channel+
def handle_message_count
  current_adapter_context.with_channel(false) do |channel|
    queue = bunny_queue(channel, passive: true)
    queue.message_count
  end
end
|
#handle_queue_init(queue) ⇒ Object
82
83
84
85
86
87
88
89
90
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 82
# Finishes queue setup after the queue has been obtained from the broker.
#
# Stores the queue's actual name in +@name+, then applies every entry of
# the +:bindings+ destination option by calling +queue.bind+ with the
# entry's +:source+ and its +:args+ (an empty hash when absent).
#
# @param queue [#name, #bind] the queue returned by the channel
# @return [Array, nil] the bindings collection, or nil when none are
#   configured
# @raise [MessageDriver::Error] when a binding has no +:source+; bindings
#   listed before the offending one have already been applied
def handle_queue_init(queue)
  @name = queue.name

  bindings = @dest_options[:bindings]
  return unless bindings

  bindings.each do |spec|
    source = spec[:source]
    raise MessageDriver::Error, "binding #{spec.inspect} must provide a source!" unless source

    queue.bind(source, spec[:args] || {})
  end
end
|
#purge ⇒ Object
116
117
118
119
120
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 116
# Purges this destination's queue.
#
# The queue is fetched through #bunny_queue with no extra options on a
# channel supplied by the current adapter context, then +purge+ is called
# on it.
#
# @return [Object] the value of +purge+ on the queue, as passed back
#   through +with_channel+
def purge
  context = current_adapter_context
  context.with_channel(false) do |channel|
    queue = bunny_queue(channel)
    queue.purge
  end
end
|
#routing_key(_properties) ⇒ Object
96
97
98
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 96
# Routing key for messages sent to this destination: the queue's name.
#
# @param _properties [Object] accepted but not used by this implementation
# @return [String] the current value of +@name+
def routing_key(_properties)
@name
end
|
#subscribe(options = {}, &consumer) ⇒ Object
106
107
108
|
# File 'lib/message_driver/adapters/bunny_adapter.rb', line 106
# Subscribes a consumer block to this destination.
#
# Delegates to the current adapter context, passing this destination, the
# given options and the consumer block through unchanged.
#
# @param options [Hash] subscription options, forwarded as-is
# @yield the consumer block, forwarded as-is
# @return [Object] whatever the adapter context's +subscribe+ returns
def subscribe(options = {}, &consumer)
  context = current_adapter_context
  context.subscribe(self, options, &consumer)
end
|