Class: Thrift::AmqpRpcClientTransport

Inherits:
BaseTransport
  • Object
show all
Defined in:
lib/thrift/amqp/amqp_rpc_client.rb

Instance Method Summary collapse

Constructor Details

#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport

Returns a new instance of AmqpRpcClientTransport.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 32

def initialize(service_queue_name, opts={})
  @service_queue_name = service_queue_name
  @outbuf = Bytes.empty_byte_buffer

  if opts[:connection].nil?
    if opts[:host].nil?
      raise ArgumentError, ":host key not provided in opts dict to make connection"
    end

    if opts[:port].nil?
      raise ArgumentError, ":port key not provided in opts dict to make connection"
    end

    vhost = opts[:vhost] || "/"
    user = opts[:user] || "guest"
    password = opts[:password] || "guest"
    ssl = opts[:ssl] || false

    @conn = Bunny.new(:host => opts[:host], :port => opts[:port], :vhost => vhost, :user => user, :password => password, :ssl=> ssl)

    @conn.start
    @connection_started = true
  else
    @conn = opts[:connection]
    @connection_started = false
  end

  @from_name                 = opts[:from_name].nil? ? "Unknown Client" : opts[:from_name]
  @exchange                  = opts[:exchange] || nil

  @ch                        = @conn.create_channel
  @service_exchange          = @exchange.nil? ? @ch.default_exchange : @ch.direct(@exchange, no_declare: true)
  @service_response_exchange = @ch.default_exchange
  @reply_queue               = @ch.queue("", :exclusive => true)
  @is_opened                 = true

end

Instance Method Details

#closeObject



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 70

def close
  if @is_opened
    @reply_queue.delete
    @ch.close

    if @connection_started
      @conn.close
      @connection_started = false
    end

    @is_opened = false
  end
end

#flush(options = {}) ⇒ Object

If blocking is set to true then wait for a response message in the reply_to queue, otherwise just send and go!



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 90

def flush(options={})

  operation = options.has_key?(:operation) ? options[:operation] : ""
  blocking = options.has_key?(:blocking) ? options[:blocking] : true
  msg_timeout = options.has_key?(:msg_timeout) ? options[:msg_timeout] : 10
  log_messages = options.has_key?(:log_messages) ? options[:log_messages] : false

  correlation_id = self.generate_uuid

  headers = {:service_name      => @service_queue_name,
             :operation         => operation,
             :response_required => blocking,   #Tell the receiver if a response is required
             :from_name         => @from_name
  }

  #Publish the message
  print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log_messages
  start_time = Time.now
  @service_exchange.publish(@outbuf,
                            :routing_key    => @service_queue_name,
                            :correlation_id => correlation_id,
                            :expiration     => msg_timeout,
                            :reply_to       => @reply_queue.name,
                            :headers        => headers)

  #If this is a standard RPC blocking call, then wait for there to be a response from the
  #service provider or timeout and log the timeout
  if blocking
    @response = ""
    begin
      #Adding 1sec to timeout to account for clock differences
      Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
        @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|

          if log_messages
            response_time = Time.now - start_time
            print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
            print_log "HEADERS: #{properties}", correlation_id
          end

          if properties[:correlation_id] == correlation_id
            @response = payload

            #once the return message has been received, no need to continue a subscription
            delivery_info.consumer.cancel
          end
        end
      end
    rescue ResponseTimeout => ex
      #Trying to work around weirdness being seen in a multi threaded workflow environment
      if @response == ""
        msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
        print_log msg, correlation_id
        raise ex, msg
      else
        print_log "Ignoring timeout - #{@response}", correlation_id
      end
    end
    @inbuf = StringIO.new Bytes.force_binary_encoding(@response)
  end
  @outbuf = Bytes.empty_byte_buffer
end

#open?Boolean

Returns:

  • (Boolean)


84
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 84

def open?; @is_opened end

#read(sz) ⇒ Object



85
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 85

def read(sz); @inbuf.read sz end

#write(buf) ⇒ Object



86
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 86

def write(buf); @outbuf << Bytes.force_binary_encoding(buf) end