Class: ThriftyBunny::ClientTransport
- Inherits:
-
Thrift::BaseTransport
- Object
- Thrift::BaseTransport
- ThriftyBunny::ClientTransport
- Defined in:
- lib/thrifty_bunny/client_transport.rb
Instance Method Summary collapse
- #close ⇒ Object
-
#flush(options = {}) ⇒ Object
If the :blocking option is true (the default), wait for a response message in the reply_to queue; otherwise just send and go!
-
#initialize(config = Config.new, options = {}) ⇒ ClientTransport
constructor
A new instance of ClientTransport.
- #log? ⇒ Boolean
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(config = Config.new, options = {}) ⇒ ClientTransport
Returns a new instance of ClientTransport.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/thrifty_bunny/client_transport.rb', line 11
#
# Returns a new instance of ClientTransport.
#
# Sets up the channel, the exchange requests are published to, and a
# server-named exclusive queue on which replies are received.
#
# @param config [Config] read for queue, timeout and log; bunny_config is
#   read only when no connection is supplied in options
# @param options [Hash]
# @option options [Object] :connection an existing connection to reuse; when
#   absent (nil) a new Bunny connection is created and started
# @option options [String] :from_name name sent in the message headers
#   (default 'Unknown Client')
# @option options [String] :exchange name of a durable direct exchange to
#   publish to; when absent the channel's default exchange is used
def initialize(config = Config.new, options = {})
  @service_queue_name = config.queue
  @outbuf = Thrift::Bytes.empty_byte_buffer

  if options[:connection].nil?
    # No connection supplied: open our own and remember that we own it,
    # so #close knows it must shut the connection down as well.
    @conn = Bunny.new(config.bunny_config)
    @conn.start
    @connection_started = true
  else
    # Caller-owned connection: use it, but never close it ourselves.
    @conn = options[:connection]
    @connection_started = false
  end

  @from_name = options[:from_name] || 'Unknown Client'
  @exchange = options[:exchange]

  @ch = @conn.create_channel
  @service_exchange = @exchange.nil? ? @ch.default_exchange : @ch.direct(@exchange, durable: true)
  @service_response_exchange = @ch.default_exchange
  # An empty name asks the broker to generate one; the name is later sent as reply_to.
  @reply_queue = @ch.queue('', exclusive: true)
  @is_opened = true

  @timeout = config.timeout
  @log = config.log
end
Instance Method Details
#close ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/thrifty_bunny/client_transport.rb', line 41
#
# Tears down the reply queue, the channel and, if this transport started the
# connection itself, the connection. Does nothing when already closed.
#
# Each teardown step runs even if an earlier one raises, so a failure while
# deleting the reply queue cannot leave the channel or an owned connection
# open. The first exception raised still propagates to the caller.
#
# @return [false, nil] false after closing, nil when the transport was not open
def close
  return unless @is_opened

  begin
    @reply_queue.delete
  ensure
    begin
      @ch.close
    ensure
      # Record the new state before the last step so a failing connection
      # close cannot leave the transport flagged as open.
      @is_opened = false
      if @connection_started
        @connection_started = false
        @conn.close
      end
    end
  end

  false
end
#flush(options = {}) ⇒ Object
If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/thrifty_bunny/client_transport.rb', line 61
#
# Publishes the buffered request and, for blocking calls, waits for the reply.
#
# If blocking is set to true then wait for a response message in the reply_to
# queue, otherwise just send and go!
#
# The outgoing buffer is always reset, even when publishing or waiting raises,
# so bytes of a failed request are never sent again with the next one.
#
# @param options [Hash]
# @option options [String] :operation operation name placed in the message
#   headers (default "")
# @option options [Boolean] :blocking whether to wait for a response
#   (default true)
# @return [Object] the fresh, empty outgoing buffer
# @raise [ResponseTimeout] when a blocking call receives no matching response
#   within @timeout + 1 seconds
def flush(options = {})
  operation = options[:operation] || ""
  blocking = options.fetch(:blocking, true)

  correlation_id = generate_uuid

  headers = {
    :service_name => @service_queue_name,
    :operation => operation,
    :response_required => blocking, # Tell the receiver if a response is required
    :from_name => @from_name
  }

  begin
    # Publish the message
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log?
    # Monotonic clock: the elapsed time is unaffected by wall-clock adjustments.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # NOTE(review): RabbitMQ documents per-message expiration in milliseconds,
    # while @timeout is treated as seconds below - confirm the unit is intended.
    @service_exchange.publish(@outbuf,
                              :routing_key => @service_queue_name,
                              :correlation_id => correlation_id,
                              :expiration => @timeout,
                              :reply_to => @reply_queue.name,
                              :headers => headers)

    # If this is a standard RPC blocking call, then wait for there to be a response from the
    # service provider or timeout and log the timeout
    if blocking
      @response = ""
      begin
        # Adding 1sec to timeout to account for clock differences
        # NOTE(review): on timeout the consumer registered by subscribe does not
        # appear to be cancelled here - verify against Bunny's behaviour.
        Timeout.timeout(@timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
            if log?
              response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            if properties[:correlation_id] == correlation_id
              @response = payload

              # once the return message has been received, no need to continue a subscription
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # Trying to work around weirdness being seen in a multi threaded workflow environment
        if @response == ""
          msg = "A timeout has occurred (#{@timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        else
          print_log "Ignoring timeout - #{@response}", correlation_id
        end
      end
      @inbuf = StringIO.new Thrift::Bytes.force_binary_encoding(@response)
    end
  ensure
    @outbuf = Thrift::Bytes.empty_byte_buffer
  end

  @outbuf
end
#log? ⇒ Boolean
37 38 39 |
# File 'lib/thrifty_bunny/client_transport.rb', line 37
#
# Tells whether logging is switched on for this transport.
#
# @return [Object] the value held in @log (set from config.log by the constructor)
def log?
  @log
end
#open? ⇒ Boolean
55 |
# File 'lib/thrifty_bunny/client_transport.rb', line 55
#
# Tells whether the transport is currently open.
#
# @return [Boolean] the @is_opened flag, set to true by the constructor and to false by #close
def open?
  @is_opened
end
#read(sz) ⇒ Object
56 |
# File 'lib/thrifty_bunny/client_transport.rb', line 56
#
# Reads from the response buffer that a blocking #flush filled.
#
# @param sz [Integer] number of bytes requested
# @return [String, nil] whatever @inbuf.read returns for sz
def read(sz)
  @inbuf.read(sz)
end
#write(buf) ⇒ Object
57 |
# File 'lib/thrifty_bunny/client_transport.rb', line 57
#
# Appends data to the outgoing buffer; nothing is sent until #flush.
#
# @param buf [String] bytes to queue for the next request
# @return [Object] the outgoing buffer after appending
def write(buf)
  @outbuf << Thrift::Bytes.force_binary_encoding(buf)
end