Class: Thrift::AmqpRpcClientTransport
- Inherits:
-
BaseTransport
- Object
- BaseTransport
- Thrift::AmqpRpcClientTransport
- Defined in:
- lib/thrift/amqp/amqp_rpc_client.rb
Instance Method Summary collapse
- #close ⇒ Object
- #declare_exchange ⇒ Object
-
#flush(options = {}) ⇒ Object
If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!
-
#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport
constructor
A new instance of AmqpRpcClientTransport.
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport
Returns a new instance of AmqpRpcClientTransport.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 32
#
# Builds the transport: obtains an AMQP connection (either the one supplied
# in opts[:connection] or a new Bunny connection that is started here),
# opens a channel, picks the exchange requests are published to and declares
# an exclusive, server-named reply queue.
#
# @param service_queue_name [String] routing key / service name requests are sent to
# @param opts [Hash] connection and client settings
# @option opts [Object] :connection an existing connection to reuse; when given,
#   :host/:port are not required and the connection is not closed by #close
# @option opts [String] :host required when :connection is absent
# @option opts [Integer] :port required when :connection is absent
# @option opts [String] :vhost ("/")
# @option opts [String] :user ("guest")
# @option opts [String] :password ("guest")
# @option opts [Boolean] :ssl (false)
# @option opts [Numeric] :timeout (100) default per-message timeout used by #flush
# @option opts [String] :from_name ("Unknown Client") sent in the message headers
# @option opts [String] :exchange name of a direct exchange; the channel's
#   default exchange is used when absent
# @raise [ArgumentError] when no :connection is given and :host or :port is missing
def initialize(service_queue_name, opts = {})
  @service_queue_name = service_queue_name
  @outbuf = Bytes.empty_byte_buffer
  @timeout = opts[:timeout] || 100

  if opts[:connection].nil?
    raise ArgumentError, ":host key not provided in opts dict to make connection" if opts[:host].nil?
    raise ArgumentError, ":port key not provided in opts dict to make connection" if opts[:port].nil?

    @conn = Bunny.new(:host     => opts[:host],
                      :port     => opts[:port],
                      :vhost    => opts[:vhost] || "/",
                      :user     => opts[:user] || "guest",
                      :password => opts[:password] || "guest",
                      :ssl      => opts[:ssl] || false)
    @conn.start
    # Remember that this transport owns the connection so #close shuts it down.
    @connection_started = true
  else
    @conn = opts[:connection]
    @connection_started = false
  end

  @from_name = opts[:from_name].nil? ? "Unknown Client" : opts[:from_name]
  @exchange = opts[:exchange] || nil

  @ch = @conn.create_channel
  @service_exchange = @exchange.nil? ? @ch.default_exchange : declare_exchange
  @service_response_exchange = @ch.default_exchange
  # Empty name + :exclusive: a private reply queue named by the broker.
  @reply_queue = @ch.queue("", :exclusive => true)
  @is_opened = true
end
Instance Method Details
#close ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 76
#
# Tears the transport down: deletes the reply queue, closes the channel and,
# only when this transport started the connection itself, closes the
# connection too. Does nothing when the transport is not open.
def close
  return unless @is_opened

  @reply_queue.delete
  @ch.close

  # A connection handed in by the caller is left open for the caller to manage.
  if @connection_started
    @conn.close
    @connection_started = false
  end

  @is_opened = false
end
#declare_exchange ⇒ Object
70 71 72 73 74 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 70
#
# Returns the direct exchange named by @exchange on the current channel.
# If the plain declaration raises any StandardError, the call is retried
# with no_declare: true.
# NOTE(review): presumably the fallback covers an exchange that already
# exists with different attributes or that this user may not declare —
# confirm against the broker setup.
def declare_exchange
  @ch.direct(@exchange)
rescue
  @ch.direct(@exchange, no_declare: true)
end
#flush(options = {}) ⇒ Object
If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 96
#
# Publishes the buffered request to the service exchange. For a blocking
# call it then subscribes to the reply queue until a message carrying this
# call's correlation id arrives, and exposes that payload through @inbuf.
# The outgoing buffer is reset on every exit path handled here.
#
# @param options [Hash] per-call settings
# @option options [String] :operation ("") operation name sent in the headers
# @option options [Boolean] :blocking (true) wait for a response when true,
#   otherwise just publish and return
# @option options [Numeric] :msg_timeout (@timeout) passed as the message
#   :expiration; the wait for a response is limited to msg_timeout + 1
# @option options [Boolean] :log_messages (false) log publish/response details
# @raise [ResponseTimeout] when a blocking call receives no response in time
def flush(options = {})
  operation    = options.fetch(:operation, "")
  blocking     = options.fetch(:blocking, true)
  msg_timeout  = options.fetch(:msg_timeout, @timeout)
  log_messages = options.fetch(:log_messages, false)

  correlation_id = self.generate_uuid

  headers = {
    :service_name      => @service_queue_name,
    :operation         => operation,
    :response_required => blocking, # tells the receiver whether a response is required
    :from_name         => @from_name
  }

  # Publish the message
  if log_messages
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id
  end
  start_time = Time.now
  # NOTE(review): msg_timeout is used both as the AMQP :expiration here and,
  # plus one, as the Timeout limit in seconds below — confirm the units agree.
  @service_exchange.publish(@outbuf,
                            :routing_key    => @service_queue_name,
                            :correlation_id => correlation_id,
                            :expiration     => msg_timeout,
                            :reply_to       => @reply_queue.name,
                            :headers        => headers)

  # For a standard blocking RPC call, wait for a response from the service
  # provider, or time out and log the timeout.
  if blocking
    @response = ""
    begin
      # One extra second on the timeout to account for clock differences
      Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
        @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
          if log_messages
            response_time = Time.now - start_time
            print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
            print_log "HEADERS: #{properties}", correlation_id
          end

          if properties[:correlation_id] == correlation_id
            @response = payload
            # The reply has arrived; no need to keep the subscription alive.
            delivery_info.consumer.cancel
          end
        end
      end
    rescue ResponseTimeout => ex
      # Works around odd behaviour seen in a multi-threaded workflow
      # environment: a timeout that fires after a response was stored is ignored.
      if @response == ""
        msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
        print_log msg, correlation_id
        @outbuf = Bytes.empty_byte_buffer
        raise ex, msg
      else
        print_log "Ignoring timeout - #{@response}", correlation_id
      end
    rescue
      @outbuf = Bytes.empty_byte_buffer
      raise
    end
    @inbuf = StringIO.new Bytes.force_binary_encoding(@response)
  end

  @outbuf = Bytes.empty_byte_buffer
end
#open? ⇒ Boolean
90 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 90
#
# Reports the @is_opened flag, which the constructor sets to true and
# #close sets to false.
def open?
  @is_opened
end
#read(sz) ⇒ Object
91 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 91
#
# Reads sz bytes from @inbuf, the buffer #flush fills with the response
# payload after a blocking call.
# NOTE(review): @inbuf is only assigned by a blocking #flush, so reading
# before that would call #read on nil — confirm callers never do this.
def read(sz)
  @inbuf.read sz
end
#write(buf) ⇒ Object
92 |
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 92
#
# Appends buf, forced to binary encoding, to the outgoing buffer that
# #flush publishes.
def write(buf)
  @outbuf << Bytes.force_binary_encoding(buf)
end