Class: Fluent::HttpPumaInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_http_puma.rb

Constant Summary collapse

OK_RESPONSE =
[200, {'Content-type'=>'text/plain'}, ["OK"]]

Instance Method Summary collapse

Constructor Details

#initializeHttpPumaInput

Returns a new instance of HttpPumaInput.



5
6
7
8
9
# File 'lib/fluent/plugin/in_http_puma.rb', line 5

def initialize
  require 'puma'
  require 'uri'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/in_http_puma.rb', line 20

def configure(conf)
  super

  if @use_ssl && !@ssl_keys
    raise ConfigError, 'ssl_keys parameter is required when use_ssl is true'
  end

  if @format != 'default'
    @parser = TextParser.new
    @parser.configure(conf)
  end
end

#on_request(env) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_http_puma.rb', line 64

def on_request(env)
  uri = URI.parse(env['REQUEST_URI'.freeze])
  params = Rack::Utils.parse_query(uri.query)
  path_info = uri.path

  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_record(env, params)

    # Skip nil record
    if record.nil?
      return OK_RESPONSE
    end

    time = if param_time = params['time'.freeze]
             param_time = param_time.to_i
             param_time.zero? ? Engine.now : param_time
           else
             record_time.nil? ? Engine.now : record_time
           end
  rescue => e
    return [400, {'Content-type'=>'text/plain'}, ["Bad Request\n#{e}\n"]]
  end

  begin
    # Support batched requests
    if record.is_a?(Array)           
      mes = MultiEventStream.new
      record.each do |single_record|
        single_time = single_record.delete("time".freeze) || time 
        mes.add(single_time, single_record)
      end
      Engine.emit_stream(tag, mes)
	else
      Engine.emit(tag, time, record)
    end
  rescue => e
    return [500, {'Content-type'=>'text/plain'}, ["Internal Server Error\n#{e}\n"]]
  end

  return OK_RESPONSE
end

#runObject



55
56
57
58
59
60
# File 'lib/fluent/plugin/in_http_puma.rb', line 55

def run
  @server.run(false)
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace e.backtrace
end

#shutdownObject



50
51
52
53
# File 'lib/fluent/plugin/in_http_puma.rb', line 50

def shutdown
  @server.stop(true)
  @thread.join
end

#startObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_http_puma.rb', line 33

def start
  super

  # Refer puma's Runner and Rack handler for puma server setup
  @server = ::Puma::Server.new(method(:on_request))
  @server.min_threads = @min_threads
  @server.max_threads = @max_threads
  @server.leak_stack_on_error = false
  if @use_ssl
    setup_https
  else
    setup_http
  end

  @thread = Thread.new(&method(:run))
end