43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/thrifty_bunny/rpc_server.rb', line 43
# Subscribes to the request queue and processes incoming messages, blocking
# the calling thread (the subscription is made with :block => true).
#
# For each delivery a dedicated response channel is opened and is always
# closed again, even when processing raises. Payloads whose content type is
# 'application/octet-stream' are handed to @processor through an in-memory
# transport; whatever the processor writes back is published to the
# delivery's reply_to routing key unless the 'response_required' header is
# present and falsy. Any other content type is rejected without requeueing.
#
# Processing is bounded by the larger of @timeout and the message's
# expiration property; when that limit is hit the timeout is logged and no
# reply is published.
#
# @param options [Hash]
# @option options [Integer] :max_messages (10) passed as the second argument
#   to @conn.create_channel for the request channel
# @option options [Integer] :prefetch prefetch value for the request channel;
#   not applied when absent
def serve(options = {})
  max_messages = options[:max_messages].nil? ? 10 : options[:max_messages]
  @request_channel = @conn.create_channel(nil, max_messages)
  @request_channel.prefetch(options[:prefetch]) if options[:prefetch]
  @request_queue = @request_channel.queue(@queue_name, :auto_delete => true)

  @request_queue.subscribe(:block => true) do |delivery_info, properties, payload|
    # Set before any logging, exactly once per delivery.
    Thread.current["correlation_id"] = properties.correlation_id
    if log?
      print_log "---- Message received ----"
      print_log "HEADERS: #{properties}"
    end

    response_channel = @conn.create_channel
    begin
      # Messages published without custom headers carry no header table.
      headers = properties.headers || {}
      response_required = headers.fetch('response_required', true)
      process_timeout = [@timeout, properties.expiration.to_i].max

      if properties.content_type == 'application/octet-stream'
        print_log "Request to process #{@queue_name}.#{headers['operation']} in #{process_timeout}sec" if log?
        input = StringIO.new payload
        out = StringIO.new
        transport = Thrift::IOStreamTransport.new input, out
        protocol = @protocol_factory.new.get_protocol transport
        begin
          # Monotonic clock: wall-clock adjustments must not skew the duration.
          started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          Timeout.timeout(process_timeout, ProcessingTimeout) do
            @processor.process protocol, protocol
          end
          processing_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

          # An empty output buffer means the processor produced no reply.
          if out.length > 0
            out.rewind
            print_log "Time to process request: #{processing_time}sec Response length: #{out.length}" if log?
            if response_required
              response_channel.default_exchange.publish(out.read(out.length),
                                                        :routing_key => properties.reply_to,
                                                        :correlation_id => properties.correlation_id,
                                                        :content_type => 'application/octet-stream')
            end
          end
        rescue ProcessingTimeout
          print_log "A timeout has occurred (#{process_timeout}sec) trying to call #{@queue_name}.#{headers['operation']}"
        end
      else
        print_log "Unable to process message content of type #{properties.content_type}. The message will be rejected"
        # NOTE(review): the subscription above does not request manual acks;
        # confirm the broker accepts a reject for this delivery.
        @request_channel.reject(delivery_info.delivery_tag, false)
      end
    ensure
      # Release the per-message channel even if processing or publishing raised.
      response_channel.close
    end
  end
end
|