Class: ThriftyBunny::ClientTransport

Inherits:
Thrift::BaseTransport
  • Object
show all
Defined in:
lib/thrifty_bunny/client_transport.rb

Instance Method Summary collapse

Constructor Details

#initialize(config = Config.new, options = {}) ⇒ ClientTransport

Returns a new instance of ClientTransport.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/thrifty_bunny/client_transport.rb', line 11

# Builds a client transport that sends Thrift requests over AMQP through Bunny.
#
# A connection is created and started from +config.bunny_config+ unless one is
# supplied in +options[:connection]+. Only a connection started here is owned
# by this transport (tracked in @connection_started).
#
# If anything fails after the connection has been acquired (channel creation,
# exchange or reply-queue declaration, reading the config), a connection that
# was started here is closed again before the error is re-raised; otherwise it
# would leak, because the caller never receives the object to call #close on.
#
# @param config [Config] read for +queue+, +bunny_config+, +timeout+ and +log+
# @param options [Hash]
# @option options [Object] :connection existing connection to reuse instead of opening one
# @option options [String] :from_name client name stored in @from_name (default 'Unknown Client')
# @option options [String] :exchange name of a durable direct exchange to publish to;
#   when absent the channel's default exchange is used
# @return [ClientTransport]
def initialize(config=Config.new, options={})
  @service_queue_name = config.queue
  @outbuf = Thrift::Bytes.empty_byte_buffer

  if options[:connection].nil?
    @conn = Bunny.new(config.bunny_config)
    @conn.start
    @connection_started = true
  else
    @conn = options[:connection]
    @connection_started = false
  end

  begin
    @from_name                 = options[:from_name] || 'Unknown Client'
    @exchange                  = options[:exchange]

    @ch                        = @conn.create_channel
    @service_exchange          = @exchange.nil? ? @ch.default_exchange : @ch.direct(@exchange, durable: true)
    @service_response_exchange = @ch.default_exchange
    # Queue declared with an empty name and exclusive: true; replies are read from it in #flush.
    @reply_queue               = @ch.queue('', exclusive: true)

    @timeout = config.timeout
    @log = config.log

    @is_opened = true
  rescue StandardError => setup_error
    if @connection_started
      @connection_started = false
      begin
        @conn.close
      rescue StandardError
        # Best-effort cleanup: the setup failure re-raised below is the error that matters.
      end
    end
    raise setup_error
  end
end

Instance Method Details

#close ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/thrifty_bunny/client_transport.rb', line 41

# Releases the AMQP resources held by this transport.
#
# Deletes the reply queue, closes the channel and, only when the connection
# was started by this transport (@connection_started), closes the connection.
# Calling it on an already closed transport is a no-op.
#
# Each step runs even if an earlier one raised, so a failure while deleting the
# queue or closing the channel can no longer leave the owned connection open or
# the transport marked as opened. The error still propagates to the caller.
#
# @return [false, nil] false after closing, nil when the transport was not open
def close
  return unless @is_opened

  begin
    begin
      @reply_queue.delete
    ensure
      @ch.close
    end
  ensure
    # Flip the flags before touching the connection so that a failing
    # @conn.close cannot leave this transport looking open.
    @is_opened = false
    if @connection_started
      @connection_started = false
      @conn.close
    end
  end

  # Same return value as before: the (now false) opened flag.
  @is_opened
end

#flush(options = {}) ⇒ Object

If the :blocking option is true (the default), waits for a response message on the reply queue; otherwise publishes the message and returns immediately.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/thrifty_bunny/client_transport.rb', line 61

