Module: JMS::MessageConsumer
- Defined in:
- lib/jms/message_consumer.rb
Overview
Interface javax.jms.MessageConsumer
Instance Method Summary
-
#each(params = {}, &block) ⇒ Object
For each message available to be consumed, call the supplied block. Returns the statistics gathered when statistics: true, otherwise nil.
-
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic. In JMS terms, the message is received from the Destination. :timeout follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milliseconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages in the time interval specified
Default: 0
-
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread as they arrive, i.e. asynchronously.
-
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message.
Instance Method Details
#each(params = {}, &block) ⇒ Object
For each message available to be consumed, call the supplied block. Returns the statistics gathered when statistics: true, otherwise nil.
Parameters:
:timeout How to timeout waiting for messages on the Queue or Topic
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milliseconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The statistics can be reset by calling MessageConsumer::each again
with statistics: true
The statistics gathered are returned when statistics: true and async: false
# File 'lib/jms/message_consumer.rb', line 42
def each(params={}, &block)
  raise(ArgumentError, 'Destination::each requires a code block to be executed for each message received') unless block

  message_count = nil
  start_time    = nil

  if params[:statistics]
    message_count = 0
    start_time    = Time.now
  end

  # Receive messages according to timeout
  while message = self.get(params) do
    block.call(message)
    message_count += 1 if message_count
  end

  unless message_count.nil?
    duration = Time.now - start_time
    {
      messages:            message_count,
      duration:            duration,
      messages_per_second: duration > 0 ? (message_count/duration).to_i : 0,
      ms_per_msg:          message_count > 0 ? (duration*1000.0)/message_count : 0
    }
  end
end
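To illustrate the statistics hash described above, here is a minimal sketch that runs the same kind of loop against an in-memory stand-in. `FakeConsumer` is a hypothetical class invented for this example so that it runs without a broker or JRuby; its `each` is a simplified copy of the documented method, not the library code itself.

```ruby
# Hypothetical stand-in for a JMS consumer: `get` hands out queued strings
# and returns nil once nothing is left, much like a timeout of 0.
class FakeConsumer
  def initialize(messages)
    @messages = messages.dup
  end

  def get(params = {})
    @messages.shift
  end

  # Simplified copy of the documented loop (assumption, for illustration only).
  def each(params = {}, &block)
    raise(ArgumentError, 'each requires a code block') unless block

    message_count = params[:statistics] ? 0 : nil
    start_time    = Time.now

    while (message = get(params))
      block.call(message)
      message_count += 1 if message_count
    end

    return nil if message_count.nil?

    duration = Time.now - start_time
    {
      messages:            message_count,
      duration:            duration,
      messages_per_second: duration > 0 ? (message_count / duration).to_i : 0,
      ms_per_msg:          message_count > 0 ? (duration * 1000.0) / message_count : 0
    }
  end
end

received = []
stats    = FakeConsumer.new(%w[a b c]).each(statistics: true) { |message| received << message }
no_stats = FakeConsumer.new(%w[x]).each { |message| message }
```

With statistics: true the call returns a hash with the keys messages, duration, messages_per_second and ms_per_msg; without it the call returns nil.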
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic. In JMS terms, the message is received from the Destination. :timeout follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milliseconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
# File 'lib/jms/message_consumer.rb', line 12
def get(params={})
  timeout = params[:timeout] || 0
  if timeout == -1
    self.receive
  elsif timeout == 0
    self.receiveNoWait
  else
    self.receive(timeout)
  end
end
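The mapping from :timeout to the underlying receive call can be checked with a small sketch. `RecordingConsumer` is a hypothetical stand-in, not part of the library: it only records which receive variant would be invoked, using the same dispatch as the method above.

```ruby
# Hypothetical recorder: stands in for the Java consumer and notes which
# receive variant was chosen for each timeout value.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def receive(timeout = nil)
    @calls << (timeout ? [:receive, timeout] : [:receive])
    nil
  end

  def receiveNoWait
    @calls << [:receiveNoWait]
    nil
  end

  # Same dispatch as the documented get method.
  def get(params = {})
    timeout = params[:timeout] || 0
    if timeout == -1
      receive
    elsif timeout == 0
      receiveNoWait
    else
      receive(timeout)
    end
  end
end

consumer = RecordingConsumer.new
consumer.get                 # default timeout of 0: do not wait
consumer.get(timeout: -1)    # wait until a message arrives
consumer.get(timeout: 500)   # wait up to 500 milliseconds
```

After these three calls, `consumer.calls` holds one entry per call in the order they were made.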
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread as they arrive, i.e. asynchronously. This method returns to the caller before messages are processed. It is then the caller's responsibility to keep the program active so that messages can be processed.
Parameters:
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The timer starts when on_message is called and finishes when either the last message was received
or when on_message_statistics is called. on_message_statistics
can be called several times during processing without affecting the end time.
Also, the start time and message count are not reset until on_message
is called again with statistics: true
The statistics gathered are returned when statistics: true and async: false
# File 'lib/jms/message_consumer.rb', line 88
def on_message(params={}, &proc)
  raise(ArgumentError, 'MessageConsumer::on_message requires a code block to be executed for each message received') unless proc

  # Turn on Java class persistence: https://github.com/jruby/jruby/wiki/Persistence
  self.class.__persistent__ = true

  @listener = JMS::MessageListenerImpl.new(params, &proc)
  self.setMessageListener(@listener)
end
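Because on_message returns before any message is processed, the caller has to keep the program running. The sketch below shows that pattern with plain Ruby threads. `FakeAsyncConsumer` and its `wait` helper are hypothetical names made up for this example; a background thread plays the role of the JMS provider delivering messages.

```ruby
# Hypothetical stand-in for an asynchronous consumer: a background thread
# plays the role of the JMS provider calling the listener.
class FakeAsyncConsumer
  def initialize(messages)
    @messages = messages
  end

  # Returns immediately; the block runs later on another thread.
  def on_message(params = {}, &proc)
    raise(ArgumentError, 'on_message requires a code block') unless proc

    @worker = Thread.new { @messages.each { |message| proc.call(message) } }
    nil
  end

  # Hypothetical helper so the caller can wait for delivery to finish.
  def wait
    @worker.join
  end
end

received = Queue.new
consumer = FakeAsyncConsumer.new(%w[first second])
consumer.on_message { |message| received << message }

# The caller keeps the process alive; here it simply joins the worker thread.
consumer.wait

results = []
results << received.pop until received.empty?
```

With a real broker there is no worker thread to join, so a long-running program would typically block on something else, such as a sleep loop or a shutdown signal.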
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message
# File 'lib/jms/message_consumer.rb', line 99
def on_message_statistics
  stats = @listener.statistics if @listener
  raise(ArgumentError, 'First call MessageConsumer::on_message with statistics: true before calling MessageConsumer::statistics()') unless stats
  stats
end
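The guard in this method means it raises ArgumentError unless a listener with statistics already exists. A minimal sketch of that behaviour follows. `FakeListener`, `StatsConsumer`, and `start` are hypothetical stand-ins for JMS::MessageListenerImpl and on_message, and the contents of the statistics hash are assumed.

```ruby
# Hypothetical listener exposing a statistics value, standing in for
# JMS::MessageListenerImpl.
FakeListener = Struct.new(:statistics)

class StatsConsumer
  # Hypothetical replacement for on_message; assumes the listener only
  # holds statistics when they were requested.
  def start(statistics:)
    @listener = FakeListener.new(statistics ? { messages: 0 } : nil)
  end

  # Same guard as the documented on_message_statistics.
  def on_message_statistics
    stats = @listener.statistics if @listener
    raise(ArgumentError, 'First call on_message with statistics: true') unless stats
    stats
  end
end

consumer = StatsConsumer.new

# No listener yet, so the guard raises.
error = begin
  consumer.on_message_statistics
  nil
rescue ArgumentError => e
  e
end

consumer.start(statistics: true)
stats = consumer.on_message_statistics
```

Callers that poll statistics before registering a listener may want to rescue ArgumentError as shown here.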