Class: Twingly::AMQP::Subscription
- Inherits:
-
Object
- Object
- Twingly::AMQP::Subscription
- Defined in:
- lib/twingly/amqp/subscription.rb
Instance Method Summary collapse
- #before_handle_message(&block) ⇒ Object
- #cancel! ⇒ Object
- #cancel? ⇒ Boolean
- #each_message(blocking: true, &block) ⇒ Object
-
#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription
constructor
A new instance of Subscription.
- #message_count ⇒ Object
- #on_error(&block) ⇒ Object
- #on_exception(&block) ⇒ Object
- #raw_queue ⇒ Object
Constructor Details
#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/twingly/amqp/subscription.rb', line 4 def initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) @queue_name = queue_name @exchange_topic = exchange_topic @routing_keys = Array(routing_keys || routing_key) @consumer_threads = consumer_threads @prefetch = prefetch @max_length = max_length @queue_type = queue_type @cancel = false @consumer = nil @blocking = false if routing_key warn "[DEPRECATION] `routing_key` is deprecated. "\ "Please use `routing_keys` instead." end @channel = create_channel(connection) @queue = create_queue if @exchange_topic && @routing_keys.any? exchange = @channel.topic(@exchange_topic, durable: true) @routing_keys.each do |routing_key| @queue.bind(exchange, routing_key: routing_key) end end = proc {} @on_exception_callback = proc {} @on_error_callback = proc {} end |
Instance Method Details
#before_handle_message(&block) ⇒ Object
51 52 53 |
# File 'lib/twingly/amqp/subscription.rb', line 51 def (&block) = block end |
#cancel! ⇒ Object
75 76 77 78 |
# File 'lib/twingly/amqp/subscription.rb', line 75 def cancel! @consumer.cancel unless @blocking @cancel = true end |
#cancel? ⇒ Boolean
71 72 73 |
# File 'lib/twingly/amqp/subscription.rb', line 71 def cancel? @cancel end |
#each_message(blocking: true, &block) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/twingly/amqp/subscription.rb', line 40 def (blocking: true, &block) @blocking = blocking @consumer = create_consumer(&block) if @blocking sleep 0.01 until cancel? @consumer.cancel end end |
#message_count ⇒ Object
63 64 65 |
# File 'lib/twingly/amqp/subscription.rb', line 63 def @queue.status.fetch(:message_count) end |
#on_error(&block) ⇒ Object
59 60 61 |
# File 'lib/twingly/amqp/subscription.rb', line 59 def on_error(&block) @on_error_callback = block end |
#on_exception(&block) ⇒ Object
55 56 57 |
# File 'lib/twingly/amqp/subscription.rb', line 55 def on_exception(&block) @on_exception_callback = block end |
#raw_queue ⇒ Object
67 68 69 |
# File 'lib/twingly/amqp/subscription.rb', line 67 def raw_queue @queue end |