Class: Fluent::HttpListInput

Inherits:
Input
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_http_list.rb

Defined Under Namespace

Classes: Handler, KeepaliveManager

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HttpListInput

Returns a new instance of HttpListInput.



10
11
12
13
# File 'lib/fluent/plugin/in_http_list.rb', line 10

# Builds a new HttpListInput.
#
# Loads 'webrick/httputils' when an instance is constructed and then hands
# over to the parent class initializer.
# NOTE(review): the helper is presumably used by the nested Handler class for
# HTTP parsing — confirm against the rest of the file.
def initialize
  require 'webrick/httputils'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



20
21
22
# File 'lib/fluent/plugin/in_http_list.rb', line 20

# Configures the plugin by delegating to the parent class; this override adds
# no processing of its own.
#
# @param conf the configuration object, passed through unchanged to super
def configure(conf)
  super
end

#on_request(path_info, params) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/in_http_list.rb', line 86

# Handles one HTTP request: parses the 'json' parameter as a list of records
# and emits every record to the engine under a tag derived from the path.
#
# The tag is built by dropping the first character of the path (the leading
# '/') and replacing the remaining '/' separators with '.', so '/a/b'
# becomes 'a.b'.
#
# @param path_info [String] request path, e.g. '/app/access'
# @param params [Hash] request parameters. 'json' (required) must hold a JSON
#   array of records. 'time' (optional) is converted with to_i and used as
#   the event time; Engine.now is used when it is absent.
# @return [Array] a [status, headers, body] triple:
#   "200 OK" when every record was emitted,
#   "400 Bad Request" when the input is missing, unparsable or not a JSON
#   array, and "500 Internal Server Error" when emitting a record fails.
def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')

    js = params['json']
    raise "'json' parameter is required" + params.keys.to_s unless js

    records = JSON.parse(js)
    # A JSON object would be iterated as [key, value] pairs and a scalar has
    # no #each at all, so anything but a list is rejected as a client error.
    raise "'json' parameter must be a JSON array" unless records.is_a?(Array)

    time = params['time'].nil? ? Engine.now : params['time'].to_i
  rescue => e
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  begin
    records.each { |record| Engine.emit(tag, time, record) }
  rescue => e
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  ["200 OK", {'Content-type'=>'text/plain'}, ""]
end

#run ⇒ Object



79
80
81
82
83
84
# File 'lib/fluent/plugin/in_http_list.rb', line 79

# Drives the event loop stored in @loop until it returns.
#
# A StandardError escaping the loop is not propagated: its message is logged
# through $log at error level, followed by the backtrace.
def run
  begin
    @loop.run
  rescue => err
    $log.error("unexpected error", :error => err.to_s)
    $log.error_backtrace
  end
end

#shutdown ⇒ Object



72
73
74
75
76
77
# File 'lib/fluent/plugin/in_http_list.rb', line 72

# Stops the listener in four steps: detach every watcher currently attached
# to the event loop, stop the loop, close the listening socket, then wait for
# the thread held in @thread to finish.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join
end

#start ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/in_http_list.rb', line 55

# Starts the HTTP listener.
#
# Binds a TCP server socket on @bind:@port, then, inside
# detach_multi_process, calls the parent #start, builds the keepalive manager
# and the Coolio server around the bound socket, attaches both to a fresh
# event loop and runs that loop on a new thread via #run.
# NOTE(review): @bind, @port, @keepalive_timeout and @body_size_limit are
# presumably configuration parameters declared elsewhere in the class —
# confirm.
def start
  $log.debug "listening for http on #{@bind}:#{@port}"
  # The socket is bound before entering detach_multi_process.
  # NOTE(review): presumably so a detached process receives an already-bound
  # listener — confirm against DetachMultiProcessMixin.
  lsock = TCPServer.new(@bind, @port)

  detach_multi_process do
    super
    @km = KeepaliveManager.new(@keepalive_timeout)
    # Wrap the bound socket; Handler is constructed with the keepalive
    # manager, the #on_request callback and the body size limit.
    @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit)

    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)

    # Run the event loop on its own thread; #shutdown joins it.
    @thread = Thread.new(&method(:run))
  end
end