Class: Fluent::Plugin::HTTPOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_http.rb

Defined Under Namespace

Classes: RecoverableResponse

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
DEFAULT_FORMATTER =
"json"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HTTPOutput

Returns a new instance of HTTPOutput.



19
20
21
# File 'lib/fluent/plugin/out_http.rb', line 19

# Creates a new HTTPOutput instance. No plugin-specific state is set up here;
# construction is delegated entirely to the parent class via +super+.
def initialize
  super
end

Instance Method Details

#buffer_compressed? ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/fluent/plugin/out_http.rb', line 149

# Tells whether buffering is enabled and the buffer's compression is :gzip.
#
# @return [Boolean, nil] the (falsy) value of @buffered when buffering is
#   off, otherwise the result of comparing @buffer.compress with :gzip
def buffer_compressed?
  return @buffered unless @buffered

  @buffer.compress == :gzip
end

#bulk_request_format(tag, time, record) ⇒ Object



290
291
292
# File 'lib/fluent/plugin/out_http.rb', line 290

# Formats one event by delegating to the configured formatter (@formatter).
# #configure aliases this method as +format+ when @bulk_request is enabled.
#
# @param tag the event tag, passed through to the formatter
# @param time the event time, passed through to the formatter
# @param record the event record, passed through to the formatter
# @return whatever @formatter.format returns
def bulk_request_format(tag, time, record)
  @formatter.format(tag, time, record)
end

#compress_body(req, data) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/out_http.rb', line 153

# Gzip-encodes the request body when request compression is enabled.
#
# Does nothing unless @compress_request is set. Otherwise the request is
# labelled with "Content-Encoding: gzip" and its body is set to gzip data.
#
# @param req the request; must support +[]=+ and +body=+
# @param data [String] the payload to send
# @return [String, nil] the body that was assigned, or nil when compression
#   is disabled or the payload was passed through unchanged
def compress_body(req, data)
  return unless @compress_request

  req['Content-Encoding'] = "gzip"

  # Only the bulk path (#handle_records) hands over chunk content that was
  # read in its stored gzip form. Per-record bodies are built from individual
  # records and are plain text, so they must still be compressed here even
  # when the buffer itself is gzip-compressed; otherwise an uncompressed body
  # would be sent with a gzip Content-Encoding header.
  if @bulk_request && buffer_compressed?
    req.body = data
    return
  end

  io = StringIO.new
  # The block form closes the writer (flushing the gzip trailer) even if
  # writing raises.
  Zlib::GzipWriter.wrap(io) { |gz| gz.write(data) }
  req.body = io.string
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/out_http.rb', line 80

# Applies the plugin configuration and derives runtime settings from it.
#
# After compatibility conversion and the parent's configure, this:
# * picks the OpenSSL verify mode from @ssl_no_verify,
# * copies @cacert_file into @ca_file and resets the rate-limit timestamp,
# * rejects buffered operation without 'tag' in the chunk keys,
# * creates a formatter when a 'format' element is present in +conf+,
# * aliases +format+ on this instance to the bulk or split implementation.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when buffering is on but @chunk_key_tag is not set
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super

  @ssl_verify_mode = @ssl_no_verify ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  @ca_file = @cacert_file
  @last_request_time = nil

  if @buffered && !@chunk_key_tag
    raise Fluent::ConfigError, "'tag' in chunk_keys is required."
  end

  @formatter_config = conf.elements('format').first
  @formatter = formatter_create if @formatter_config

  # The alias is installed on the singleton class so it affects only this
  # plugin instance.
  if @bulk_request
    singleton_class.send(:alias_method, :format, :bulk_request_format)
    # Bulk mode always uses a JSON formatter, replacing any formatter created above.
    @formatter = formatter_create(type: :json)
    @serializer = :x_ndjson # secret settings for bulk_request
  else
    singleton_class.send(:alias_method, :format, :split_request_format)
  end
end

#create_request(tag, time, record) ⇒ Object



191
192
193
194
195
196
197
198
# File 'lib/fluent/plugin/out_http.rb', line 191

