Class: ThriftServer::ThreadPoolServer

Inherits:
Thrift::ThreadPoolServer
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/thrift_server/thread_pool_server.rb

Defined Under Namespace

Classes: LogSubscriber, MetricsSubscriber

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#portObject

Returns the value of attribute port.



38
39
40
# File 'lib/thrift_server/thread_pool_server.rb', line 38

def port
  @port
end

Instance Method Details

#log(logger) ⇒ Object



40
41
42
43
# File 'lib/thrift_server/thread_pool_server.rb', line 40

def log(logger)
  subscribe LogSubscriber.new(logger)
  subscribe ThriftServer::LogSubscriber.new(logger)
end

#metrics(statsd) ⇒ Object



45
46
47
48
49
# File 'lib/thrift_server/thread_pool_server.rb', line 45

def metrics(statsd)
  subscribe MetricsSubscriber.new(statsd)
  subscribe ServerMetricsSubscriber.new(statsd)
  subscribe RpcMetricsSubscriber.new(statsd)
end

#protocolObject



55
56
57
# File 'lib/thrift_server/thread_pool_server.rb', line 55

def protocol
  @protocol_factory
end

#serveObject

NOTE: this a direct copy of the upstream code with instrumentation added.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/thrift_server/thread_pool_server.rb', line 74

def serve
  @server_transport.listen

  begin
    loop do
      @thread_q.push(:token)
      publish :thread_pool_server_pool_change, delta: 1

      Thread.new do
        begin
          loop do
            client = @server_transport.accept
            remote_address = client.handle.remote_address

            publish :server_connection_opened, remote_address

            trans = @transport_factory.get_transport(client)
            prot = @protocol_factory.get_protocol(trans)
            begin
              loop do
                @processor.process(prot, prot)
              end
            rescue Thrift::TransportException, Thrift::ProtocolException => e
              publish :server_connection_closed, remote_address
            ensure
              trans.close
            end
          end
        rescue => e
          @exception_q.push(e)
        ensure
          publish :thread_pool_server_pool_change, delta: -1
          @thread_q.pop # thread died!
        end
      end
    end
  ensure
    @server_transport.close
  end
end

#server_transportObject



63
64
65
# File 'lib/thrift_server/thread_pool_server.rb', line 63

def server_transport
  @server_transport
end

#start(dry_run: false) ⇒ Object



67
68
69
70
71
# File 'lib/thrift_server/thread_pool_server.rb', line 67

def start(dry_run: false)
  publish :server_start, self

  serve unless dry_run
end

#threadsObject



51
52
53
# File 'lib/thrift_server/thread_pool_server.rb', line 51

def threads
  @thread_q.max
end

#transportObject



59
60
61
# File 'lib/thrift_server/thread_pool_server.rb', line 59

def transport
  @transport_factory
end