Class: JMS::SessionPool
- Inherits:
- Object (inheritance chain: JMS::SessionPool < Object)
- Defined in:
- lib/jms/session_pool.rb
Overview
Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.
Each thread can request a session and then return it once it is no longer needed by that thread. The only way to get a session is to pass a block, so that the Session is automatically returned to the pool when the block completes.
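The block-scoped checkout described above can be sketched with the Ruby standard library alone. `TinyPool` and its `with` method are hypothetical names used only for illustration; they are not part of jruby-jms, and unlike the real pool this sketch creates all of its resources up front rather than growing on demand.

```ruby
# Hypothetical stand-in for a session pool; not part of jruby-jms.
class TinyPool
  def initialize(size, &factory)
    @available = Queue.new
    # Assumption: resources are created eagerly to keep the sketch short.
    size.times { @available << factory.call }
  end

  # Check a resource out, yield it, and always check it back in.
  def with
    resource = @available.pop # blocks until a resource is free
    begin
      yield resource
    ensure
      @available << resource
    end
  end

  # Number of resources currently idle in the pool.
  def available
    @available.size
  end
end

# Four threads share two pooled objects.
pool = TinyPool.new(2) { Object.new }
threads = 4.times.map do
  Thread.new { pool.with { |resource| resource.object_id } }
end
results = threads.map(&:value)
```

Because the resource is handed back in an `ensure` clause, a caller cannot forget to return it, which is the reason the pool only exposes a block-based API.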
Parameters:
  See the regular session parameters from: JMS::Connection#initialize

  Additional parameters for controlling the session pool itself:
    :pool_name          Name of the pool as it shows up in the logger.
                        Default: 'JMS::SessionPool'
    :pool_size          Maximum pool size. Default: 10
                        The pool only grows as needed and will never exceed
                        :pool_size
    :pool_timeout       Number of seconds to wait before raising a TimeoutError
                        if no sessions are available in the pool
                        Default: 60
    :pool_warn_timeout  Number of seconds to wait before logging a warning when a
                        session in the pool is not available
                        Default: 5
    :pool_logger        Supply a logger that responds to #debug, #info, #warn and #debug?
                        For example: Rails.logger
                        Default: JMS.logger
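As a rough illustration of how these defaults apply, the sketch below resolves the pool options with the same `||` fallbacks that appear in the constructor source on this page. `resolve_pool_options` is a hypothetical helper written for this example, not a jruby-jms method, and it leaves out the logger.

```ruby
# Hypothetical helper mirroring how the constructor falls back to defaults.
def resolve_pool_options(params)
  params = params.nil? ? {} : params.dup
  {
    :name         => params[:pool_name]         || 'JMS::SessionPool',
    :pool_size    => params[:pool_size]         || 10,
    :warn_timeout => params[:pool_warn_timeout] || 5,
    :timeout      => params[:pool_timeout]      || 60
  }
end

# Only the keys that are supplied override the defaults.
options = resolve_pool_options(:pool_size => 3, :pool_timeout => 30)
```

Any key that is omitted, or a `nil` params hash, yields the documented default.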
Example:
  session_pool = connection.create_session_pool(config)
  session_pool.session do |session|
    ....
  end
Instance Method Summary
-
#close ⇒ Object
Immediately close all sessions in the pool and release them from the pool.
-
#consumer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageConsumer.
-
#initialize(connection, params = {}) ⇒ SessionPool
constructor
A new instance of SessionPool.
-
#producer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageProducer.
-
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block. The session is automatically returned to the pool once the block completes.
Constructor Details
#initialize(connection, params = {}) ⇒ SessionPool
Returns a new instance of SessionPool.
# File 'lib/jms/session_pool.rb', line 37
def initialize(connection, params={})
  # Save Session params since it will be used every time a new session is
  # created in the pool
  session_params = params.nil? ? {} : params.dup
  logger = session_params[:pool_logger] || JMS.logger

  # Define how GenePool can create new sessions
  @pool = GenePool.new(
    :name         => session_params[:pool_name] || self.class.name,
    :pool_size    => session_params[:pool_size] || 10,
    :warn_timeout => session_params[:pool_warn_timeout] || 5,
    :timeout      => session_params[:pool_timeout] || 60,
    :close_proc   => nil,
    :logger       => logger) do
    connection.create_session(session_params)
  end

  # Handle connection failures
  connection.on_exception do |jms_exception|
    logger.error "JMS Connection Exception has occurred: #{jms_exception}" if logger
    #TODO: Close all sessions in the pool and release from the pool?
  end
end
Instance Method Details
#close ⇒ Object
Immediately close all sessions in the pool and release them from the pool.
Note: This is an immediate close; active sessions will be aborted.
Note: Once closed, a session pool cannot be reused. A new instance must
be created.
TODO: Allow an option to wait for active sessions to be returned before
closing.
# File 'lib/jms/session_pool.rb', line 160
def close
  @pool.each do |s|
    s.close
  end
end
#consumer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageConsumer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.
Parameters:
  :queue_name  => String: Name of the Queue to return
                  Symbol: :temporary => Create temporary queue
                  Mandatory unless :topic_name is supplied
    Or,
  :topic_name  => String: Name of the Topic to write to or subscribe to
                  Symbol: :temporary => Create temporary topic
                  Mandatory unless :queue_name is supplied
    Or,
  :destination => Explicit javaxJms::Destination to use

  :selector    => Filter which messages should be returned from the queue
                  Default: All
  :no_local    => Determine whether messages published by its own connection
                  should be delivered to it
                  Default: false
Example
  session_pool.consumer(:queue_name => 'MyQueue') do |session, consumer|
    message = consumer.receive(timeout)
    puts message.data if message
  end
# File 'lib/jms/session_pool.rb', line 107
def consumer(params, &block)
  session do |s|
    consumer = nil
    begin
      consumer = s.consumer(params)
      block.call(s, consumer)
    ensure
      consumer.close if consumer
    end
  end
end
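The `begin`/`ensure` shape of `#consumer` means the consumer is closed even when the block raises. The following sketch illustrates that behavior outside of JMS. `FakeSession`, `FakeConsumer` and `with_consumer` are hypothetical stand-ins invented for this example, and the pool checkout step is omitted.

```ruby
# Hypothetical stand-ins; real sessions and consumers come from jruby-jms.
class FakeConsumer
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

class FakeSession
  attr_reader :last_consumer

  def consumer(_params)
    @last_consumer = FakeConsumer.new
  end
end

# Same begin/ensure shape as SessionPool#consumer, minus the pool checkout.
def with_consumer(session, params)
  consumer = nil
  begin
    consumer = session.consumer(params)
    yield session, consumer
  ensure
    consumer.close if consumer
  end
end

session = FakeSession.new
error_message = nil
begin
  with_consumer(session, :queue_name => 'MyQueue') { |_s, _c| raise 'boom' }
rescue RuntimeError => e
  error_message = e.message
end
```

The error still propagates to the caller; the `ensure` clause only guarantees that the consumer is released first.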
#producer(params, &block) ⇒ Object
Obtain a session from the pool and create a MessageProducer. Pass both into the supplied block. Once the block is complete the producer is closed and the session is returned to the pool.
Parameters:
  :queue_name  => String: Name of the Queue to send to
                  Symbol: :temporary => Create temporary queue
                  Mandatory unless :topic_name is supplied
    Or,
  :topic_name  => String: Name of the Topic to send to
                  Symbol: :temporary => Create temporary topic
                  Mandatory unless :queue_name is supplied
    Or,
  :destination => Explicit JMS::Destination to use
Example
  session_pool.producer(:queue_name => 'ExampleQueue') do |session, producer|
    producer.send(session.message("Hello World"))
  end
# File 'lib/jms/session_pool.rb', line 139
def producer(params, &block)
  session do |s|
    producer = nil
    begin
      producer = s.producer(params)
      block.call(s, producer)
    ensure
      producer.close if producer
    end
  end
end
#session(&block) ⇒ Object
Obtain a session from the pool and pass it to the supplied block. The session is automatically returned to the pool once the block completes.
If a JMS exception is thrown, the session is closed and removed from the pool to prevent reuse of sessions that are no longer valid.
# File 'lib/jms/session_pool.rb', line 65
def session(&block)
  s = nil
  begin
    s = @pool.checkout
    block.call(s)
  rescue javax.jms.JMSException => e
    s.close rescue nil
    @pool.remove(s)
    s = nil # Do not check back in since we have removed it
    raise e
  ensure
    @pool.checkin(s) if s
  end
end
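To make the two outcomes of `#session` concrete, here is a small sketch that runs without JMS. `FakePool`, `FakeJmsError` and `with_session` are hypothetical names for this example only: `FakePool` imitates the `checkout`, `checkin` and `remove` calls that the source makes on GenePool, and `FakeJmsError` stands in for `javax.jms.JMSException`.

```ruby
# Hypothetical error type standing in for javax.jms.JMSException.
class FakeJmsError < StandardError; end

# Hypothetical in-memory pool exposing the three calls used by #session.
class FakePool
  attr_reader :idle, :removed

  def initialize(items)
    @idle = items.dup
    @removed = []
  end

  def checkout
    @idle.shift
  end

  def checkin(item)
    @idle.push(item)
  end

  def remove(item)
    @removed << item
  end
end

# Same control flow as SessionPool#session, using the stand-ins above.
def with_session(pool)
  s = nil
  begin
    s = pool.checkout
    yield s
  rescue FakeJmsError
    pool.remove(s)
    s = nil # removed from the pool, so skip the checkin in ensure
    raise
  ensure
    pool.checkin(s) if s
  end
end

pool = FakePool.new([:a, :b])
with_session(pool) { |s| s } # normal completion: :a returns to the pool
begin
  with_session(pool) { |_s| raise FakeJmsError, 'broker gone' }
rescue FakeJmsError
  # The failed session (:b) was discarded instead of being checked back in.
end
```

After both calls, only the session that completed normally remains available; the one that failed is tracked as removed.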