# Builds the HTTP request object for one payload.
#
# The request class is looked up under Net::HTTP from the capitalized
# @http_method (e.g. :post resolves to Net::HTTP::Post). Body and headers
# are filled in by #set_body and #set_header.
#
# @param tag the event tag
# @param time the event time
# @param record the payload used to build the request body
# @return [Array] a two-element array: the request and the parsed URI
def create_request(tag, time, record)
  target = URI.parse(format_url(tag, time, record))
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  request = request_class.new(target.request_uri)

  set_body(request, tag, time, record)
  set_header(request, tag, time, record)

  [request, target]
end

#format(tag, time, record) ⇒ Object



282
283
284
# File 'lib/fluent/plugin/out_http.rb', line 282

# Placeholder implementation of +format+; it has an empty body and returns nil.
# #configure replaces it on each instance with an alias of either
# #bulk_request_format or #split_request_format.
#
# @param tag unused
# @param time unused
# @param record unused
# @return [nil]
def format(tag, time, record)
  # For safety.
end

#format_url(tag, time, record) ⇒ Object



119
120
121
# File 'lib/fluent/plugin/out_http.rb', line 119

# Returns the URL a request should be sent to. The arguments are currently
# ignored; the value of @endpoint_url is returned as-is.
#
# @param tag unused
# @param time unused
# @param record unused
# @return the current value of @endpoint_url
def format_url(tag, time, record)
  @endpoint_url
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:

  • (Boolean)


294
295
296
297
298
299
300
# File 'lib/fluent/plugin/out_http.rb', line 294

# Reports whether +format+ produces msgpack binary. That is the case exactly
# when bulk requests are disabled: the split format serializes with
# to_msgpack, while the bulk format uses the configured formatter.
#
# @return [Boolean] false when @bulk_request is set, true otherwise
def formatted_to_msgpack_binary?
  !@bulk_request
end

#handle_record(tag, time, record) ⇒ Object

Formats a single record (when a format section is configured) and sends it as one HTTP request.



264
265
266
267
268
269
270
# File 'lib/fluent/plugin/out_http.rb', line 264

# Sends a single event as its own HTTP request.
#
# When a 'format' section was configured (@formatter_config is set), the
# record is first run through @formatter; otherwise it is used unchanged.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever #send_request returns
def handle_record(tag, time, record)
  payload = @formatter_config ? @formatter.format(tag, time, record) : record
  request, target = create_request(tag, time, payload)
  send_request(request, target)
end

#handle_records(tag, time, chunk) ⇒ Object



272
273
274
275
276
# File 'lib/fluent/plugin/out_http.rb', line 272

# Sends the whole content of a chunk as one HTTP request (bulk mode).
#
# @param tag the tag for the chunk
# @param time the time to associate with the request
# @param chunk the buffer chunk; must respond to +read+
# @return whatever #send_request returns
def handle_records(tag, time, chunk)
  # Ask for the stored gzip bytes only when the request will actually be
  # labelled as gzip (@compress_request). Without request compression no
  # Content-Encoding header is set, so the body must not be gzip data.
  # NOTE(review): assumes chunk.read without arguments yields uncompressed
  # content for a gzip buffer - confirm against Fluentd's chunk API.
  record = if buffer_compressed? && @compress_request
             chunk.read(compressed: @buffer.compress)
           else
             chunk.read
           end
  req, uri = create_request(tag, time, record)
  send_request(req, uri)
end

#http_opts(uri) ⇒ Object



200
201
202
203
204
205
206
207
208
209
# File 'lib/fluent/plugin/out_http.rb', line 200

# Builds the option hash passed to Net::HTTP.start for the given URI.
#
# :use_ssl is set from the URI scheme, and :verify_mode is added for https.
# :ca_file, :cert and :key are added only when the corresponding configured
# path points at an existing regular file.
#
# @param uri [URI] the request target
# @return [Hash] options for Net::HTTP.start
def http_opts(uri)
  opts = { :use_ssl => uri.scheme == 'https' }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]

  # to_s turns an unset (nil) path into '', for which File.file? is simply
  # false; File.file?(nil) would raise TypeError instead.
  ca_file = @ca_file.to_s
  cert_path = @client_cert_path.to_s
  key_path = @private_key_path.to_s

  opts[:ca_file] = ca_file if File.file?(ca_file)
  opts[:cert] = OpenSSL::X509::Certificate.new(File.read(cert_path)) if File.file?(cert_path)
  opts[:key] = OpenSSL::PKey.read(File.read(key_path), @private_key_passphrase) if File.file?(key_path)
  opts
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


