Class: Fluent::HttpListInput
- Inherits: Input
- Inheritance chain: Object → Input → Fluent::HttpListInput
- Includes:
- DetachMultiProcessMixin
- Defined in:
- lib/fluent/plugin/in_http_list.rb
Defined Under Namespace
Classes: Handler, KeepaliveManager
Instance Method Summary
collapse
Constructor Details
Returns a new instance of HttpListInput.
10
11
12
13
|
# File 'lib/fluent/plugin/in_http_list.rb', line 10
# Builds a new HttpListInput.
#
# WEBrick's HTTP utility module is loaded here, at construction time,
# before control is handed to the parent class constructor.
def initialize
  require 'webrick/httputils'
  super()
end
|
Instance Method Details
20
21
22
|
# File 'lib/fluent/plugin/in_http_list.rb', line 20
# Applies the plugin configuration.
#
# This class adds no handling of its own here; the configuration is passed
# straight through to the superclass implementation.
#
# @param conf the configuration object forwarded to the superclass
def configure(conf)
  super(conf)
end
|
#on_request(path_info, params) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/fluent/plugin/in_http_list.rb', line 86
# Handles one HTTP request: parses the 'json' parameter as a list of records
# and emits every record under a tag derived from the request path.
#
# @param path_info [String] request path such as "/app/access"; its first
#   character is dropped and the remaining "/" separators become "." in the tag
# @param params [Hash] request parameters; 'json' is required, 'time' is
#   optional (Engine.now is used when 'time' is absent)
# @return [Array] three-element response: status line, header hash, body.
#   "400 Bad Request" when the input cannot be interpreted,
#   "500 Internal Server Error" when emitting fails, "200 OK" otherwise.
def on_request(path_info, params)
  begin
    path = path_info[1..-1]
    tag = path.split('/').join('.')

    js = params['json']
    raise "'json' parameter is required#{params.keys}" unless js

    records = JSON.parse(js)
    # A lone JSON object is treated as a one-record list; iterating a Hash
    # directly would emit [key, value] pairs instead of the record itself.
    records = [records] if records.is_a?(Hash)
    # Anything else that is not a list is a client error, not a server error.
    raise "'json' parameter must be an array of records" unless records.is_a?(Array)

    time = params['time'].nil? ? Engine.now : params['time'].to_i
  rescue => e
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  begin
    records.each { |record| Engine.emit(tag, time, record) }
  rescue => e
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  ["200 OK", {'Content-type'=>'text/plain'}, ""]
end
|
#run ⇒ Object
79
80
81
82
83
84
|
# File 'lib/fluent/plugin/in_http_list.rb', line 79
# Runs the event loop held in @loop (this method is what the thread created
# in #start executes).
#
# Any StandardError raised while the loop runs is logged through $log, along
# with its backtrace, instead of being propagated to the caller.
def run
  begin
    @loop.run
  rescue => e
    $log.error("unexpected error", :error => e.to_s)
    $log.error_backtrace
  end
end
|
#shutdown ⇒ Object
72
73
74
75
76
77
|
# File 'lib/fluent/plugin/in_http_list.rb', line 72
# Stops the input: detaches every watcher from the event loop, stops the
# loop, closes the listening socket and then waits for the worker thread
# to finish.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join
end
|
#start ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/fluent/plugin/in_http_list.rb', line 55
# Starts the HTTP listener.
#
# A plain TCPServer is bound to @bind:@port first. The remaining setup runs
# inside the detach_multi_process block: the superclass #start is invoked,
# a KeepaliveManager is built from @keepalive_timeout, the bound socket is
# wrapped in a Coolio::TCPServer that uses Handler (given the keepalive
# manager, the #on_request callback and @body_size_limit), both are attached
# to a fresh Coolio::Loop, and a thread is spawned to execute #run.
def start
  $log.debug("listening for http on #{@bind}:#{@port}")
  # NOTE(review): the socket is bound before detach_multi_process, presumably
  # so the port is already held when the block runs -- confirm.
  listen_socket = TCPServer.new(@bind, @port)

  detach_multi_process do
    super()
    @km = KeepaliveManager.new(@keepalive_timeout)
    request_callback = method(:on_request)
    @lsock = Coolio::TCPServer.new(listen_socket, nil, Handler, @km, request_callback, @body_size_limit)
    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)
    @thread = Thread.new(&method(:run))
  end
end
|