Class: Fluent::HttpPumaInput
- Inherits:
-
Input
- Object
- Input
- Fluent::HttpPumaInput
- Defined in:
- lib/fluent/plugin/in_http_puma.rb
Constant Summary collapse
- OK_RESPONSE =
[200, {'Content-type'=>'text/plain'}, ["OK"]]
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ HttpPumaInput
constructor
A new instance of HttpPumaInput.
- #on_request(env) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ HttpPumaInput
Returns a new instance of HttpPumaInput.
5 6 7 8 9 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 5 def initialize require 'puma' require 'uri' super end |
Instance Method Details
#configure(conf) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 20 def configure(conf) super if @use_ssl && !@ssl_keys raise ConfigError, 'ssl_keys parameter is required when use_ssl is true' end if @format != 'default' @parser = TextParser.new @parser.configure(conf) end end |
#on_request(env) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 64 def on_request(env) uri = URI.parse(env['REQUEST_URI'.freeze]) params = Rack::Utils.parse_query(uri.query) path_info = uri.path begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') record_time, record = parse_record(env, params) # Skip nil record if record.nil? return OK_RESPONSE end time = if param_time = params['time'.freeze] param_time = param_time.to_i param_time.zero? ? Engine.now : param_time else record_time.nil? ? Engine.now : record_time end rescue => e return [400, {'Content-type'=>'text/plain'}, ["Bad Request\n#{e}\n"]] end begin # Support batched requests if record.is_a?(Array) mes = MultiEventStream.new record.each do |single_record| single_time = single_record.delete("time".freeze) || time mes.add(single_time, single_record) end Engine.emit_stream(tag, mes) else Engine.emit(tag, time, record) end rescue => e return [500, {'Content-type'=>'text/plain'}, ["Internal Server Error\n#{e}\n"]] end return OK_RESPONSE end |
#run ⇒ Object
55 56 57 58 59 60 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 55 def run @server.run(false) rescue => e log.error "unexpected error", :error => e.to_s log.error_backtrace e.backtrace end |
#shutdown ⇒ Object
50 51 52 53 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 50 def shutdown @server.stop(true) @thread.join end |
#start ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/in_http_puma.rb', line 33 def start super # Refer puma's Runner and Rack handler for puma server setup @server = ::Puma::Server.new(method(:on_request)) @server.min_threads = @min_threads @server.max_threads = @max_threads @server.leak_stack_on_error = false if @use_ssl setup_https else setup_http end @thread = Thread.new(&method(:run)) end |