Module: JMS::Session
- Defined in:
- lib/jms/session.rb
Overview
For each thread that will be processing messages concurrently a separate session is required. All sessions can share a single connection to the same JMS Provider.
Interface javax.jms.Session
See: download.oracle.com/javaee/6/api/javax/jms/Session.html
Other methods still directly accessible through this class:
create_browser(queue, message_selector)
Creates a QueueBrowser object to peek at the messages on the specified queue using a message selector.
create_bytes_message()
Creates a BytesMessage object
create_consumer(destination)
Creates a MessageConsumer for the specified destination
See: Connection::consumer
Example:
destination = session.create_destination(:queue_name => "MyQueue")
session.create_consumer(destination)
create_consumer(destination, message_selector)
Creates a MessageConsumer for the specified destination, using a message selector
create_consumer(destination, message_selector, boolean NoLocal)
Creates MessageConsumer for the specified destination, using a message selector
create_durable_subscriber(Topic topic, java.lang.String name)
Creates a durable subscriber to the specified topic
create_durable_subscriber(Topic topic, java.lang.String name, java.lang.String messageSelector, boolean noLocal)
Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages published by its own connection should be delivered to it.
create_map_Message()
Creates a MapMessage object
create_message()
Creates a Message object
create_object_message()
Creates an ObjectMessage object
create_object_message(java.io.Serializable object)
Creates an initialized ObjectMessage object
create_producer(destination)
Creates a MessageProducer to send to the specified destination
create_queue(queue_name)
Creates a queue identity given a Queue name
create_stream_message()
Creates a StreamMessage object
create_temporary_queue()
Creates a TemporaryQueue object
create_temporary_topic()
Creates a TemporaryTopic object
create_text_message()
Creates a TextMessage object
create_text_message(text)
Creates an initialized TextMessage object
create_topic(topic_name)
Creates a topic identity given a Topic name
acknowledge_mode()
Returns the acknowledgement mode of the session
message_listener()
Returns the session's distinguished message listener (optional).
transacted?
Indicates whether the session is in transacted mode
recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message
rollback()
Rolls back any messages done in this transaction and releases any locks currently held
message_listener=(MessageListener listener)
Sets the session's distinguished message listener (optional)
unsubscribe(name)
Unsubscribes a durable subscription that has been created by a client
Interface javax.jms.Session
Instance Method Summary collapse
-
#browse(params = {}, &proc) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found.
-
#browser(params, &proc) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!.
-
#consume(params, &proc) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic.
-
#consumer(params, &proc) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic.
-
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied.
-
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from.
-
#message(data, type = nil) ⇒ Object
Create a new message instance based on the type of the data being supplied String (:to_str) => TextMessage Hash (:each_pair) => MapMessage Duck typing is used to determine the type.
-
#producer(params, &proc) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic.
-
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied.
-
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#topic(topic_name, &proc) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied.
Instance Method Details
#browse(params = {}, &proc) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
470 471 472 |
# File 'lib/jms/session.rb', line 470 def browse(params={}, &proc) self.browser(params) {|b| b.each(params, &proc)} end |
#browser(params, &proc) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!
Call the Proc if supplied, then automatically close the consumer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/jms/session.rb', line 437 def browser(params, &proc) raise "Session::browser requires a code block to be executed" unless proc destination = create_destination(params) b = nil if params[:selector] b = create_browser(destination, params[:selector]) else b = create_browser(destination) end if proc begin proc.call(b) ensure b.close b = nil end end b end |
#consume(params, &proc) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
:no_local => Determine whether messages published by its own connection
should be delivered to it
Default: false
:timeout Follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
413 414 415 416 417 418 419 420 |
# File 'lib/jms/session.rb', line 413 def consume(params, &proc) c = self.consumer(params) begin c.each(params, &proc) ensure c.close end end |
#consumer(params, &proc) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic
Call the Proc if supplied, then automatically close the consumer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
:no_local => Determine whether messages published by its own connection
should be delivered to it
Default: false
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 |
# File 'lib/jms/session.rb', line 363 def consumer(params, &proc) destination = create_destination(params) c = nil if params[:no_local] c = create_consumer(destination, params[:selector] || '', params[:no_local]) elsif params[:selector] c = create_consumer(destination, params[:selector]) else c = create_consumer(destination) end if proc begin proc.call(c) ensure c.close c = nil end end c end |
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied
The idea behind this method is to allow the decision as to whether one is sending to a topic or destination to be transparent to the code. The supplied parameters can be externalized into say a YAML file so that today it writes to a queue, later it can be changed to write to a topic so that multiple parties can receive the same messages.
Note: For Temporary Queues and Topics, remember to delete them when done
or just use ::destination instead with a block and it will take care
of deleting them for you
To create a queue:
session.create_destination(:queue_name => 'name of queue')
To create a temporary queue:
session.create_destination(:queue_name => :temporary)
To create a queue:
session.create_destination('queue://queue_name')
To create a topic:
session.create_destination(:topic_name => 'name of queue')
To create a temporary topic:
session.create_destination(:topic_name => :temporary)
To create a topic:
session.create_destination('topic://topic_name')
Create the destination based on the parameter supplied
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
Returns the result of the supplied block
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/jms/session.rb', line 194 def create_destination(params) # Allow a Java JMS destination object to be passed in return params[:destination] if params[:destination] && params[:destination].java_kind_of?(JMS::Destination) queue_name = nil topic_name = nil if params.is_a? String queue_name = params['queue://'.length..-1] if params.start_with?('queue://') topic_name = params['topic://'.length..-1] if params.start_with?('topic://') else # :q_name is deprecated queue_name = params[:queue_name] || params[:q_name] topic_name = params[:topic_name] end raise "Missing mandatory parameter :queue_name or :topic_name to Session::producer, Session::consumer, or Session::browser" unless queue_name || topic_name if queue_name queue_name == :temporary ? create_temporary_queue : create_queue(queue_name) else topic_name == :temporary ? create_temporary_topic : create_topic(topic_name) end end |
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from
A block must be supplied so that if it is a temporary topic or queue it will be deleted after the proc is complete
To create a queue:
session.destination(:queue_name => 'name of queue')
To create a temporary queue:
session.destination(:queue_name => :temporary)
To create a topic:
session.destination(:topic_name => 'name of queue')
To create a temporary topic:
session.destination(:topic_name => :temporary)
Create the destination based on the parameter supplied
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
Returns the result of the supplied block
250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/jms/session.rb', line 250 def destination(params={}, &block) raise "Missing mandatory Block when calling JMS::Session#destination" unless block dest = nil begin dest = create_destination(params) block.call(dest) ensure # Delete Temporary Queue / Topic dest.delete if dest && dest.respond_to?(:delete) end end |
#message(data, type = nil) ⇒ Object
Create a new message instance based on the type of the data being supplied
String (:to_str) => TextMessage
Hash (:each_pair) => MapMessage
Duck typing is used to determine the type. If the class responds to :to_str then it is considered a String. Similarly if it responds to :each_pair it is considered to be a Hash
If automated duck typing is not desired, the type of the message can be specified by setting the parameter ‘type’ to any one of:
:text => Creates a Text Message
:map => Creates a Map Message
:bytes => Creates a Bytes Message
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/jms/session.rb', line 124 def (data, type=nil) = nil type ||= if data.respond_to?(:to_str, false) :text elsif data.respond_to?(:each_pair, false) :map else raise "Unknown data type #{data.class.to_s} in Message" end case type when :text = self.createTextMessage .text = data.to_str when :map = self.createMapMessage .data = data when :bytes = self.createBytesMessage .write_bytes(data.to_java_bytes) else raise "Invalid type #{type} requested" end end |
#producer(params, &proc) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic
Call the Proc if supplied, then automatically close the producer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit JMS::Destination to use
329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/jms/session.rb', line 329 def producer(params, &proc) p = self.create_producer(self.create_destination(params)) if proc begin proc.call(p) ensure p.close p = nil end end p end |
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied
264 265 266 267 268 |
# File 'lib/jms/session.rb', line 264 def queue(queue_name, &block) q = create_queue(queue_name) block.call(q) if block q end |
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/jms/session.rb', line 274 def temporary_queue(&block) q = create_temporary_queue if block begin block.call(q) ensure # Delete Temporary queue on completion of block q.delete if q q = nil end end q end |
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/jms/session.rb', line 300 def temporary_topic(&block) t = create_temporary_topic if block begin block.call(t) ensure # Delete Temporary topic on completion of block t.delete if t t = nil end end t end |
#topic(topic_name, &proc) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied
290 291 292 293 294 |
# File 'lib/jms/session.rb', line 290 def topic(topic_name, &proc) t = create_topic(topic_name) proc.call(t) if proc t end |