302
303
304
# File 'lib/fluent/plugin/out_http.rb', line 302

# Declares whether this plugin supports running under multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#prefer_buffered_processing ⇒ Object



278
279
280
# File 'lib/fluent/plugin/out_http.rb', line 278

# Reports whether buffered processing is preferred for this instance.
#
# @return the value of @buffered
def prefer_buffered_processing
  @buffered
end

#process(tag, es) ⇒ Object



306
307
308
309
310
# File 'lib/fluent/plugin/out_http.rb', line 306

# Handles an event stream directly: each event is sent as its own request
# through #handle_record.
#
# @param tag the tag shared by all events in the stream
# @param es the event stream; must yield time/record pairs from +each+
# @return whatever +es.each+ returns
def process(tag, es)
  es.each { |time, record| handle_record(tag, time, record) }
end

#proxies ⇒ Object



211
212
213
# File 'lib/fluent/plugin/out_http.rb', line 211

# Looks up the proxy URL from the environment. The variables are consulted in
# this order: HTTPS_PROXY, HTTP_PROXY, http_proxy, https_proxy; the first one
# that is set wins.
#
# @return [String, nil] the proxy URL, or nil when none of the variables is set
def proxies
  candidates = %w[HTTPS_PROXY HTTP_PROXY http_proxy https_proxy]
  ENV.values_at(*candidates).compact.first
end

#send_request(req, uri) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/fluent/plugin/out_http.rb', line 215

# Sends a prepared request and evaluates the response.
#
# * When rate limiting is configured (@rate_limit_msec != 0) and the previous
#   request was sent less than @rate_limit_msec milliseconds ago, the request
#   is dropped with an info log and nil is returned.
# * Authentication is applied according to @authentication (:basic, :bearer
#   or :jwt).
# * If #proxies returns a URL, the connection is opened through that proxy.
# * An exception while sending is logged as a warning and re-raised only when
#   @raise_on_error is set.
# * A missing or non-success response raises RecoverableResponse when its
#   status code is listed in @recoverable_status_codes; otherwise it is only
#   logged as a warning.
#
# @param req the request to send (a Net::HTTP request object)
# @param uri [URI] the target of the request
# @raise [RecoverableResponse] on a response with a recoverable status code
def send_request(req, uri)
  rate_limited = @rate_limit_msec != 0 && !@last_request_time.nil?
  if rate_limited && (Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec
    log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    end
    @last_request_time = Time.now.to_f

    if (proxy = proxies)
      proxy_uri = URI.parse(proxy)

      res = Net::HTTP.start(uri.host, uri.port,
                            proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password,
                            **http_opts(uri)) { |http| http.request(req) }
    else
      res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) { |http| http.request(req) }
    end
  rescue => e # rescue all StandardErrors
    # server didn't respond
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise if @raise_on_error
  else
    # This branch runs only when no exception was raised, so a
    # RecoverableResponse raised here is not swallowed by the rescue above.
    unless res.is_a?(Net::HTTPSuccess)
      res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
      # Guard on res: a nil response has no status code to look up and must
      # fall through to the warning instead of raising NoMethodError.
      if res && @recoverable_status_codes.include?(res.code.to_i)
        raise RecoverableResponse, res_summary
      else
        log.warn "failed to #{req.method} #{uri} (#{res_summary})"
      end
    end
  end
end

#set_body(req, tag, time, record) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/out_http.rb', line 123

# Fills in the request body according to @serializer.
#
# :json, :text, :raw and :x_ndjson are dispatched to the matching set_*_body
# helper; any other value sends the record as form data.
#
# @param req the request to populate
# @param tag unused
# @param time unused
# @param record the payload for the body
# @return the same request object
def set_body(req, tag, time, record)
  case @serializer
  when :json     then set_json_body(req, record)
  when :text     then set_text_body(req, record)
  when :raw      then set_raw_body(req, record)
  when :x_ndjson then set_bulk_body(req, record)
  else req.set_form_data(record)
  end
  req
end

#set_bulk_body(req, data) ⇒ Object



185
186
187
188
189
# File 'lib/fluent/plugin/out_http.rb', line 185

