Class: HornetQ::Client::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/hornetq/client/connection.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Connection

Create a new Connection from which sessions can be created

Parameters:

  • a Hash consisting of one or more of the named parameters

  • Summary of parameters and their default values

HornetQ::Client::Connection.new(
  :uri                            => 'hornetq://localhost',
  :ack_batch_size                 => ,
  :auto_group                     => ,
  :block_on_acknowledge           => ,
  :block_on_durable_send          => ,
  :block_on_non_durable_send      => ,
  :cache_large_messages_client    => ,
  :call_timeout                   => ,
  :client_failure_check_period    => ,
  :confirmation_window_size       => ,
  :connection_load_balancing_policy_class_name => ,
  :connection_ttl                 => ,
  :consumer_max_rate              => ,
  :consumer_window_size           => ,
  :discovery_address              => ,
  :discovery_initial_wait_timeout => ,
  :discovery_port                 => ,
  :discovery_refresh_timeout      => ,
  :failover_on_initial_connection => true,
  :failover_on_server_shutdown    => true,
  :group_id                       => ,
  :initial_message_packet_size    => ,
  :java_object                    => ,
  :local_bind_address             => ,
  :max_retry_interval             => ,
  :min_large_message_size         => ,
  :pre_acknowledge                => ,
  :producer_max_rate              => ,
  :producer_window_size           => ,
  :reconnect_attempts             => 1,
  :retry_interval                 => ,
  :retry_interval_multiplier      => ,
  :scheduled_thread_pool_max_size => ,
  :static_connectors              => ,
  :thread_pool_max_size           => ,
  :use_global_pools               =>
)

Mandatory Parameters

  • :uri

    • The hornetq uri as to which server to connect with and which transport protocol to use. Format:

      hornetq://server:port,backupserver:port/?protocol=[netty|discover]
      
    • To use the default netty transport

      hornetq://server:port
      
    • To use the default netty transport and specify a backup server

      hornetq://server:port,backupserver:port
      
    • To use auto-discovery

      hornetq://server:port/?protocol=discovery
      
    • To use HornetQ within the current JVM

      hornetq://invm
      

Optional Parameters

  • :ack_batch_size

  • :auto_group

  • :block_on_acknowledge

  • :block_on_durable_send

  • :block_on_non_durable_send

  • :cache_large_messages_client

  • :call_timeout

  • :client_failure_check_period

  • :confirmation_window_size

  • :connection_load_balancing_policy_class_name

  • :connection_ttl

  • :consumer_max_rate

  • :consumer_window_size

  • :discovery_address

  • :discovery_initial_wait_timeout

  • :discovery_port

  • :discovery_refresh_timeout

  • :failover_on_initial_connection

  • :failover_on_server_shutdown

  • :group_id

  • :initial_message_packet_size

  • :java_object

  • :local_bind_address

  • :max_retry_interval

  • :min_large_message_size

  • :pre_acknowledge

  • :producer_max_rate

  • :producer_window_size

  • :reconnect_attempts

  • :retry_interval

  • :retry_interval_multiplier

  • :scheduled_thread_pool_max_size

  • :static_connectors

  • :thread_pool_max_size

  • :use_global_pools



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/hornetq/client/connection.rb', line 159

def initialize(params={})
  params =params.clone
  uri = nil
  # TODO: Support :uri as an array for cluster configurations
  if params.kind_of?(String)
    uri = HornetQ::URI.new(params)
    params = uri.params
  else
    raise "Missing :uri param in HornetQ::Server.create_server" unless params[:uri]
    uri = HornetQ::URI.new(params.delete(:uri))
    # params override uri params
    params = uri.params.merge(params)
  end

  @connection = nil
  @sessions = []
  @consumers = []
  # In-VM Transport has no fail-over or additional parameters
  if uri.host == 'invm'
    transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME)
    @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport)
  elsif params[:protocol]
    # Auto-Discovery just has a host name and port
    if params[:protocol] == 'discovery'
      @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port)
    elsif params[:protocol] != 'netty'
      raise "Unknown HornetQ protocol:#{params[:protocol]}"
    end
  end

  # Unless already created, then the connection will use the netty protocol
  unless @connection
    # Primary Transport
    transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port })

    # Check for backup server connection information
    if uri.backup_host
      backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port })
      @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport)
    else
      @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport)
    end
  end

  # If any other options were supplied, apply them to the created Connection instance
  params.each_pair do |key, val|
    method = key.to_s+'='
    if @connection.respond_to? method
      @connection.send method, val
      #puts "Debug: #{key} = #{@connection.send key}" if @connection.respond_to? key.to_sym
    else
      HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and being ignored"
    end
  end
end

Class Method Details

.connection(params = {}, &proc) ⇒ Object

