Class: Thrift::AmqpRpcServer

Inherits:
BaseServer
  • Object
show all
Defined in:
lib/thrift/amqp/amqp_rpc_service.rb

Defined Under Namespace

Classes: ProcessingTimeout

Instance Method Summary

Constructor Details

#initialize(processor, opts = {}) ⇒ AmqpRpcServer

Returns a new instance of AmqpRpcServer.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/thrift/amqp/amqp_rpc_service.rb', line 28

# Builds a server that dispatches AMQP requests to a Thrift processor.
#
# All options are validated before any broker connection is opened, so an
# invalid option set never leaves a started connection behind.
#
# @param processor [Object] object whose #process(iprot, oprot) handles a request
# @param opts [Hash] server options
# @option opts [Object] :connection an existing broker connection to reuse;
#   when given, :host/:port/:vhost/:user/:password/:ssl are not read
# @option opts [String] :host broker host (required without :connection)
# @option opts [Integer] :port broker port (required without :connection)
# @option opts [String] :vhost ("/") virtual host
# @option opts [String] :user ("guest") user name
# @option opts [String] :password ("guest") password
# @option opts [Boolean] :ssl (false) passed through to Bunny.new as :ssl
# @option opts [String] :queue_name name of the service queue (key is required)
# @option opts [Class] :protocol_factory (BinaryProtocolFactory) factory class
# @option opts [String] :exchange (nil) exchange name stored for later use
# @raise [ArgumentError] if :host or :port is missing when no :connection is
#   given, or if the :queue_name key is absent
def initialize(processor, opts={})
  @processor = processor

  own_connection = opts[:connection].nil?

  if own_connection
    if opts[:host].nil?
      raise ArgumentError, ":host key not provided in opts dict to make connection"
    end

    if opts[:port].nil?
      raise ArgumentError, ":port key not provided in opts dict to make connection"
    end
  end

  # Checked before connecting: raising after @conn.start would leave an open
  # connection that no caller holds a reference to.
  unless opts.has_key?(:queue_name)
    raise ArgumentError, "A service queue name has not been specified"
  end

  if own_connection
    vhost = opts[:vhost] || "/"
    user = opts[:user] || "guest"
    password = opts[:password] || "guest"
    ssl = opts[:ssl] || false

    @conn = Bunny.new(:host => opts[:host], :port => opts[:port], :vhost => vhost,
                      :user => user, :password => password, :ssl => ssl)
    @conn.start
  else
    @conn = opts[:connection]
  end

  @queue_name = opts[:queue_name]
  @protocol_factory = opts[:protocol_factory] || BinaryProtocolFactory
  # `|| nil` normalizes a false value to nil.
  @exchange = opts[:exchange] || nil
end

Instance Method Details

#close ⇒ Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/thrift/amqp/amqp_rpc_service.rb', line 65

# Shuts the server down: closes the request channel (if one was created and
# it responds to #close) and then the broker connection.
#
# The connection is closed in an `ensure` so that an error raised while
# closing the channel cannot leave the broker connection open. Any such
# error still propagates to the caller.
def close
  channel = @request_channel
  channel.close if !channel.nil? && channel.respond_to?('close')
ensure
  #Always close the broker connection when closing the server
  @conn.close
end

#serve(options = {}) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/thrift/amqp/amqp_rpc_service.rb', line 78

# Subscribes to the service queue and handles each delivered request:
# the payload is fed to the Thrift processor and, when output was produced
# and a response is required, the output is published to the default
# exchange using the request's reply_to as routing key.
#
# The subscription is made with :block => true.
#
# @param options [Hash]
# @option options [Boolean] :log_messages (false) log requests through print_log
# @option options [Integer] :max_messages (10) second argument given to
#   @conn.create_channel when creating the request channel
# @option options [Integer] :response_timeout (10) lower bound, in seconds,
#   for the per-request processing timeout
def serve(options={})
  log_messages = options[:log_messages] || false
  max_messages = options[:max_messages].nil? ? 10 : options[:max_messages]
  response_timeout = options[:response_timeout] || 10

  #Create a channel to the service queue
  @request_channel = @conn.create_channel(nil, max_messages)

  if @exchange.nil?
    @service_exchange = @request_channel.default_exchange
    @request_queue = @request_channel.queue(@queue_name, :auto_delete => true)
  else
    @service_exchange = @request_channel.direct(@exchange, :durable => true)
    @request_queue = @request_channel.queue(@queue_name, :auto_delete => true).bind(@service_exchange, :routing_key => @queue_name)
  end

  @request_queue.subscribe(:block => true) do |_delivery_info, properties, payload|
    Thread.current["correlation_id"] = properties.correlation_id

    if log_messages
      print_log "---- Message received ----"
      print_log "HEADERS: #{properties}"
    end

    response_channel = @conn.create_channel

    begin
      response_exchange = response_channel.default_exchange

      # Headers are optional on a message; treat a missing table as empty so
      # the lookups below (including the one in the timeout handler) are safe.
      headers = properties.headers || {}
      operation = headers['operation']
      response_required = headers.has_key?('response_required') ? headers['response_required'] : true

      # Use whichever is larger: the configured timeout or the message's own
      # expiration.
      # NOTE(review): expiration is treated as seconds here; AMQP brokers
      # commonly define it in milliseconds — confirm against the client side.
      process_timeout = [response_timeout.to_i, properties.expiration.to_i].max

      print_log "Request to process #{@queue_name}.#{operation} in #{process_timeout}sec" if log_messages

      input = StringIO.new payload
      out = StringIO.new
      transport = IOStreamTransport.new input, out
      protocol = @protocol_factory.new.get_protocol transport

      begin
        start_time = Time.now
        Timeout.timeout(process_timeout, ProcessingTimeout) do
          @processor.process protocol, protocol
        end
        processing_time = Time.now - start_time

        #rewind the buffer for reading
        if out.length > 0
          out.rewind

          print_log "Time to process request: #{processing_time}sec  Response length: #{out.length}"   if log_messages

          if response_required
            response_exchange.publish(out.read(out.length),
                                      :routing_key => properties.reply_to,
                                      :correlation_id => properties.correlation_id,
                                      :content_type => 'application/octet-stream')
          end
        end
      rescue ProcessingTimeout
        print_log "A timeout has occurred (#{process_timeout}sec) trying to call #{@queue_name}.#{operation}"
      end
    ensure
      # Release the per-request channel even when processing or publishing
      # raises something other than ProcessingTimeout.
      response_channel.close
    end
  end
end