Class: Twingly::AMQP::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/twingly/amqp/subscription.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_name:, exchange_topic: nil, routing_key: nil, routing_keys: nil, consumer_threads: 1, prefetch: 20, connection: Connection.instance, max_length: nil, queue_type: :quorum) ⇒ Subscription

Returns a new instance of Subscription.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/twingly/amqp/subscription.rb', line 4

def initialize(queue_name:, exchange_topic: nil, routing_key: nil,
               routing_keys: nil, consumer_threads: 1, prefetch: 20,
               connection: Connection.instance, max_length: nil,
               queue_type: :quorum)
  @queue_name       = queue_name
  @exchange_topic   = exchange_topic
  @routing_keys     = Array(routing_keys || routing_key)
  @consumer_threads = consumer_threads
  @prefetch         = prefetch
  @max_length       = max_length
  @queue_type       = queue_type
  @cancel           = false
  @consumer         = nil
  @blocking         = false

  if routing_key
    warn "[DEPRECATION] `routing_key` is deprecated. "\
         "Please use `routing_keys` instead."
  end

  @channel = create_channel(connection)
  @queue   = create_queue

  if @exchange_topic && @routing_keys.any?
    exchange = @channel.topic(@exchange_topic, durable: true)

    @routing_keys.each do |routing_key|
      @queue.bind(exchange, routing_key: routing_key)
    end
  end

  @before_handle_message_callback = proc {}
  @on_exception_callback          = proc {}
end

Instance Method Details

#before_handle_message(&block) ⇒ Object



50
51
52
# File 'lib/twingly/amqp/subscription.rb', line 50

def before_handle_message(&block)
  @before_handle_message_callback = block
end

#cancel!Object



70
71
72
73
# File 'lib/twingly/amqp/subscription.rb', line 70

def cancel!
  @consumer.cancel unless @blocking
  @cancel = true
end

#cancel?Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/twingly/amqp/subscription.rb', line 66

def cancel?
  @cancel
end

#each_message(blocking: true, &block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/twingly/amqp/subscription.rb', line 39

def each_message(blocking: true, &block)
  @blocking = blocking
  @consumer = create_consumer(&block)

  if @blocking
    sleep 0.01 until cancel?

    @consumer.cancel
  end
end

#message_countObject



58
59
60
# File 'lib/twingly/amqp/subscription.rb', line 58

def message_count
  @queue.status.fetch(:message_count)
end

#on_exception(&block) ⇒ Object



54
55
56
# File 'lib/twingly/amqp/subscription.rb', line 54

def on_exception(&block)
  @on_exception_callback = block
end

#raw_queueObject



62
63
64
# File 'lib/twingly/amqp/subscription.rb', line 62

def raw_queue
  @queue
end