Call the supplied code block after creating a connection instance See initialize for the parameter list The connection is closed before returning

Returns the result of the code block



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/hornetq/client/connection.rb', line 51

def self.connection(params={}, &proc)
  raise "Missing mandatory code block" unless proc
  connection = nil
  result = nil
  begin
    connection=self.new(params)
    result = proc.call(connection)
  ensure
    connection.close
  end
  result
end

.session(params = {}, &proc) ⇒ Object

Create a new Connection and Session

Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed

See Connection::initialize and Connection::create_session for the list

of parameters

Returns result of block



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/hornetq/client/connection.rb', line 13

def self.session(params={},&proc)
  raise "Missing mandatory code block" unless proc
  connection = nil
  session = nil
  begin
    if params.kind_of?(String)
      # TODO: Support passing username and password from URI to Session
      connection = self.new(params)
      connection.session({}, &proc)
    else
      connection = self.new(params[:connection] || {})
      connection.session(params[:session] || {}, &proc)
    end
  ensure
    connection.close if connection
  end
end

.start_session(params = {}, &proc) ⇒ Object

Create a new Connection along with a Session, and then start the session

Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed

See Connection::initialize and Connection::create_session for the list

of parameters

Returns result of block



39
40
41
42
43
44
# File 'lib/hornetq/client/connection.rb', line 39

def self.start_session(params={},&proc)
  session(params) do |session|
    session.start
    proc.call(session)
  end
end

Instance Method Details

#closeObject

Close Connection connections



410
411
412
413
414
# File 'lib/hornetq/client/connection.rb', line 410

def close
  @sessions.each { |session| session.close }
  @connection.close if @connection
  @connection = nil
end

#create_session(params = {}) ⇒ Object

Create a new HornetQ session

Note: Remember to close the session once it is no longer used.

Recommend using #session with a block over this method where possible

Note:

  • The returned session MUST be closed once complete

    connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/')
    session = connection.create_session
      ...
    session.close
    connection.close
    

Returns:

  • A new HornetQ ClientSession

  • See org.hornetq.api.core.client.ClientSession for documentation on returned object

Throws:

  • NativeException

Example:

require 'hornetq'

connection = nil
session = nil
begin
  connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/')
  session = connection.create_session

  # Create a new queue
  session.create_queue('Example', 'Example', true)

  # Create a producer to send messages
  producer = session.create_producer('Example')

  # Create a Text Message
  message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
  message.body_buffer.write_string('Hello World')

  # Send the message
  producer.send(message)
ensure
  session.close if session
  connection.close if connection
end

Parameters:

  • a Hash consisting of one or more of the named parameters

  • Summary of parameters and their default values

connection.create_session(
 :username          => 'my_username',   # Default is no authentication
 :password          => 'password',      # Default is no authentication
 :xa                => false,
 :auto_commit_sends => true,
 :auto_commit_acks  => true,
 :pre_acknowledge   => false,
 :ack_batch_size    => 1
 )

Mandatory Parameters

  • None

Optional Parameters

  • :username

    • The user name. To create an authenticated session

  • :password

    • The user password. To create an authenticated session

  • :xa

    • Whether the session supports XA transaction semantics or not

  • :auto_commit_sends

    • true: automatically commit message sends

    • false: commit manually

  • :auto_commit_acks

    • true: automatically commit message acknowledgement

    • false: commit manually

  • :pre_acknowledge

    • true: to pre-acknowledge messages on the server

    • false: to let the client acknowledge the messages

    • Note: It is possible to pre-acknowledge messages on the server so that the client can avoid additional network trip to the server to acknowledge messages. While this increases performance, this does not guarantee delivery (as messages can be lost after being pre-acknowledged on the server). Use with caution if your application design permits it.

  • :ack_batch_size

    • the batch size of the acknowledgements

  • :managed

    • true: The session will be managed by the connection. It will be

      closed when the connection is closed.
      Also the session will be started or stopped when
      Connection#start_managed_sessions or
      Connection#stop_managed_sessions is called
      
    • false: The caller is responsible for closing the session

    • Default: false



317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/hornetq/client/connection.rb', line 317

def create_session(params={})
  raise "HornetQ::Client::Connection Already Closed" unless @connection
  session = @connection.create_session(
    params[:username],
    params[:password],
    params[:xa] || false,
    params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends],
    params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks],
    params[:pre_acknowledge] || false,
    params[:ack_batch_size] || 1)
  (@sessions << session) if params.fetch(:managed, false)
  session
end

#create_session_pool(params = {}) ⇒ Object

Create a Session pool



404
405
406
407
# File 'lib/hornetq/client/connection.rb', line 404

def create_session_pool(params={})
  require 'hornetq/client/session_pool'
  SessionPool.new(self, params)
