Class: HornetQ::Client::Connection
- Inherits:
-
Object
- Object
- HornetQ::Client::Connection
- Defined in:
- lib/hornetq/client/connection.rb
Class Method Summary collapse
-
.connection(params = {}, &proc) ⇒ Object
Call the supplied code block after creating a connection instance See initialize for the parameter list The connection is closed before returning.
-
.session(params = {}, &proc) ⇒ Object
Create a new Connection and Session.
-
.start_session(params = {}, &proc) ⇒ Object
Create a new Connection along with a Session, and then start the session.
Instance Method Summary collapse
-
#close ⇒ Object
Close Connection connections.
-
#create_session(params = {}) ⇒ Object
Create a new HornetQ session.
-
#create_session_pool(params = {}) ⇒ Object
Create a Session pool.
-
#initialize(params = {}) ⇒ Connection
constructor
Create a new Connection from which sessions can be created.
-
#on_message(params, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be received in a separate thread.
- #on_message_statistics ⇒ Object
-
#session(params = {}, &proc) ⇒ Object
Create a session, call the supplied block and once it completes close the session.
-
#start_managed_sessions ⇒ Object
Start all sessions managed by this connection.
-
#start_session(params = {}, &proc) ⇒ Object
Create a session, start the session, call the supplied block and once the block completes close the session.
-
#stop_managed_sessions ⇒ Object
Stop all sessions managed by this connection so that they no longer receive messages for processing.
Constructor Details
#initialize(params = {}) ⇒ Connection
Create a new Connection from which sessions can be created
Parameters:
-
a Hash consisting of one or more of the named parameters
-
Summary of parameters and their default values
HornetQ::Client::Connection.new(
:uri => 'hornetq://localhost',
:ack_batch_size => ,
:auto_group => ,
:block_on_acknowledge => ,
:block_on_durable_send => ,
:block_on_non_durable_send => ,
:cache_large_messages_client => ,
:call_timeout => ,
:client_failure_check_period => ,
:confirmation_window_size => ,
:connection_load_balancing_policy_class_name => ,
:connection_ttl => ,
:consumer_max_rate => ,
:consumer_window_size => ,
:discovery_address => ,
:discovery_initial_wait_timeout => ,
:discovery_port => ,
:discovery_refresh_timeout => ,
:failover_on_initial_connection => true,
:failover_on_server_shutdown => true,
:group_id => ,
:initial_message_packet_size => ,
:java_object => ,
:local_bind_address => ,
:max_retry_interval => ,
:min_large_message_size => ,
:pre_acknowledge => ,
:producer_max_rate => ,
:producer_window_size => ,
:reconnect_attempts => 1,
:retry_interval => ,
:retry_interval_multiplier => ,
:scheduled_thread_pool_max_size => ,
:static_connectors => ,
:thread_pool_max_size => ,
:use_global_pools =>
)
Mandatory Parameters
-
:uri
-
The hornetq uri as to which server to connect with and which transport protocol to use. Format:
hornetq://server:port,backupserver:port/?protocol=[netty|discover]
-
To use the default netty transport
hornetq://server:port
-
To use the default netty transport and specify a backup server
hornetq://server:port,backupserver:port
-
To use auto-discovery
hornetq://server:port/?protocol=discovery
-
To use HornetQ within the current JVM
hornetq://invm
-
Optional Parameters
-
:ack_batch_size
-
:auto_group
-
:block_on_acknowledge
-
:block_on_durable_send
-
:block_on_non_durable_send
-
:cache_large_messages_client
-
:call_timeout
-
:client_failure_check_period
-
:confirmation_window_size
-
:connection_load_balancing_policy_class_name
-
:connection_ttl
-
:consumer_max_rate
-
:consumer_window_size
-
:discovery_address
-
:discovery_initial_wait_timeout
-
:discovery_port
-
:discovery_refresh_timeout
-
:failover_on_initial_connection
-
:failover_on_server_shutdown
-
:group_id
-
:initial_message_packet_size
-
:java_object
-
:local_bind_address
-
:max_retry_interval
-
:min_large_message_size
-
:pre_acknowledge
-
:producer_max_rate
-
:producer_window_size
-
:reconnect_attempts
-
:retry_interval
-
:retry_interval_multiplier
-
:scheduled_thread_pool_max_size
-
:static_connectors
-
:thread_pool_max_size
-
:use_global_pools
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/hornetq/client/connection.rb', line 159 def initialize(params={}) params =params.clone uri = nil # TODO: Support :uri as an array for cluster configurations if params.kind_of?(String) uri = HornetQ::URI.new(params) params = uri.params else raise "Missing :uri param in HornetQ::Server.create_server" unless params[:uri] uri = HornetQ::URI.new(params.delete(:uri)) # params override uri params params = uri.params.merge(params) end @connection = nil @sessions = [] @consumers = [] # In-VM Transport has no fail-over or additional parameters if uri.host == 'invm' transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::INVM_CONNECTOR_CLASS_NAME) @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) elsif params[:protocol] # Auto-Discovery just has a host name and port if params[:protocol] == 'discovery' @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(uri.host, uri.port) elsif params[:protocol] != 'netty' raise "Unknown HornetQ protocol:#{params[:protocol]}" end end # Unless already created, then the connection will use the netty protocol unless @connection # Primary Transport transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.host, 'port' => uri.port }) # Check for backup server connection information if uri.backup_host backup_transport = Java::org.hornetq.api.core.TransportConfiguration.new(HornetQ::NETTY_CONNECTOR_CLASS_NAME, {'host' => uri.backup_host, 'port' => uri.backup_port }) @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport, backup_transport) else @connection = Java::org.hornetq.api.core.client.HornetQClient.create_client_session_factory(transport) end end # If any other options were supplied, apply them to the created Connection instance params.each_pair do |key, val| method = key.to_s+'=' if @connection.respond_to? method @connection.send method, val #puts "Debug: #{key} = #{@connection.send key}" if @connection.respond_to? key.to_sym else HornetQ.logger.warn "Warning: Option:#{key}, with value:#{val} is invalid and being ignored" end end end |
Class Method Details
.connection(params = {}, &proc) ⇒ Object
Call the supplied code block after creating a connection instance See initialize for the parameter list The connection is closed before returning
Returns the result of the code block
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/hornetq/client/connection.rb', line 51 def self.connection(params={}, &proc) raise "Missing mandatory code block" unless proc connection = nil result = nil begin connection=self.new(params) result = proc.call(connection) ensure connection.close end result end |
.session(params = {}, &proc) ⇒ Object
Create a new Connection and Session
Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed
See Connection::initialize and Connection::create_session for the list
of parameters
Returns result of block
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/hornetq/client/connection.rb', line 13 def self.session(params={},&proc) raise "Missing mandatory code block" unless proc connection = nil session = nil begin if params.kind_of?(String) # TODO: Support passing username and password from URI to Session connection = self.new(params) connection.session({}, &proc) else connection = self.new(params[:connection] || {}) connection.session(params[:session] || {}, &proc) end ensure connection.close if connection end end |
.start_session(params = {}, &proc) ⇒ Object
Create a new Connection along with a Session, and then start the session
Creates a new connection and session, then passes the session to the supplied
block. Upon completion the session and connection are both closed
See Connection::initialize and Connection::create_session for the list
of parameters
Returns result of block
39 40 41 42 43 44 |
# File 'lib/hornetq/client/connection.rb', line 39 def self.start_session(params={},&proc) session(params) do |session| session.start proc.call(session) end end |
Instance Method Details
#close ⇒ Object
Close Connection connections
410 411 412 413 414 |
# File 'lib/hornetq/client/connection.rb', line 410 def close @sessions.each { |session| session.close } @connection.close if @connection @connection = nil end |
#create_session(params = {}) ⇒ Object
Create a new HornetQ session
Note: Remember to close the session once it is no longer used.
Recommend using #session with a block over this method where possible
Note:
-
The returned session MUST be closed once complete
connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/') session = connection.create_session ... session.close connection.close
Returns:
-
A new HornetQ ClientSession
-
See org.hornetq.api.core.client.ClientSession for documentation on returned object
Throws:
-
NativeException
-
…
Example:
require 'hornetq'
connection = nil
session = nil
begin
connection = HornetQ::Client::Connection.new(:uri => 'hornetq://localhost/')
session = connection.create_session
# Create a new queue
session.create_queue('Example', 'Example', true)
# Create a producer to send messages
producer = session.create_producer('Example')
# Create a Text Message
= session.(HornetQ::Client::Message::TEXT_TYPE,true)
.body_buffer.write_string('Hello World')
# Send the message
producer.send()
ensure
session.close if session
connection.close if connection
end
Parameters:
-
a Hash consisting of one or more of the named parameters
-
Summary of parameters and their default values
connection.create_session(
:username => 'my_username', # Default is no authentication
:password => 'password', # Default is no authentication
:xa => false,
:auto_commit_sends => true,
:auto_commit_acks => true,
:pre_acknowledge => false,
:ack_batch_size => 1
)
Mandatory Parameters
-
None
Optional Parameters
-
:username
-
The user name. To create an authenticated session
-
-
:password
-
The user password. To create an authenticated session
-
-
:xa
-
Whether the session supports XA transaction semantics or not
-
-
:auto_commit_sends
-
true: automatically commit message sends
-
false: commit manually
-
-
:auto_commit_acks
-
true: automatically commit message acknowledgement
-
false: commit manually
-
-
:pre_acknowledge
-
true: to pre-acknowledge messages on the server
-
false: to let the client acknowledge the messages
-
Note: It is possible to pre-acknowledge messages on the server so that the client can avoid additional network trip to the server to acknowledge messages. While this increases performance, this does not guarantee delivery (as messages can be lost after being pre-acknowledged on the server). Use with caution if your application design permits it.
-
-
:ack_batch_size
-
the batch size of the acknowledgements
-
-
:managed
-
true: The session will be managed by the connection. It will be
closed when the connection is closed. Also the session will be started or stopped when Connection#start_managed_sessions or Connection#stop_managed_sessions is called
-
false: The caller is responsible for closing the session
-
Default: false
-
317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/hornetq/client/connection.rb', line 317 def create_session(params={}) raise "HornetQ::Client::Connection Already Closed" unless @connection session = @connection.create_session( params[:username], params[:password], params[:xa] || false, params[:auto_commit_sends].nil? ? true : params[:auto_commit_sends], params[:auto_commit_acks].nil? ? true : params[:auto_commit_acks], params[:pre_acknowledge] || false, params[:ack_batch_size] || 1) (@sessions << session) if params.fetch(:managed, false) session end |
#create_session_pool(params = {}) ⇒ Object
Create a Session pool
404 405 406 407 |
# File 'lib/hornetq/client/connection.rb', line 404 def create_session_pool(params={}) require 'hornetq/client/session_pool' SessionPool.new(self, params) end |
#on_message(params, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be received in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.
Note:
Session Parameters:
:options => any of the javax.jms.Session constants
Default: javax.jms.Session::AUTO_ACKNOWLEDGE
:session_count : Number of sessions to create, each with their own consumer which
in turn will call the supplied block.
Note: The supplied block must be thread safe since it will be called
by several threads at the same time.
I.e. Don't change instance variables etc. without the
necessary semaphores etc.
Default: 1
Consumer Parameters:
:queue_name => The name of the queue to consume messages from. Mandatory
:filter => Only consume messages matching the filter: Default: nil
:browse_only => Whether to just browse the queue or consume messages
true | false. Default: false
:window_size => The consumer window size.
:max_rate => The maximum rate to consume messages.
:auto_start => Immediately start processing messages.
If set to false, call Connection#start_managed_sessions
to manually start receive messages later
Default: true
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The timer starts when each() is called and finishes when either the last message was received,
or when Destination::statistics is called. In this case MessageConsumer::statistics
can be called several times during processing without affecting the end time.
Also, the start time and message count is not reset until MessageConsumer::each
is called again with :statistics => true
The statistics gathered are returned when :statistics => true and :async => false
Usage: For transacted sessions (the default) the Proc supplied must return
either true or false:
true => The session is committed
false => The session is rolled back
Any Exception => The session is rolled back
Notes:
-
Remember to call ::start on the connection otherwise the on_message will not start consuming any messages
-
Remember to call message.acknowledge before completing the block so that
the will be removed from the queue
-
If the block throws an exception, the
471 472 473 474 475 476 477 478 479 480 481 |
# File 'lib/hornetq/client/connection.rb', line 471 def (params, &proc) consumer_count = params[:session_count] || 1 consumer_count.times do session = self.create_session(params) consumer = session.create_consumer_from_params(params) consumer.(params, &proc) session.start if params.fetch(:auto_start, true) @consumers << consumer @sessions << session end end |
#on_message_statistics ⇒ Object
483 484 485 |
# File 'lib/hornetq/client/connection.rb', line 483 def @consumers.collect{|consumer| consumer.} end |
#session(params = {}, &proc) ⇒ Object
Create a session, call the supplied block and once it completes close the session. See session_create for the Parameters
Returns the result of the block
Example:
require 'hornetq'
HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection
connection.session do |session|
# Create a producer to send messages
session.producer('Example') do |producer|
# Create a Text Message
message = session.create_message(HornetQ::Client::Message::TEXT_TYPE,true)
message.body = 'Hello World'
# Send the message
producer.send(message)
end
end
end
355 356 357 358 359 360 361 362 363 364 |
# File 'lib/hornetq/client/connection.rb', line 355 def session(params={}, &proc) raise "HornetQ::Client::session mandatory block missing" unless proc session = nil begin session = create_session(params) proc.call(session) ensure session.close if session end end |
#start_managed_sessions ⇒ Object
Start all sessions managed by this connection
Sessions created via #create_session are not managed unless :auto_close => true was specified when the session was created
Session are Only managed when created through the following methods:
Connection#on_message
Connection#create_session And :auto_close => true
This call does not do anything to sessions in a session pool
497 498 499 |
# File 'lib/hornetq/client/connection.rb', line 497 def start_managed_sessions @sessions.each {|session| session.start} end |
#start_session(params = {}, &proc) ⇒ Object
Create a session, start the session, call the supplied block and once the block completes close the session.
See: #session_create for the Parameters
Returns the result of the block
Example:
require 'hornetq'
HornetQ::Client::Connection.connection('hornetq://localhost/') do |connection
# Must start the session other we cannot consume messages using it
connection.start_session do |session|
# Create a consumer to receive messages
session.consumer('TestQueue') do |consumer|
consumer.each do |message|
message.acknowledge
end
end
end
end
391 392 393 394 395 396 397 398 399 400 401 |
# File 'lib/hornetq/client/connection.rb', line 391 def start_session(params={}, &proc) raise "HornetQ::Client::session mandatory block missing" unless proc session = nil begin session = create_session(params) session.start proc.call(session) ensure session.close if session end end |
#stop_managed_sessions ⇒ Object
Stop all sessions managed by this connection so that they no longer receive messages for processing
See: #start_managed_sessions for details on which sessions are managed
505 506 507 |
# File 'lib/hornetq/client/connection.rb', line 505 def stop_managed_sessions @sessions.each {|session| session.stop} end |