# Sets a bulk body: the stringified data with content type
# application/x-ndjson, then hands the body to #compress_body.
#
# @param req the request to populate
# @param data the payload; converted with +to_s+
# @return whatever #compress_body returns
def set_bulk_body(req, data)
  payload = data.to_s
  req.body = payload
  req['Content-Type'] = 'application/x-ndjson'
  compress_body(req, payload)
end

#set_header(req, tag, time, record) ⇒ Object



138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_http.rb', line 138

# Copies every entry of @custom_headers (when set) onto the request.
#
# @param req the request to populate
# @param tag unused
# @param time unused
# @param record unused
# @return the same request object
def set_header(req, tag, time, record)
  @custom_headers.each { |name, value| req[name] = value } if @custom_headers
  req
end

#set_json_body(req, data) ⇒ Object



167
168
169
170
171
# File 'lib/fluent/plugin/out_http.rb', line 167

# Sets a JSON body: the data serialized with Yajl.dump and content type
# application/json, then hands the body to #compress_body.
#
# @param req the request to populate
# @param data the object to serialize
# @return whatever #compress_body returns
def set_json_body(req, data)
  payload = Yajl.dump(data)
  req.body = payload
  req['Content-Type'] = 'application/json'
  compress_body(req, payload)
end

#set_raw_body(req, data) ⇒ Object



179
180
181
182
183
# File 'lib/fluent/plugin/out_http.rb', line 179

# Sets a raw body: the stringified data with content type
# application/octet-stream, then hands the body to #compress_body.
#
# @param req the request to populate
# @param data the payload; converted with +to_s+
# @return whatever #compress_body returns
def set_raw_body(req, data)
  payload = data.to_s
  req.body = payload
  req['Content-Type'] = 'application/octet-stream'
  compress_body(req, payload)
end

#set_text_body(req, data) ⇒ Object



173
174
175
176
177
# File 'lib/fluent/plugin/out_http.rb', line 173

# Sets a plain-text body: the value stored under "message" in the data with
# content type text/plain, then hands the body to #compress_body.
#
# @param req the request to populate
# @param data the record; must support lookup by the "message" key
# @return whatever #compress_body returns
def set_text_body(req, data)
  message = data["message"]
  req.body = message
  req['Content-Type'] = 'text/plain'
  compress_body(req, message)
end

#shutdown ⇒ Object



115
116
117
# File 'lib/fluent/plugin/out_http.rb', line 115

# Shutdown hook. Adds no plugin-specific behavior; it only calls the parent
# implementation via +super+.
def shutdown
  super
end

#split_request_format(tag, time, record) ⇒ Object



286
287
288
# File 'lib/fluent/plugin/out_http.rb', line 286

# Serializes one event as a msgpack-encoded [time, record] pair; the tag is
# not included. #configure aliases this method as +format+ when @bulk_request
# is disabled.
#
# @param tag unused
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on the [time, record] array
def split_request_format(tag, time, record)
  [time, record].to_msgpack
end

#start ⇒ Object



111
112
113
# File 'lib/fluent/plugin/out_http.rb', line 111

# Start hook. Adds no plugin-specific behavior; it only calls the parent
# implementation via +super+.
def start
  super
end

#write(chunk) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/fluent/plugin/out_http.rb', line 312

# Flushes one buffer chunk.
#
# In bulk mode the whole chunk is sent as a single request via
# #handle_records; otherwise every event in the chunk is sent separately via
# #handle_record.
#
# @param chunk the buffer chunk to flush
# @return whatever #handle_records or chunk.msgpack_each returns
def write(chunk)
  # The tag is read from the chunk's metadata (`chunk..tag` would build a
  # Range object instead of reading the tag).
  tag = chunk.metadata.tag

  # Keep the configured URL as a template and expand it for every chunk.
  # Expanding @endpoint_url in place would lose the placeholders after the
  # first chunk, so later chunks would reuse the first chunk's URL.
  # #format_url reads @endpoint_url, so the expanded URL is still stored there.
  @endpoint_url_template ||= @endpoint_url
  @endpoint_url = extract_placeholders(@endpoint_url_template, chunk)

  log.debug { "#{@http_method.capitalize} data to #{@endpoint_url} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" }

  if @bulk_request
    time = Fluent::Engine.now
    handle_records(tag, time, chunk)
  else
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  end
end