Class: Fluent::Plugin::HTTPOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_http.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
DEFAULT_FORMATTER =
"json"

Instance Method Summary collapse

Constructor Details

#initializeHTTPOutput

Returns a new instance of HTTPOutput.



15
16
17
# File 'lib/fluent/plugin/out_http.rb', line 15

def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_http.rb', line 61

def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super

  @ssl_verify_mode = if @ssl_no_verify
                       OpenSSL::SSL::VERIFY_NONE
                     else
                       OpenSSL::SSL::VERIFY_PEER
                     end

  @ca_file = @cacert_file
  @last_request_time = nil
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered

  http_methods = [:get, :put, :post, :delete]
  @http_method = if http_methods.include? @http_method.intern
                   @http_method.intern
                 else
                   :post
                 end
  puts @endpoint_url
  @uri = URI.parse(@endpoint_url)
  # ssl_verify = @ssl_no_verify
  url = @uri.scheme  + "://" + @uri.host + ":" + @uri.port.to_s
  @adapter = Faraday.new(url: url, ssl: {verify:false} ) do |f|
    f.request :retry, max:                 5,
      interval:            1,
      interval_randomness: 0.5,
      backoff_factor:      2,
      methods:             @http_method,
      exceptions:          %w(Errno::ETIMEDOUT
                              Faraday::TimeoutError
                              Faraday::Error::TimeoutError
                              Net::ReadTimeout).freeze

    f.adapter :net_http_persistent
  end

  if @formatter_config = conf.elements('format').first
    @formatter = formatter_create
  end
end

#create_request(tag, time, record) ⇒ Object



155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin/out_http.rb', line 155

def create_request(tag, time, record)
  url = format_url(tag, time, record)
  uri = URI.parse(url)
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  return req, uri
end

#format(tag, time, record) ⇒ Object



204
205
206
# File 'lib/fluent/plugin/out_http.rb', line 204

def format(tag, time, record)
  [time, record].to_msgpack
end

#format_url(tag, time, record) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/out_http.rb', line 112

def format_url(tag, time, record)
  @endpoint_url
end

#formatted_to_msgpack_binary?Boolean

Returns:

  • (Boolean)


208
209
210
# File 'lib/fluent/plugin/out_http.rb', line 208

def formatted_to_msgpack_binary?
  true
end

#handle_record(tag, time, record) ⇒ Object

end send_request



192
193
194
195
196
197
198
# File 'lib/fluent/plugin/out_http.rb', line 192

def handle_record(tag, time, record)
  if @formatter_config
    record = @formatter.format(tag, time, record)
  end
  # req, uri = create_request(tag, time, record)
  send_request(record)
end

#http_opts(uri) ⇒ Object



164
165
166
167
168
169
170
171
# File 'lib/fluent/plugin/out_http.rb', line 164

def http_opts(uri)
    opts = {
      :use_ssl => uri.scheme == 'https'
    }
    opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
    opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file)
    opts
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


212
213
214
# File 'lib/fluent/plugin/out_http.rb', line 212

def multi_workers_ready?
  true
end

#prefer_buffered_processingObject



200
201
202
# File 'lib/fluent/plugin/out_http.rb', line 200

def prefer_buffered_processing
  @buffered
end

#process(tag, es) ⇒ Object



216
217
218
219
220
# File 'lib/fluent/plugin/out_http.rb', line 216

def process(tag, es)
  es.each do |time, record|
    handle_record(tag, time, record)
  end
end

#proxiesObject



173
174
175
# File 'lib/fluent/plugin/out_http.rb', line 173

def proxies
  ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY'] || ENV['http_proxy'] || ENV['https_proxy']
end

#send_request(record) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/fluent/plugin/out_http.rb', line 177

def send_request(record)
  is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
  if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    log.info('Dropped request due to rate limiting')
    return
  end

  _ = @adapter.post(@uri.path) do |request|
    request.headers['Content-Type'] = 'application/json'
    request.body = Yajl.dump(record)
    request.options.timeout = 60
    request.options.open_timeout = 60
  end
end

#set_body(req, tag, time, record) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/out_http.rb', line 116

def set_body(req, tag, time, record)
  if @serializer == :json
    set_json_body(req, record)
  elsif @serializer == :text
    set_text_body(req, record)
  elsif @serializer == :raw
    set_raw_body(req, record)
  else
    req.set_form_data(record)
  end
  req
end

#set_header(req, tag, time, record) ⇒ Object



129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/out_http.rb', line 129

def set_header(req, tag, time, record)
  if @custom_headers
    @custom_headers.each do |k,v|
      req[k] = v
    end
    req
  else
    req
  end
end

#set_json_body(req, data) ⇒ Object



140
141
142
143
# File 'lib/fluent/plugin/out_http.rb', line 140

def set_json_body(req, data)
  req.body = Yajl.dump(data)
  req['Content-Type'] = 'application/json'
end

#set_raw_body(req, data) ⇒ Object



150
151
152
153
# File 'lib/fluent/plugin/out_http.rb', line 150

def set_raw_body(req, data)
  req.body = data.to_s
  req['Content-Type'] = 'application/octet-stream'
end

#set_text_body(req, data) ⇒ Object



145
146
147
148
# File 'lib/fluent/plugin/out_http.rb', line 145

def set_text_body(req, data)
  req.body = data["message"]
  req['Content-Type'] = 'text/plain'
end

#shutdownObject



108
109
110
# File 'lib/fluent/plugin/out_http.rb', line 108

def shutdown
  super
end

#startObject



104
105
106
# File 'lib/fluent/plugin/out_http.rb', line 104

def start
  super
end

#write(chunk) ⇒ Object



222
223
224
225
226
227
228
# File 'lib/fluent/plugin/out_http.rb', line 222

def write(chunk)
  tag = chunk..tag
  @endpoint_url = extract_placeholders(@endpoint_url, chunk.)
  chunk.msgpack_each do |time, record|
    handle_record(tag, time, record)
  end
end