end

#on_message(params, &proc) ⇒ Object

Receive messages in a separate thread when they arrive Allows messages to be received in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.

Note:

Session Parameters:

:options => any of the javax.jms.Session constants
   Default: javax.jms.Session::AUTO_ACKNOWLEDGE

:session_count : Number of sessions to create, each with their own consumer which
                 in turn will call the supplied block.
                 Note: The supplied block must be thread safe since it will be called
                       by several threads at the same time.
                       I.e. Don't change instance variables etc. without the
                       necessary semaphores etc.
                 Default: 1

Consumer Parameters:

:queue_name  => The name of the queue to consume messages from. Mandatory
:filter      => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
                true | false. Default: false
:window_size => The consumer window size.
:max_rate    => The maximum rate to consume messages.
:auto_start  => Immediately start processing messages.
                If set to false, call Connection#start_managed_sessions
                to manually start receive messages later
                Default: true

:statistics Capture statistics on how many messages have been read
   true  : This method will capture statistics on the number of messages received
           and the time it took to process them.
           The timer starts when each() is called and finishes when either the last message was received,
           or when Destination::statistics is called. In this case MessageConsumer::statistics
           can be called several times during processing without affecting the end time.
           Also, the start time and message count is not reset until MessageConsumer::each
           is called again with :statistics => true

           The statistics gathered are returned when :statistics => true and :async => false

Usage: For transacted sessions (the default) the Proc supplied must return

either true or false:
  true => The session is committed
  false => The session is rolled back
  Any Exception => The session is rolled back

Notes:

  • Remember to call ::start on the connection otherwise the on_message will not start consuming any messages

  • Remember to call message.acknowledge before completing the block so that

    the message will be removed from the queue
    
  • If the block throws an exception, the



471
472
473
474
475
476
477
478
479
480
481
# File 'lib/hornetq/client/connection.rb', line 471

def on_message(params, &proc)
  consumer_count = params[:session_count] || 1
  consumer_count.times do
    session = self.create_session(params)
    consumer = session.create_consumer_from_params(params)
    consumer.on_message(params, &proc)
    session.start if params.fetch(:auto_start, true)
    @consumers << consumer
    @sessions << session
  end
end

#on_message_statisticsObject



483
484
485
# File 'lib/hornetq/client/connection.rb', line 483

def on_message_statistics
  @consumers.collect{|consumer| consumer.on_message_statistics}
end

#session(params = {}, &proc) ⇒ Object

Create a session, call the supplied block and once it completes close the session. See session_create for the Parameters

Returns the result of the block

Example:

require 'hornetq'

HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection
  connection.session do |session|

    # Create a producer to send messages
    session.producer('Example') do |producer|

      # Create a Text Message
      message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
      message.body = 'Hello World'

      # Send the message
      producer.send(message)
    end
  end
end


355
356
357
358
359
360
361
362
363
364
# File 'lib/hornetq/client/connection.rb', line 355

def session(params={}, &proc)
  raise "HornetQ::Client::session mandatory block missing" unless proc
  session = nil
  begin
    session = create_session(params)
    proc.call(session)
  ensure
    session.close if session
  end
end

#start_managed_sessionsObject

Start all sessions managed by this connection

Sessions created via #create_session are not managed unless :auto_close => true was specified when the session was created

Session are Only managed when created through the following methods:

Connection#on_message
Connection#create_session And :auto_close => true

This call does not do anything to sessions in a session pool



497
498
499
# File 'lib/hornetq/client/connection.rb', line 497

def start_managed_sessions
  @sessions.each {|session| session.start}
end

#start_session(params = {}, &proc) ⇒ Object

Create a session, start the session, call the supplied block and once the block completes close the session.

See: #session_create for the Parameters

Returns the result of the block

Example:

require 'hornetq'

HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection
  # Must start the session other we cannot consume messages using it
  connection.start_session do |session|

    # Create a consumer to receive messages
    session.consumer('TestQueue') do |consumer|

      consumer.each do |message|
        message.acknowledge
      end

    end
  end
end


391
392
393
394
395
396
397
398
399
400
401
# File 'lib/hornetq/client/connection.rb', line 391

def start_session(params={}, &proc)
  raise "HornetQ::Client::session mandatory block missing" unless proc
  session = nil
  begin
    session = create_session(params)
    session.start
    proc.call(session)
  ensure
    session.close if session
  end
end

#stop_managed_sessionsObject

Stop all sessions managed by this connection so that they no longer receive messages for processing

See: #start_managed_sessions for details on which sessions are managed



505
506
507
# File 'lib/hornetq/client/connection.rb', line 505

def stop_managed_sessions
  @sessions.each {|session| session.stop}
end