Class: ThriftyBunny::ClientTransport
- Inherits:
-
Thrift::BaseTransport
- Object
- Thrift::BaseTransport
- ThriftyBunny::ClientTransport
- Defined in:
- lib/thrifty_bunny/client_transport.rb
Instance Method Summary
- #close ⇒ Object
-
#flush(options = {}) ⇒ Object
If blocking is set to true, then wait for a response message in the reply_to queue; otherwise just send and go!
-
#initialize(config = Config.new, options = {}) ⇒ ClientTransport
constructor
A new instance of ClientTransport.
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(config = Config.new, options = {}) ⇒ ClientTransport
Returns a new instance of ClientTransport.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/thrifty_bunny/client_transport.rb', line 11

# Builds a client transport that publishes requests to a service queue and
# receives replies on a private, exclusive reply queue.
#
# @param config [Config] must respond to +queue+ (the service queue name)
#   and +bunny_config+ (the settings handed to +Bunny.new+)
# @param options [Hash]
# @option options [Object] :connection an existing connection to reuse; when
#   absent (nil) a new Bunny connection is created and started here
# @option options [String] :from_name ('Unknown Client') client name stored
#   for the outgoing message headers
# @option options [String] :exchange name of a durable direct exchange to
#   publish to; when nil the channel's default exchange is used
# @return [ClientTransport] a new instance of ClientTransport
def initialize(config = Config.new, options = {})
  @service_queue_name = config.queue
  @outbuf = Thrift::Bytes.empty_byte_buffer

  if options[:connection].nil?
    @conn = Bunny.new(config.bunny_config)
    @conn.start
    # Remember that this transport started the connection, so #close knows
    # it is responsible for closing it again.
    @connection_started = true
  else
    @conn = options[:connection]
    @connection_started = false
  end

  @from_name = options[:from_name] || 'Unknown Client'
  @exchange = options[:exchange]

  @ch = @conn.create_channel
  @service_exchange = @exchange.nil? ? @ch.default_exchange : @ch.direct(@exchange, durable: true)
  @service_response_exchange = @ch.default_exchange
  # Empty name + exclusive: a queue private to this transport for replies.
  @reply_queue = @ch.queue('', exclusive: true)
  @is_opened = true
end
Instance Method Details
#close ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/thrifty_bunny/client_transport.rb', line 35

# Tears down the reply queue and channel and, when this transport started the
# connection itself, closes the connection as well. Calling it on an already
# closed transport does nothing.
#
# Teardown runs through +ensure+ so that a failure while deleting the queue
# or closing the channel can no longer leave the owned connection open or the
# transport flagged as open. The original error still propagates.
#
# @return [false, nil] false after closing, nil when already closed
def close
  return unless @is_opened

  begin
    @reply_queue.delete
  ensure
    begin
      @ch.close
    ensure
      @is_opened = false
      if @connection_started
        @connection_started = false
        @conn.close
      end
    end
  end
  false
end
#flush(options = {}) ⇒ Object
If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/thrifty_bunny/client_transport.rb', line 55

# Publishes the buffered request to the service exchange. For a blocking
# (standard RPC) call it then waits on the reply queue for the message whose
# correlation id matches and stores its payload for #read.
#
# @param options [Hash]
# @option options [String] :operation ("") operation name sent in the headers
# @option options [Boolean] :blocking (true) wait for a reply when true;
#   otherwise just send and go
# @option options [Numeric] :msg_timeout (10) seconds the message may live and
#   (plus one second) how long to wait for the reply
# @option options [Boolean] :log_messages (true) log publish/receive details
# @return [String] the fresh, empty output buffer
# @raise [ResponseTimeout] when no reply arrived in time
def flush(options = {})
  operation      = options[:operation] || ""
  blocking       = options.fetch(:blocking, true)
  msg_timeout    = options[:msg_timeout] || 10
  log_messages   = options.fetch(:log_messages, true)
  correlation_id = generate_uuid

  headers = {
    :service_name      => @service_queue_name,
    :operation         => operation,
    :response_required => blocking, # Tell the receiver if a response is required
    :from_name         => @from_name
  }

  begin
    # Publish the message
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log_messages
    # Monotonic clock: the elapsed time must not jump with wall-clock changes.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @service_exchange.publish(@outbuf,
                              :routing_key    => @service_queue_name,
                              :correlation_id => correlation_id,
                              # AMQP per-message TTL is a string in milliseconds,
                              # while msg_timeout is expressed in seconds.
                              :expiration     => (msg_timeout * 1000).to_i.to_s,
                              :reply_to       => @reply_queue.name,
                              :headers        => headers)

    # If this is a standard RPC blocking call, then wait for there to be a
    # response from the service provider or timeout and log the timeout
    if blocking
      @response = ""
      begin
        # Adding 1sec to timeout to account for clock differences
        Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
            if log_messages
              response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            if properties[:correlation_id] == correlation_id
              @response = payload
              # once the return message has been received, no need to continue a subscription
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # Trying to work around weirdness being seen in a multi threaded workflow environment
        if @response == ""
          msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        else
          print_log "Ignoring timeout - #{@response}", correlation_id
        end
      end
      @inbuf = StringIO.new Thrift::Bytes.force_binary_encoding(@response)
    end
  ensure
    # Always start the next request from an empty buffer, even after a
    # timeout or publish failure; otherwise the stale request bytes would be
    # sent again in front of the next message.
    @outbuf = Thrift::Bytes.empty_byte_buffer
  end
  @outbuf
end
#open? ⇒ Boolean
49 |
# File 'lib/thrifty_bunny/client_transport.rb', line 49

# Reports whether the transport is currently open.
#
# @return [Boolean] the value of the open flag, which the constructor sets to
#   true and #close sets to false
def open?
  @is_opened
end
#read(sz) ⇒ Object
50 |
# File 'lib/thrifty_bunny/client_transport.rb', line 50

# Reads from the buffered reply that a blocking #flush stored.
#
# @param sz [Integer] number of bytes to request from the reply buffer
# @return [Object] whatever the reply buffer's +read+ returns for +sz+
def read(sz)
  @inbuf.read sz
end
#write(buf) ⇒ Object
51 |
# File 'lib/thrifty_bunny/client_transport.rb', line 51

# Appends data to the outgoing buffer; nothing is sent until #flush.
#
# @param buf [String] data to queue, passed through
#   Thrift::Bytes.force_binary_encoding before being appended
# @return [Object] the result of appending to the output buffer
def write(buf)
  @outbuf << Thrift::Bytes.force_binary_encoding(buf)
end