Class: Twingly::AMQP::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/twingly/amqp/subscription.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/twingly/amqp/subscription.rb', line 4

# Sets up a subscription: records the given settings, opens a channel on the
# connection, creates the queue and, when both an exchange topic and at least
# one routing key are present, binds the queue to that durable topic exchange
# once per routing key.
#
# @param queue_name [String] name of the queue to subscribe to
# @param exchange_topic [String, nil] topic exchange to bind the queue to
# @param routing_key [String, nil] deprecated; only consulted when
#   +routing_keys+ is nil, and triggers a deprecation warning when given
# @param routing_keys [Array<String>, String, nil] routing keys to bind with
# @param consumer_threads [Integer] stored as-is (presumably consumed by the
#   channel/consumer helpers, which are not visible here)
# @param prefetch [Integer] stored as-is (same caveat as above)
# @param connection connection handed to +create_channel+
# @param max_length [Integer, nil] stored as-is (presumably a queue argument)
# @param queue_type [Symbol] stored as-is; defaults to +:quorum+
def initialize(queue_name:, exchange_topic: nil, routing_key: nil,
               routing_keys: nil, consumer_threads: 1, prefetch: 20,
               connection: Connection.instance, max_length: nil,
               queue_type: :quorum)
  # Plain configuration.
  @queue_name       = queue_name
  @exchange_topic   = exchange_topic
  @consumer_threads = consumer_threads
  @prefetch         = prefetch
  @max_length       = max_length
  @queue_type       = queue_type

  # `routing_keys` takes precedence; the legacy singular form is the fallback.
  @routing_keys = Array(routing_keys || routing_key)

  # Consumption state: nothing is being consumed or cancelled yet.
  @cancel   = false
  @consumer = nil
  @blocking = false

  deprecation_notice = "[DEPRECATION] `routing_key` is deprecated. "\
                       "Please use `routing_keys` instead."
  warn deprecation_notice if routing_key

  @channel = create_channel(connection)
  @queue   = create_queue

  should_bind = @exchange_topic && @routing_keys.any?
  if should_bind
    topic_exchange = @channel.topic(@exchange_topic, durable: true)
    @routing_keys.each { |key| @queue.bind(topic_exchange, routing_key: key) }
  end

  # Callbacks default to no-ops until the caller registers real ones.
  @before_handle_message_callback = proc {}
  @on_exception_callback          = proc {}
  @on_error_callback              = proc {}
end

Instance Method Details

#before_handle_message(&block) ⇒ Object



51
52
53
# File 'lib/twingly/amqp/subscription.rb', line 51

# Stores the given block as the "before handle message" callback, replacing
# whatever was stored before (a no-op proc right after construction).
#
# NOTE(review): presumably invoked before each message is handled; the call
# site is not visible in this part of the file — confirm.
#
# @param block [Proc] the callback to store
# @return [Proc, nil] the block that was given (nil when called without one)
def before_handle_message(&block)
  @before_handle_message_callback = block
end

#cancel! ⇒ Object



75
76
77
78
# File 'lib/twingly/amqp/subscription.rb', line 75

# Requests that message consumption stop.
#
# Outside blocking mode the consumer is cancelled right here. In blocking
# mode only the flag is raised; #each_message cancels the consumer itself
# once it observes the flag.
#
# Safe to call before #each_message has created a consumer: the flag is
# still set and there is simply nothing to cancel yet.
#
# @return [true]
def cancel!
  # @consumer is nil until #each_message runs, so use safe navigation
  # rather than raising NoMethodError on nil.
  @consumer&.cancel unless @blocking
  @cancel = true
end

#cancel? ⇒ Boolean



71
72
73
# File 'lib/twingly/amqp/subscription.rb', line 71

# Tells whether cancellation has been requested.
#
# @return [Boolean] false after construction, true once #cancel! has run
def cancel?
  @cancel
end

#each_message(blocking: true, &block) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/twingly/amqp/subscription.rb', line 40

# Starts consuming messages, handing the given block to the consumer.
#
# In blocking mode (the default) the call polls every 10 ms until
# cancellation has been requested, then cancels the consumer. In
# non-blocking mode it returns as soon as the consumer has been created.
#
# @param blocking [Boolean] whether to wait here until cancelled
# @param block [Proc] handler passed on to +create_consumer+
# @return [Object, nil] nil when non-blocking, otherwise whatever the
#   consumer's +cancel+ returns
def each_message(blocking: true, &block)
  @blocking = blocking
  @consumer = create_consumer(&block)
  return unless @blocking

  # Wait for #cancel! to raise the flag, then shut the consumer down here.
  sleep 0.01 until cancel?
  @consumer.cancel
end

#message_count ⇒ Object



63
64
65
# File 'lib/twingly/amqp/subscription.rb', line 63

# Reads the +:message_count+ entry from the queue's current status.
#
# @return [Object] the value stored under +:message_count+ (presumably an
#   Integer count of messages in the queue — confirm against the client)
# @raise [KeyError] if the status has no +:message_count+ entry (assuming
#   the status is a Hash)
def message_count
  queue_status = @queue.status
  queue_status.fetch(:message_count)
end

#on_error(&block) ⇒ Object



59
60
61
# File 'lib/twingly/amqp/subscription.rb', line 59

# Stores the given block as the error callback, replacing whatever was
# stored before (a no-op proc right after construction).
#
# NOTE(review): what counts as an "error" and which arguments the callback
# receives are decided where it is invoked, which is not visible here.
#
# @param block [Proc] the callback to store
# @return [Proc, nil] the block that was given (nil when called without one)
def on_error(&block)
  @on_error_callback = block
end

#on_exception(&block) ⇒ Object



55
56
57
# File 'lib/twingly/amqp/subscription.rb', line 55

# Stores the given block as the exception callback, replacing whatever was
# stored before (a no-op proc right after construction).
#
# NOTE(review): presumably called when message handling raises; the call
# site and the arguments passed are not visible here — confirm.
#
# @param block [Proc] the callback to store
# @return [Proc, nil] the block that was given (nil when called without one)
def on_exception(&block)
  @on_exception_callback = block
end

#raw_queue ⇒ Object



67
68
69
# File 'lib/twingly/amqp/subscription.rb', line 67

# Exposes the underlying queue object that was created during construction
# (the value returned by +create_queue+).
#
# @return [Object] the queue this subscription consumes from
def raw_queue
  @queue
end