Class: Thrift::AmqpRpcClientTransport

Inherits:
BaseTransport
  • Object
show all
Defined in:
lib/thrift/amqp/amqp_rpc_client.rb

Instance Method Summary collapse

Constructor Details

#initialize(service_queue_name, opts = {}) ⇒ AmqpRpcClientTransport

Returns a new instance of AmqpRpcClientTransport.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 32

# Builds a client transport bound to the given service queue.
#
# service_queue_name - stored and later used as the routing key when publishing.
# opts               - Hash of settings:
#   :timeout    - default message timeout (falls back to 100).
#   :connection - an already-started connection to reuse; when given, this
#                 transport does not start it and will not close it.
#   :host/:port - required when :connection is not given.
#   :vhost, :user, :password, :ssl - optional; default to "/", "guest",
#                 "guest" and false respectively.
#   :from_name  - client name sent in message headers ("Unknown Client" if nil).
#   :exchange   - name of a direct exchange to publish to; the channel's
#                 default exchange is used when absent.
#
# Raises ArgumentError when no :connection is supplied and :host or :port is missing.
def initialize(service_queue_name, opts={})
  @service_queue_name = service_queue_name
  @outbuf             = Bytes.empty_byte_buffer
  @timeout            = opts[:timeout] || 100

  supplied_connection = opts[:connection]

  if supplied_connection.nil?
    # No connection handed in: we must be able to build one ourselves.
    raise ArgumentError, ":host key not provided in opts dict to make connection" if opts[:host].nil?
    raise ArgumentError, ":port key not provided in opts dict to make connection" if opts[:port].nil?

    @conn = Bunny.new(
      :host     => opts[:host],
      :port     => opts[:port],
      :vhost    => opts[:vhost] || "/",
      :user     => opts[:user] || "guest",
      :password => opts[:password] || "guest",
      :ssl      => opts[:ssl] || false
    )
    @conn.start

    # Remember that this transport owns the connection so close can shut it down.
    @connection_started = true
  else
    @conn               = supplied_connection
    @connection_started = false
  end

  client_name = opts[:from_name]
  @from_name  = client_name.nil? ? "Unknown Client" : client_name
  @exchange   = opts[:exchange] || nil

  @ch = @conn.create_channel

  @service_exchange =
    if @exchange.nil?
      @ch.default_exchange
    else
      declare_exchange
    end

  @service_response_exchange = @ch.default_exchange
  # Server-named, exclusive queue on which responses are received.
  @reply_queue               = @ch.queue("", :exclusive => true)
  @is_opened                 = true
end

Instance Method Details

#close ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 76

# Releases the resources held by this transport: deletes the reply queue,
# closes the channel and, if this transport started the connection itself,
# closes the connection too. A connection supplied by the caller is left open.
#
# Each step runs even when an earlier one raises, so a failure while deleting
# the queue or closing the channel no longer leaves the connection open or the
# transport flagged as open. The failure still propagates to the caller.
#
# Returns false after closing, or nil when the transport was not open.
def close
  return unless @is_opened

  begin
    @reply_queue.delete
  ensure
    begin
      @ch.close
    ensure
      begin
        @conn.close if @connection_started
      ensure
        # Always record the closed state so a retry does not touch
        # already-released resources.
        @connection_started = false
        @is_opened = false
      end
    end
  end

  false
end

#declare_exchange ⇒ Object



70
71
72
73
74
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 70

# Returns the direct exchange named by @exchange on the current channel.
# If the first attempt raises, the exchange is requested again with
# :no_declare set and that result is returned instead.
#
# NOTE(review): the fallback reuses the same channel; if the broker closes the
# channel on a failed declaration, later use of @ch may fail — confirm.
def declare_exchange
  begin
    @ch.direct(@exchange)
  rescue StandardError
    @ch.direct(@exchange, :no_declare => true)
  end
end

#flush(options = {}) ⇒ Object

If :blocking is true (the default), waits for a response message on the reply-to queue; otherwise publishes the request and returns immediately.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 96

# Publishes the buffered request and, for blocking calls, waits for the reply.
#
# options - Hash of per-call settings:
#   :operation    - operation name placed in the headers (default "").
#   :blocking     - when true (default) wait for a response on the reply queue.
#   :msg_timeout  - timeout for this message (default: the transport's @timeout).
#   :log_messages - when true, log publish/response details (default false).
#
# For blocking calls the response payload is made available to #read via
# @inbuf. The output buffer is reset on every exit path handled here.
#
# Raises ResponseTimeout when no matching response arrived in time; any other
# error raised while waiting is re-raised after the output buffer is reset.
def flush(options={})
  operation    = options.fetch(:operation, "")
  blocking     = options.fetch(:blocking, true)
  msg_timeout  = options.fetch(:msg_timeout, @timeout)
  log_messages = options.fetch(:log_messages, false)

  correlation_id = generate_uuid

  headers = {
    :service_name      => @service_queue_name,
    :operation         => operation,
    :response_required => blocking, # tells the receiver whether to reply
    :from_name         => @from_name
  }

  if log_messages
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id
  end

  start_time = Time.now

  # NOTE(review): msg_timeout is used as seconds for the wait below but is sent
  # unchanged as :expiration — confirm the unit the broker/service expects.
  @service_exchange.publish(
    @outbuf,
    :routing_key    => @service_queue_name,
    :correlation_id => correlation_id,
    :expiration     => msg_timeout,
    :reply_to       => @reply_queue.name,
    :headers        => headers
  )

  if blocking
    # Standard RPC call: block until the matching response shows up or the
    # timeout fires.
    @response = ""

    begin
      # One extra second of slack to account for clock differences.
      Timeout.timeout(msg_timeout + 1, ResponseTimeout) do
        @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|
          if log_messages
            elapsed = Time.now - start_time
            print_log "---- Response Message received in #{elapsed}sec for #{@reply_queue.name}", correlation_id
            print_log "HEADERS: #{properties}", correlation_id
          end

          # Only the message answering this request is accepted.
          next unless properties[:correlation_id] == correlation_id

          @response = payload

          # The reply is in hand, so the subscription is no longer needed.
          delivery_info.consumer.cancel
        end
      end
    rescue ResponseTimeout => ex
      # Work-around for odd behaviour seen in multi-threaded workflows: a
      # timeout that fires after a response was captured is ignored.
      if @response != ""
        print_log "Ignoring timeout - #{@response}", correlation_id
      else
        msg = "A timeout has occurred (#{msg_timeout}sec) trying to call #{@service_queue_name}.#{operation}"
        print_log msg, correlation_id
        @outbuf = Bytes.empty_byte_buffer
        raise ex, msg
      end
    rescue StandardError
      @outbuf = Bytes.empty_byte_buffer
      raise
    end

    @inbuf = StringIO.new(Bytes.force_binary_encoding(@response))
  end

  @outbuf = Bytes.empty_byte_buffer
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


90
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 90

# Reports the transport's open flag: set to true by the constructor and to
# false by #close.
def open?
  @is_opened
end

#read(sz) ⇒ Object



91
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 91

# Reads up to sz bytes from the response buffer (@inbuf) filled by a blocking
# #flush, delegating directly to the buffer's own read.
def read(sz)
  @inbuf.read(sz)
end

#write(buf) ⇒ Object



92
# File 'lib/thrift/amqp/amqp_rpc_client.rb', line 92

# Appends buf, passed through Bytes.force_binary_encoding, to the pending
# request buffer (@outbuf) and returns that buffer.
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end