Class: Bisques::QueueListener

Inherits:
Object
Defined in:
lib/bisques/queue_listener.rb

Overview

Listen for messages on a queue and execute a block when they arrive.

Constructor Details

#initialize(queue, poll_time = 5) ⇒ QueueListener

Returns a new instance of QueueListener.

Parameters:

  • queue (Queue)

    the queue to listen on

  • poll_time (Fixnum) (defaults to: 5)

    the number of seconds to long poll during each iteration. Maximum is 20.



# File 'lib/bisques/queue_listener.rb', line 9

def initialize(queue, poll_time = 5)
  @queue, @poll_time = queue, poll_time
end
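
The constructor shown above stores poll_time as given and does not check it against the documented maximum of 20. A minimal sketch of a caller-side guard follows. MAX_POLL_TIME and clamp_poll_time are hypothetical names, not part of Bisques, and the lower bound of 0 is an assumption that this page does not state.

```ruby
# Hypothetical constant: the documented upper limit for poll_time, in seconds.
MAX_POLL_TIME = 20

# Hypothetical helper (not part of Bisques): keep a requested poll time
# inside 0..MAX_POLL_TIME. The lower bound of 0 is an assumption.
def clamp_poll_time(seconds)
  seconds.clamp(0, MAX_POLL_TIME)
end

puts clamp_poll_time(5)   # within range, returned unchanged
puts clamp_poll_time(45)  # above the maximum, capped
```

The result could then be passed as the second constructor argument, for example QueueListener.new(queue, clamp_poll_time(requested_seconds)).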

Instance Method Details

#listen {|Message| ... } ⇒ Object

Note:

The block you give to this method is executed in a new thread, not in the thread that calls #listen.

Listen for messages. This is asynchronous and returns immediately.

Examples:


queue = bisques.find_or_create_queue("my queue")
listener = QueueListener.new(queue)
listener.listen do |message|
  puts "Received #{message.object}"
  message.delete
end

loop { sleep 1 } # Keep the main thread alive so messages are processed forever

Yields:

  • (Message)

    each message retrieved from the queue


# File 'lib/bisques/queue_listener.rb', line 34

def listen(&block)
  return if @listening
  @listening = true

  @thread = Thread.new do
    while @listening
      message = @queue.retrieve(@poll_time)
      block.call(message) if message.present?
    end
  end
end
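
Because the block runs on the listener's thread, results usually need to be handed back to the calling thread in a thread-safe way. The sketch below shows one option, using Ruby's Thread::Queue. StubQueue and SketchListener are hypothetical stand-ins so the example runs without Bisques or network access: SketchListener mirrors the listen and stop source on this page, except that it uses a plain nil check instead of present?.

```ruby
# Hypothetical stand-in for a Bisques queue: hands out canned messages, then nil.
class StubQueue
  def initialize(messages)
    @messages = messages.dup
    @lock = Mutex.new
  end

  # Mimics retrieve(poll_time) but ignores the wait; returns nil when empty.
  def retrieve(_poll_time)
    message = @lock.synchronize { @messages.shift }
    sleep 0.01 if message.nil? # avoid a busy loop once the queue is drained
    message
  end
end

# Stand-in that mirrors the listen/stop source above (not the real class).
class SketchListener
  def initialize(queue, poll_time = 5)
    @queue, @poll_time = queue, poll_time
  end

  def listen(&block)
    return if @listening
    @listening = true
    @thread = Thread.new do
      while @listening
        message = @queue.retrieve(@poll_time)
        block.call(message) unless message.nil?
      end
    end
  end

  def stop
    @listening = false
    @thread.join if @thread
  end
end

inbox = Thread::Queue.new # thread-safe hand-off to the calling thread
listener = SketchListener.new(StubQueue.new(%w[a b c]))
listener.listen { |message| inbox << message } # runs on the listener thread

received = Array.new(3) { inbox.pop } # blocks until three messages arrive
listener.stop
puts received.inspect
```

Thread::Queue#pop blocks until an item is available, so the calling thread needs no sleep loop while it waits for messages.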

#listening? ⇒ Boolean

Returns true while the listener is active.

Returns:

  • (Boolean)

    true while the listener is active; nil before #listen is first called and false after #stop.



# File 'lib/bisques/queue_listener.rb', line 14

def listening?
  @listening
end

#stop ⇒ Object

Stop listening for messages. This call blocks until the listener thread has finished its current poll and exited.



# File 'lib/bisques/queue_listener.rb', line 47

def stop
  @listening = false
  @thread.join if @thread
end
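
Since stop joins the listener thread, it can block for up to one poll interval while an in-flight retrieve finishes. The sketch below illustrates that delay. SlowEmptyQueue and SketchListener are hypothetical stand-ins, not Bisques classes: the queue simply sleeps for the poll time and returns nothing, and a short poll time of 0.2 seconds is used only to keep the example quick.

```ruby
# Hypothetical queue whose retrieve blocks for the full poll time and finds
# nothing, imitating a long poll on an empty queue.
class SlowEmptyQueue
  def retrieve(poll_time)
    sleep poll_time
    nil
  end
end

# Stand-in that mirrors the listen/listening?/stop source above (not the real class).
class SketchListener
  def initialize(queue, poll_time = 5)
    @queue, @poll_time = queue, poll_time
  end

  def listen(&block)
    return if @listening
    @listening = true
    @thread = Thread.new do
      while @listening
        message = @queue.retrieve(@poll_time)
        block.call(message) unless message.nil?
      end
    end
  end

  def listening?
    @listening
  end

  def stop
    @listening = false
    @thread.join if @thread
  end
end

listener = SketchListener.new(SlowEmptyQueue.new, 0.2)
listener.listen { |message| puts message }
sleep 0.05 # give the worker time to enter its first poll

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
listener.stop # waits for the poll in progress to return
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts format("stop blocked for %.2f seconds", waited)
```

With the default poll_time of 5, the same wait could last several seconds, which matters when stop is called during shutdown.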