Module: JMS::MessageConsumer
- Defined in:
- lib/jms/message_consumer.rb
Overview
Interface javax.jms.MessageConsumer
Instance Method Summary collapse
-
#each(params = {}, &proc) ⇒ Object
For each message available to be consumed call the Proc supplied Returns the statistics gathered when :statistics => true, otherwise nil.
-
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic In JMS terms, the message is received from the Destination :timeout follows the rules for MQSeries: -1 : Wait forever 0 : Return immediately if no message is available x : Wait for x milli-seconds for a message to be received from the broker Note: Messages may still be on the queue, but the broker has not supplied any messages in the time interval specified Default: 0.
-
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread.
-
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message.
Instance Method Details
#each(params = {}, &proc) ⇒ Object
For each message available to be consumed call the Proc supplied Returns the statistics gathered when :statistics => true, otherwise nil
Parameters:
:timeout How to timeout waiting for messages on the Queue or Topic
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The statistics can be reset by calling MessageConsumer::each again
with :statistics => true
The statistics gathered are returned when :statistics => true and :async => false
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/jms/message_consumer.rb', line 58 def each(params={}, &proc) raise "Destination::each requires a code block to be executed for each message received" unless proc = nil start_time = nil if params[:statistics] = 0 start_time = Time.now end # Receive messages according to timeout while = self.get(params) do proc.call() += 1 if end unless .nil? duration = Time.now - start_time {:messages => , :duration => duration, :messages_per_second => duration > 0 ? (/duration).to_i : 0, :ms_per_msg => > 0 ? (duration*1000.0)/ : 0 } end end |
#get(params = {}) ⇒ Object
Obtain a message from the Destination or Topic In JMS terms, the message is received from the Destination :timeout follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/jms/message_consumer.rb', line 28 def get(params={}) timeout = params[:timeout] || 0 if timeout == -1 self.receive elsif timeout == 0 self.receiveNoWait else self.receive(timeout) end end |
#on_message(params = {}, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.
Parameters:
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The timer starts when each() is called and finishes when either the last message was received,
or when Destination::statistics is called. In this case MessageConsumer::statistics
can be called several times during processing without affecting the end time.
Also, the start time and message count is not reset until MessageConsumer::each
is called again with :statistics => true
The statistics gathered are returned when :statistics => true and :async => false
103 104 105 106 107 108 |
# File 'lib/jms/message_consumer.rb', line 103 def (params={}, &proc) raise "MessageConsumer::on_message requires a code block to be executed for each message received" unless proc @listener = JMS::MessageListenerImpl.new(params,&proc) self.setMessageListener @listener end |
#on_message_statistics ⇒ Object
Return the current statistics for a running MessageConsumer::on_message
111 112 113 114 115 |
# File 'lib/jms/message_consumer.rb', line 111 def stats = @listener.statistics if @listener raise "First call MessageConsumer::on_message with :statistics=>true before calling MessageConsumer::statistics()" unless stats stats end |