# Publishes the buffered request and, for blocking calls, waits for the reply.
#
# The bytes accumulated in @outbuf are published to the service exchange with
# the service queue name as routing key. When +:blocking+ is truthy the method
# then subscribes to the reply queue until a message carrying the same
# correlation id arrives and exposes its payload through @inbuf (see #read).
#
# The out buffer is reset in an +ensure+, so the bytes of a request that failed
# to publish or timed out are never prepended to the next request.
#
# @param options [Hash]
# @option options [String] :operation operation name sent in the headers (default "")
# @option options [Boolean] :blocking whether to wait for a response (default true)
# @return [Object] the new, empty out buffer
# @raise [ResponseTimeout] when no matching response was stored within @timeout + 1 seconds
def flush(options={})
  operation = options[:operation] || ""
  blocking = options.fetch(:blocking, true)

  begin
    correlation_id = self.generate_uuid

    headers = {
      :service_name      => @service_queue_name,
      :operation         => operation,
      :response_required => blocking,   # Tell the receiver if a response is required
      :from_name         => @from_name
    }

    # Publish the message
    print_log "Publishing message reply-to: #{@reply_queue.name} - headers: #{headers}", correlation_id if log?
    # Monotonic clock: only used to report the elapsed time in the log.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # NOTE(review): @timeout is used as seconds below (Timeout.timeout, "sec" in the
    # message) while AMQP message expiration is normally milliseconds - confirm the unit.
    @service_exchange.publish(@outbuf,
                              :routing_key    => @service_queue_name,
                              :correlation_id => correlation_id,
                              :expiration     => @timeout,
                              :reply_to       => @reply_queue.name,
                              :headers        => headers)

    # If this is a standard RPC blocking call, then wait for there to be a response from the
    # service provider or timeout and log the timeout
    if blocking
      @response = ""
      begin
        # Adding 1sec to timeout to account for clock differences
        # NOTE(review): on timeout the subscription does not appear to be cancelled - verify
        # that no stale consumer stays attached to the reply queue.
        Timeout.timeout(@timeout + 1, ResponseTimeout) do
          @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload|

            if log?
              response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
              print_log "---- Response Message received in #{response_time}sec for #{@reply_queue.name}", correlation_id
              print_log "HEADERS: #{properties}", correlation_id
            end

            # Messages with a different correlation id are ignored.
            if properties[:correlation_id] == correlation_id
              @response = payload

              # Once the return message has been received, no need to continue a subscription
              delivery_info.consumer.cancel
            end
          end
        end
      rescue ResponseTimeout => ex
        # Trying to work around weirdness being seen in a multi threaded workflow environment:
        # a timeout is only fatal when no response payload was stored.
        if @response == ""
          msg = "A timeout has occurred (#{@timeout}sec) trying to call #{@service_queue_name}.#{operation}"
          print_log msg, correlation_id
          raise ex, msg
        else
          print_log "Ignoring timeout - #{@response}", correlation_id
        end
      end
      @inbuf = StringIO.new Thrift::Bytes.force_binary_encoding(@response)
    end
  ensure
    # Always start the next request from an empty buffer, even after a failure.
    @outbuf = Thrift::Bytes.empty_byte_buffer
  end

  # Same return value as before the ensure was introduced.
  @outbuf
end

#log? ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/thrifty_bunny/client_transport.rb', line 37

# Tells whether log output is enabled for this transport.
#
# @return [Object] the value stored in @log (taken from config.log by the constructor)
def log?; @log end

#open? ⇒ Boolean

Returns:

  • (Boolean)


55
# File 'lib/thrifty_bunny/client_transport.rb', line 55

# Reports whether the transport is currently open.
#
# @return [Object] the @is_opened flag (set to true by the constructor, false by #close)
def open?
  @is_opened
end

#read(sz) ⇒ Object



56
# File 'lib/thrifty_bunny/client_transport.rb', line 56

# Reads from the buffered response.
#
# Delegates to @inbuf, which #flush replaces with a StringIO after a blocking call.
#
# @param sz [Integer] number of bytes passed on to @inbuf.read
# @return [Object] whatever @inbuf.read returns for +sz+
def read(sz)
  @inbuf.read(sz)
end

#write(buf) ⇒ Object



57
# File 'lib/thrifty_bunny/client_transport.rb', line 57

# Appends data to the pending request buffer.
#
# The data is passed through Thrift::Bytes.force_binary_encoding before being
# appended to @outbuf; nothing is sent until #flush is called.
#
# @param buf [String] bytes to append
# @return [Object] the result of appending to @outbuf
def write(buf)
  binary_chunk = Thrift::Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end