Class: JMS::SessionPool

Inherits:
Object
Defined in:
lib/jms/session_pool.rb

Overview

A Session can only be used by one thread at a time. Creating a Session for every thread would work, but it could leave many Sessions unused. An alternative is to create a pool of sessions that can be shared by multiple threads.

Each thread can request a session and then return it once the thread no longer needs it. The only way to get a session is to pass a block, so that the Session is automatically returned to the pool when the block completes.
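
The block-based checkout described above can be illustrated with a minimal sketch. `TinyPool` and its `with` method are hypothetical names used only for this illustration; JMS::SessionPool itself delegates pooling to GenePool, whose internals are not shown here. The sketch assumes a factory block that builds one resource on demand.

```ruby
require 'thread'

# Hypothetical stand-in for a session pool; NOT the GenePool-backed
# implementation used by JMS::SessionPool.
class TinyPool
  attr_reader :created

  def initialize(max_size, &factory)
    @max_size = max_size
    @factory  = factory
    @idle     = []
    @created  = 0
    @mutex    = Mutex.new
    @returned = ConditionVariable.new
  end

  # Yield a resource to the block and always return it afterwards,
  # even when the block raises
  def with
    resource = checkout
    begin
      yield resource
    ensure
      checkin(resource)
    end
  end

  private

  # Reuse an idle resource, grow up to max_size, otherwise wait
  def checkout
    @mutex.synchronize do
      loop do
        return @idle.pop unless @idle.empty?
        if @created < @max_size
          resource = @factory.call
          @created += 1
          return resource
        end
        @returned.wait(@mutex)
      end
    end
  end

  def checkin(resource)
    @mutex.synchronize do
      @idle.push(resource)
      @returned.signal
    end
  end
end

# Eight threads share at most two resources
pool = TinyPool.new(2) { Object.new }
done = Queue.new
threads = 8.times.map do
  Thread.new do
    pool.with do |resource|
      sleep 0.01
      done << resource
    end
  end
end
threads.each(&:join)
```

Because a resource is only reachable inside the block, a caller cannot forget to return it, which is the reason the pool offers no separate checkout call.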

Parameters:

see regular session parameters from: JMS::Connection#initialize

Additional parameters for controlling the session pool itself

:pool_name         Name of the pool as it shows up in the logger.
                   Default: 'JMS::SessionPool'
:pool_size         Maximum Pool Size. Default: 10
                   The pool only grows as needed and will never exceed
                   :pool_size
:pool_timeout      Number of seconds to wait before raising a TimeoutError
                   if no sessions are available in the pool
                   Default: 60
:pool_warn_timeout Number of seconds to wait before logging a warning when a
                   session in the pool is not available
                   Default: 5
:pool_logger       Supply a logger that responds to #debug, #info, #warn and #debug?
                   For example: Rails.logger
                   Default: JMS.logger
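
As a rough sketch of how these defaults behave: the constructor source further down applies each default with `||`, so a key that is missing, nil or false falls back to its default. `pool_options` below is a hypothetical helper written for this illustration, not a method of the library, and the `'Orders'` pool name is made up.

```ruby
# Hypothetical helper, not part of jruby-jms: mirrors how the constructor
# maps the :pool_* keys onto pool options and applies defaults with ||
def pool_options(params)
  params = params.nil? ? {} : params
  {
    :name         => params[:pool_name]         || 'JMS::SessionPool',
    :pool_size    => params[:pool_size]         || 10,
    :warn_timeout => params[:pool_warn_timeout] || 5,
    :timeout      => params[:pool_timeout]      || 60
  }
end

# :pool_timeout is explicitly nil here, so it still falls back to 60
config  = { :pool_name => 'Orders', :pool_size => 5, :pool_timeout => nil }
options = pool_options(config)
```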

Example:

session_pool = connection.create_session_pool(config)
session_pool.session do |session|
   ....
end

Constructor Details

#initialize(connection, params = {}) ⇒ SessionPool

Returns a new instance of SessionPool.



# File 'lib/jms/session_pool.rb', line 37

def initialize(connection, params={})
  # Save Session params since it will be used every time a new session is
  # created in the pool
  session_params = params.nil? ? {} : params.dup
  logger = session_params[:pool_logger] || JMS.logger
  # Define how GenePool can create new sessions
  @pool = GenePool.new(
    :name         => session_params[:pool_name] || self.class.name,
    :pool_size    => session_params[:pool_size] || 10,
    :warn_timeout => session_params[:pool_warn_timeout] || 5,
    :timeout      => session_params[:pool_timeout] || 60,
    :close_proc   => nil,
    :logger       => logger) do
    connection.create_session(session_params)
  end

  # Handle connection failures
  connection.on_exception do |jms_exception|
    logger.error "JMS Connection Exception has occurred: #{jms_exception}" if logger
    #TODO: Close all sessions in the pool and release from the pool?
  end
end

Instance Method Details

#close ⇒ Object

Immediately close all sessions in the pool and release them from the pool.

Note: This is an immediate close; active sessions will be aborted.

Note: Once closed, a session pool cannot be re-used. A new instance must be created.

TODO: Allow an option to wait for active sessions to be returned before closing.


# File 'lib/jms/session_pool.rb', line 160

def close
  @pool.each do |s|
    s.close
  end
end

#consumer(params, &block) ⇒ Object

Obtain a session from the pool and create a MessageConsumer. Pass both into the supplied block. Once the block is complete the consumer is closed and the session is returned to the pool.

Parameters:

:queue_name => String: Name of the Queue to return
               Symbol: :temporary => Create temporary queue
               Mandatory unless :topic_name is supplied
  Or,
:topic_name => String: Name of the Topic to write to or subscribe to
               Symbol: :temporary => Create temporary topic
               Mandatory unless :queue_name is supplied
  Or,
:destination=> Explicit javaxJms::Destination to use

:selector   => Filter which messages should be returned from the queue
               Default: All messages
:no_local   => Determine whether messages published by its own connection
               should be delivered to it
               Default: false

Example

session_pool.consumer(:queue_name => 'MyQueue') do |session, consumer|
  message = consumer.receive(timeout)
  puts message.data if message
end


# File 'lib/jms/session_pool.rb', line 107

def consumer(params, &block)
  session do |s|
    consumer = nil
    begin
      consumer = s.consumer(params)
      block.call(s, consumer)
    ensure
      consumer.close if consumer
    end
  end
end
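
The cleanup order in the method above (consumer closed first, session returned second, even when the block raises) can be traced with stand-in objects. Everything in this sketch is hypothetical: `RecordingPool` and `FakeConsumer` only record events and do not talk to a JMS provider.

```ruby
events = []

# Hypothetical stand-in that only records when it is closed
FakeConsumer = Struct.new(:events) do
  def close
    events << :consumer_closed
  end
end

# Hypothetical pool with the same nesting as #consumer inside #session
class RecordingPool
  def initialize(events)
    @events = events
  end

  def session
    @events << :session_checked_out
    yield :fake_session
  ensure
    @events << :session_returned
  end

  def consumer
    session do |s|
      c = nil
      begin
        c = FakeConsumer.new(@events)
        yield s, c
      ensure
        c.close if c
      end
    end
  end
end

pool = RecordingPool.new(events)
begin
  pool.consumer { |_session, _consumer| raise 'handler failed' }
rescue RuntimeError
  # The error still reaches the caller after both cleanup steps have run
end
```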

#producer(params, &block) ⇒ Object

Obtain a session from the pool and create a MessageProducer. Pass both into the supplied block. Once the block is complete the producer is closed and the session is returned to the pool.

Parameters:

:queue_name => String: Name of the Queue to send messages to
               Symbol: :temporary => Create temporary queue
               Mandatory unless :topic_name is supplied
  Or,
:topic_name => String: Name of the Topic to send messages to
               Symbol: :temporary => Create temporary topic
               Mandatory unless :queue_name is supplied
  Or,
:destination=> Explicit JMS::Destination to use

Example

session_pool.producer(:queue_name => 'ExampleQueue') do |session, producer|
  producer.send(session.message("Hello World"))
end


# File 'lib/jms/session_pool.rb', line 139

def producer(params, &block)
  session do |s|
    producer = nil
    begin
      producer = s.producer(params)
      block.call(s, producer)
    ensure
      producer.close if producer
    end
  end
end

#session(&block) ⇒ Object

Obtain a session from the pool and pass it to the supplied block. The session is automatically returned to the pool once the block completes.

If a JMS Exception is thrown, the session is closed and removed from the pool to prevent re-use of sessions that are no longer valid.
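
The effect of that rule can be sketched without a JMS provider. All names below are hypothetical stand-ins: `FakeJMSError` plays the role of `javax.jms.JMSException`, and `FakePool` exposes only the `checkout`, `checkin` and `remove` calls that the method relies on.

```ruby
# Hypothetical stand-in for javax.jms.JMSException
class FakeJMSError < StandardError; end

class FakeSession
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical pool that records check-ins and removals
class FakePool
  attr_reader :members, :checkins

  def initialize(sessions)
    @members  = sessions
    @checkins = 0
  end

  def checkout
    @members.first
  end

  def checkin(_session)
    @checkins += 1
  end

  def remove(session)
    @members.delete(session)
  end
end

# Same control flow as #session, written against the stand-ins
def with_session(pool)
  s = nil
  begin
    s = pool.checkout
    yield s
  rescue FakeJMSError
    s.close rescue nil
    pool.remove(s)
    s = nil # Do not check back in since it has been removed
    raise
  ensure
    pool.checkin(s) if s
  end
end

session = FakeSession.new
pool    = FakePool.new([session])

with_session(pool) { |s| s } # succeeds, session is checked back in

begin
  with_session(pool) { |_s| raise FakeJMSError, 'broker went away' }
rescue FakeJMSError
  # The caller still sees the original error
end
```

After the failing call the session has been closed and dropped from the pool, and it was not checked in a second time.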



# File 'lib/jms/session_pool.rb', line 65

def session(&block)
  s = nil
  begin
    s = @pool.checkout
    block.call(s)
  rescue javax.jms.JMSException => e
    s.close rescue nil
    @pool.remove(s)
    s = nil # Do not check back in since we have removed it
    raise e
  ensure
    @pool.checkin(s) if s
  end
end