Class: Twingly::AMQP::Subscription
- Inherits:
-
Object
- Object
- Twingly::AMQP::Subscription
- Defined in:
- lib/twingly/amqp/subscription.rb
Instance Method Summary collapse
- #before_handle_message(&block) ⇒ Object
- #cancel! ⇒ Object
- #cancel? ⇒ Boolean
- #each_message(blocking: true, &block) ⇒ Object
-
#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription
constructor
A new instance of Subscription.
- #message_count ⇒ Object
- #on_exception(&block) ⇒ Object
- #raw_queue ⇒ Object
Constructor Details
#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription
Returns a new instance of Subscription.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/twingly/amqp/subscription.rb', line 4 def initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) @queue_name = queue_name @exchange_topic = exchange_topic @routing_keys = Array(routing_keys || routing_key) @consumer_threads = consumer_threads @prefetch = prefetch @max_length = max_length @queue_type = queue_type @cancel = false @consumer = nil @blocking = false if routing_key warn "[DEPRECATION] `routing_key` is deprecated. "\ "Please use `routing_keys` instead." end @channel = create_channel(connection) @queue = create_queue if @exchange_topic && @routing_keys.any? exchange = @channel.topic(@exchange_topic, durable: true) @routing_keys.each do |routing_key| @queue.bind(exchange, routing_key: routing_key) end end @before_handle_message_callback = proc {} @on_exception_callback = proc {} end |
Instance Method Details
#before_handle_message(&block) ⇒ Object
50 51 52 |
# File 'lib/twingly/amqp/subscription.rb', line 50 def (&block) @before_handle_message_callback = block end |
#cancel! ⇒ Object
70 71 72 73 |
# File 'lib/twingly/amqp/subscription.rb', line 70 def cancel! @consumer.cancel unless @blocking @cancel = true end |
#cancel? ⇒ Boolean
66 67 68 |
# File 'lib/twingly/amqp/subscription.rb', line 66 def cancel? @cancel end |
#each_message(blocking: true, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/twingly/amqp/subscription.rb', line 39 def (blocking: true, &block) @blocking = blocking @consumer = create_consumer(&block) if @blocking sleep 0.01 until cancel? @consumer.cancel end end |
#message_count ⇒ Object
58 59 60 |
# File 'lib/twingly/amqp/subscription.rb', line 58 def @queue.status.fetch(:message_count) end |
#on_exception(&block) ⇒ Object
54 55 56 |
# File 'lib/twingly/amqp/subscription.rb', line 54 def on_exception(&block) @on_exception_callback = block end |
#raw_queue ⇒ Object
62 63 64 |
# File 'lib/twingly/amqp/subscription.rb', line 62 def raw_queue @queue end |