Class: ThriftyBunny::RpcServer
- Inherits:
-
Thrift::BaseServer
- Object
- Thrift::BaseServer
- ThriftyBunny::RpcServer
- Defined in:
- lib/thrifty_bunny/rpc_server.rb
Defined Under Namespace
Classes: ProcessingTimeout
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(processor, config = Config.new, options = {}) ⇒ RpcServer
constructor
A new instance of RpcServer.
- #serve(options = {}) ⇒ Object
Constructor Details
#initialize(processor, config = Config.new, options = {}) ⇒ RpcServer
Returns a new instance of RpcServer.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/thrifty_bunny/rpc_server.rb', line 9 def initialize(processor, config=Config.new, ={}) @processor = processor if [:connection].nil? @conn = Bunny.new(config.bunny_config) @conn.start else @conn = [:connection] end @queue_name = config.queue @protocol_factory = [:protocol_factory] || Thrift::BinaryProtocolFactory @exchange = config.exchange end |
Instance Method Details
#close ⇒ Object
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/thrifty_bunny/rpc_server.rb', line 26 def close if not @request_channel.nil? and @request_channel.respond_to?('close') @request_channel.close end #Always close the broker connection when closing the server @conn.close end |
#serve(options = {}) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/thrifty_bunny/rpc_server.rb', line 37 def serve(={}) = [:log_messages] || false = [:max_messages].nil? ? 10 : [:max_messages] response_timeout = [:response_timeout] || 10 #Create a channel to the service queue @request_channel = @conn.create_channel(nil, ) @request_channel.prefetch([:prefetch]) if [:prefetch] @request_queue = @request_channel.queue(@queue_name, :auto_delete => true) @request_queue.subscribe(:block => true) do |delivery_info, properties, payload| if Thread.current["correlation_id"] = properties.correlation_id print_log "---- Message received ----" print_log "HEADERS: #{properties}" end Thread.current["correlation_id"] = properties.correlation_id response_channel = @conn.create_channel response_exchange = response_channel.default_exchange response_required = properties.headers.has_key?('response_required') ? properties.headers['response_required'] : true process_timeout = response_timeout.to_i > properties.expiration.to_i ? response_timeout.to_i : properties.expiration.to_i #Binary content will imply thrift based message payload if properties.content_type == 'application/octet-stream' print_log "Request to process #{@queue_name}.#{properties.headers['operation']} in #{process_timeout}sec" if input = StringIO.new payload out = StringIO.new transport = Thrift::IOStreamTransport.new input, out protocol = @protocol_factory.new.get_protocol transport begin start_time = Time.now Timeout.timeout(process_timeout, ProcessingTimeout) do @processor.process protocol, protocol end processing_time = Time.now - start_time #rewind the buffer for reading if out.length > 0 out.rewind print_log "Time to process request: #{processing_time}sec Response length: #{out.length}" if if response_required response_exchange.publish(out.read(out.length), :routing_key => properties.reply_to, :correlation_id => properties.correlation_id, :content_type => 'application/octet-stream' ) end end rescue ProcessingTimeout => ex print_log "A timeout has occurred (#{process_timeout}sec) trying to call #{@queue_name}.#{properties.headers['operation']}" end else print_log "Unable to process message content of type #{properties.content_type}. The message will be rejected" @request_channel.reject(delivery_info.delivery_tag, false) end response_channel.close end end |