Class: Thrift::AmqpRpcClientTransport
- Inherits:
-
BaseTransport
- Object
- BaseTransport
- Thrift::AmqpRpcClientTransport
- Defined in:
- lib/thrift/amqp/amqp_rpc_client.rb
Instance Method Summary collapse
- #close ⇒ Object
-
#flush(options = {}) ⇒ Object
If blocking is set to true, wait for a response message in the reply_to queue; otherwise just send and go!
-
#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport
constructor
A new instance of AmqpRpcClientTransport.
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport
Returns a new instance of AmqpRpcClientTransport.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 32
#
# Builds a client transport that publishes requests to +service_queue_name+.
#
# When +opts[:connection]+ is nil a new Bunny connection is created and
# started here (and later closed by #close); otherwise the supplied connection
# is used as-is and is left open by #close.
#
# @param service_queue_name [String] routing key used when publishing requests
# @param opts [Hash] connection and naming options
# @option opts [Object] :connection an existing connection to reuse
# @option opts [String] :host required when :connection is not given
# @option opts [Integer] :port required when :connection is not given
# @option opts [String] :vhost ("/")
# @option opts [String] :user ("guest")
# @option opts [String] :password ("guest")
# @option opts [Boolean] :ssl (false)
# @option opts [String] :from_name ("Unknown Client") sent in the message headers
# @option opts [String] :exchange direct exchange to publish to; the channel's
#   default exchange is used when absent
# @raise [ArgumentError] if a connection must be created and :host or :port is missing
def initialize(service_queue_name, opts = {})
  @service_queue_name = service_queue_name
  @outbuf = Bytes.empty_byte_buffer

  supplied_connection = opts[:connection]
  if supplied_connection.nil?
    raise ArgumentError, ":host key not provided in opts dict to make connection" if opts[:host].nil?
    raise ArgumentError, ":port key not provided in opts dict to make connection" if opts[:port].nil?

    connection_settings = {
      :host     => opts[:host],
      :port     => opts[:port],
      :vhost    => opts[:vhost] || "/",
      :user     => opts[:user] || "guest",
      :password => opts[:password] || "guest",
      :ssl      => opts[:ssl] || false
    }
    @conn = Bunny.new(connection_settings)
    @conn.start
    # We own this connection, so #close is responsible for shutting it down.
    @connection_started = true
  else
    @conn = supplied_connection
    @connection_started = false
  end

  @from_name = opts[:from_name]
  @from_name = "Unknown Client" if @from_name.nil?
  @exchange = opts[:exchange] || nil

  @ch = @conn.create_channel
  @service_exchange =
    if @exchange.nil?
      @ch.default_exchange
    else
      @ch.direct(@exchange, no_declare: true)
    end
  @service_response_exchange = @ch.default_exchange

  # Exclusive queue declared with an empty name; replies are read from it in #flush.
  @reply_queue = @ch.queue("", :exclusive => true)
  @is_opened = true
end
Instance Method Details
#close ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 70
#
# Tears the transport down: deletes the reply queue, closes the channel and,
# when this transport started the connection itself, closes the connection.
# Does nothing when the transport is not open.
#
# Each step runs inside an +ensure+ so that an error raised by an earlier step
# (which still propagates to the caller) cannot leave the channel or a
# self-started connection open, or leave the transport marked as open.
#
# @return [false, nil] false after closing, nil when the transport was not open
def close
  return unless @is_opened

  begin
    begin
      @reply_queue.delete
    ensure
      @ch.close
    end
  ensure
    # Only close connections this transport created; a caller-supplied
    # connection stays under the caller's control.
    if @connection_started
      @conn.close
      @connection_started = false
    end
    @is_opened = false
  end

  @is_opened
end
#flush(options = {}) ⇒ Object
If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 90
#
# Publishes the buffered request to the service exchange and, for blocking
# calls, waits on the reply queue for the message whose correlation id matches
# the one sent. The reply payload is then made available to #read.
#
# The outgoing buffer is always reset on exit, including when publishing or
# waiting raises, so bytes from a failed call are never prepended to the next
# request.
#
# @param options [Hash]
# @option options [String] :operation ("") operation name sent in the headers
# @option options [Boolean] :blocking (true) wait for a reply when true
# @option options [Numeric] :msg_timeout (10) seconds to wait for a reply
#   (one extra second is added); also sent as the message :expiration
# @option options [Boolean] :log_messages (false) log publish/response details
# @raise [ResponseTimeout] when a blocking call receives no reply in time
# @return [Object] the fresh, empty outgoing buffer
def flush(options = {})
  operation    = options.fetch(:operation, "")
  blocking     = options.fetch(:blocking, true)
  msg_timeout  = options.fetch(:msg_timeout, 10)
  log_messages = options.fetch(:log_messages, false)

  correlation_id = generate_uuid

  headers = {
    :service_name      => @service_queue_name,
    :operation         => operation,
    :response_required => blocking, # Tell the receiver if a response is required
    :from_name         => @from_name
  }

  begin
    # Publish the message
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log_messages
    start_time = Time.now
    # NOTE(review): msg_timeout is treated as seconds below, but AMQP brokers
    # such as RabbitMQ read :expiration as milliseconds - confirm the unit.
    @service_exchange.publish(@outbuf,
                              :routing_key    => @service_queue_name,
                              :correlation_id => correlation_id,
                              :expiration     => msg_timeout,
                              :reply_to       => @reply_queue.name,
                              :headers        => headers)

    # If this is a standard RPC blocking call, then wait for there to be a response from the
    # service provider or timeout and log the timeout
    if blocking
      @response = ""
      begin
        # Adding 1sec to timeout to account for clock differences
        Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
            if log_messages
              response_time = Time.now - start_time
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            if properties[:correlation_id] == correlation_id
              @response = payload

              # once the return message has been received, no need to continue a subscription
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # Trying to work around weirdness being seen in a multi threaded workflow environment
        if @response == ""
          msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        else
          print_log "Ignoring timeout - #{@response}", correlation_id
        end
      end
      @inbuf = StringIO.new Bytes.force_binary_encoding(@response)
    end
  ensure
    @outbuf = Bytes.empty_byte_buffer
  end

  @outbuf
end
#open? ⇒ Boolean
84 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 84
#
# Reports the open flag that the constructor sets and #close clears.
#
# @return [Boolean] current value of the open flag
def open?
  @is_opened
end
#read(sz) ⇒ Object
85 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 85
#
# Reads up to +sz+ bytes from the inbound buffer, which #flush fills with the
# reply payload of a blocking call.
#
# @param sz [Integer] maximum number of bytes to read
# @return [Object] whatever the inbound buffer's #read returns for +sz+
def read(sz)
  @inbuf.read(sz)
end
#write(buf) ⇒ Object
86 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 86
#
# Appends +buf+, passed through Bytes.force_binary_encoding, to the outgoing
# buffer that #flush publishes.
#
# @param buf [String] data to queue for sending
# @return [Object] the outgoing buffer after the append
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end