Class: Fluent::HTTPSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_https_client.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HTTPSOutput

Returns a new instance of HTTPSOutput.



4
5
6
7
8
9
10
# File 'lib/fluent/plugin/out_https_client.rb', line 4

# Builds the plugin instance: runs the superclass initializer, then loads the
# libraries required by this method, in order: 'net/https', 'openssl', 'uri'
# and 'yajl'.
def initialize
  super
  ['net/https', 'openssl', 'uri', 'yajl'].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_https_client.rb', line 27

# Validates and normalizes the plugin configuration.
#
# After delegating to the superclass, this method:
# * reads the +use_ssl+, +include_tag+ and +include_timestamp+ flags from +conf+,
# * validates the proxy address/port pair and sets +@use_proxy+,
# * converts +@serializer+, +@http_method+ and +@auth+ to symbols, falling back
#   to +:form+, +:post+ and +:none+ for unrecognized values,
# * copies every key/value of every nested element of +conf+ into +@headers+.
#
# @param conf [#[], #elements] configuration object supplied by the caller
# @return [Object] whatever +conf.elements.each+ returns
# @raise [Fluent::ConfigError] when the proxy address or port is unusable
def configure(conf)
  super

  # In Ruby every String is truthy, so a configured "false"/"no" used to turn
  # the flag ON. Strings are therefore mapped to real booleans; nil and values
  # that are already booleans pass through untouched.
  # NOTE(review): assumes conf[] hands back the raw string from the config
  # file -- confirm against the framework's config element class.
  to_flag = lambda do |value|
    next value unless value.is_a?(String)
    !['false', 'no'].include?(value.strip.downcase)
  end

  @use_ssl = to_flag.call(conf['use_ssl'])
  @include_tag = to_flag.call(conf['include_tag'])
  @include_timestamp = to_flag.call(conf['include_timestamp'])

  # The proxy is used only when both a positive port and a non-empty address
  # are present. A negative port with an empty address is silently ignored,
  # exactly as before.
  @use_proxy = false
  if @proxy_port && @proxy_addr
    has_addr = !@proxy_addr.empty?
    if @proxy_port > 0
      raise Fluent::ConfigError, 'HTTPS Output :: provide a valid proxy address' unless has_addr
      @use_proxy = true
    elsif has_addr || @proxy_port == 0
      raise Fluent::ConfigError, 'HTTPS Output :: provide a valid proxy port number'
    end
  end

  requested_serializer = @serializer.intern
  @serializer = [:json, :form].include?(requested_serializer) ? requested_serializer : :form

  requested_method = @http_method.intern
  @http_method = [:get, :put, :post, :delete].include?(requested_method) ? requested_method : :post

  @auth = @auth == 'basic' ? :basic : :none

  # Every key of every nested configuration element becomes a request header.
  @headers = {}
  conf.elements.each do |elem|
    elem.keys.each { |key| @headers[key] = elem[key] }
  end
end

#create_request(tag, time, record) ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/fluent/plugin/out_https_client.rb', line 116

# Builds the Net::HTTP request object for one record.
#
# The request class is looked up from +@http_method+ (e.g. +:post+ becomes
# +Net::HTTP::Post+); the body and headers are filled in by +set_body+ and
# +set_header+.
#
# @param tag [Object] passed through to +set_body+
# @param time [Object] accepted but not used by this method
# @param record [Object] passed through to +set_body+
# @return [Array] two-element array of the request and the parsed URI
def create_request(tag, time, record)
  uri = URI.parse(format_url)
  # request_uri (unlike path) keeps the query string and yields "/" for a
  # bare-host URL; an empty path would make the request constructor raise.
  # URIs without request_uri (non-HTTP schemes) fall back to the plain path.
  target = uri.respond_to?(:request_uri) ? uri.request_uri : uri.path
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(target)
  set_body(req, tag, record)
  set_header(req)
  return req, uri
end

#emit(tag, es, chain) ⇒ Object



168
169
170
171
172
173
# File 'lib/fluent/plugin/out_https_client.rb', line 168

# Hands every (time, record) pair yielded by +es+ to +handle_record+, then
# advances +chain+.
#
# @param tag [Object] forwarded unchanged to +handle_record+
# @param es [#each] yields time/record pairs
# @param chain [#next] called once after all records were handled
# @return [Object] the result of +chain.next+
def emit(tag, es, chain)
  es.each { |time, record| handle_record(tag, time, record) }
  chain.next
end

#format_url ⇒ Object



86
87
88
# File 'lib/fluent/plugin/out_https_client.rb', line 86

# Returns the URL requests are sent to, i.e. the current value of
# +@endpoint_url+ without any modification.
#
# @return [Object] the value of +@endpoint_url+
def format_url
  @endpoint_url
end

#handle_record(tag, time, record) ⇒ Object



163
164
165
166
# File 'lib/fluent/plugin/out_https_client.rb', line 163

# Processes a single record: builds its request with +create_request+ and
# passes the request, target URI and record to +send_request+.
#
# @return [Object] whatever +send_request+ returns
def handle_record(tag, time, record)
  request, target = create_request(tag, time, record)
  send_request(request, target, record)
end

#send_request(req, uri, record) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/fluent/plugin/out_https_client.rb', line 125

# Sends one prepared request and logs the outcome.
#
# When +@rate_limit_msec+ is non-zero and the previous request was started
# less than that many milliseconds ago, the request is dropped and only an
# info line is logged. Otherwise the request is sent (through the proxy when
# +@use_proxy+ is set, with basic auth when +@auth+ is +:basic+). Connection
# level failures are logged as warnings instead of being raised, and any
# non-2xx or missing response is logged together with the record.
#
# @param req [Net::HTTPRequest] request to send
# @param uri [URI] target; only host and port are used for the connection
# @param record [Object] only interpolated into log messages
# @return [Object, nil] nil when rate limited, otherwise the logger's return value
def send_request(req, uri, record)
  rate_limited = (@rate_limit_msec != 0 && !@last_request_time.nil?)
  if rate_limited && ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    $log.info('Dropped request due to rate limiting')
    return
  end

  res = nil
  begin
    req.basic_auth(@username, @password) if @auth == :basic
    # Recorded before the request is made, so failed attempts also count
    # towards the rate limit.
    @last_request_time = Time.now.to_f
    https = if @use_proxy
              Net::HTTP.new(uri.host, uri.port, @proxy_addr, @proxy_port)
            else
              Net::HTTP.new(uri.host, uri.port)
            end
    https.use_ssl = @use_ssl
    https.ca_file = OpenSSL::X509::DEFAULT_CERT_FILE
    # NOTE(review): VERIFY_NONE disables certificate verification, which makes
    # the ca_file above ineffective. Left unchanged because switching to
    # VERIFY_PEER could break existing deployments -- needs a decision.
    https.verify_mode = OpenSSL::SSL::VERIFY_NONE
    res = https.start { |http| http.request(req) }
  rescue IOError, EOFError, SystemCallError, SocketError, Timeout::Error, OpenSSL::SSL::SSLError => e
    # SocketError (DNS failure), Timeout::Error (connect/read timeouts) and
    # SSLError (handshake failure) are not SystemCallErrors; without them in
    # the list these common network failures escaped the log-and-continue
    # handling this method otherwise applies.
    $log.warn "HTTPS Output :: Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
  end

  if res && res.is_a?(Net::HTTPSuccess)
    $log.info "HTTPS Output :: emitted record : #{record}"
  else
    res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
    $log.warn "HTTPS Output :: failed to #{req.method} #{uri} (#{res_summary})"
    $log.warn "HTTPS Output :: record failed to send : #{record}"
  end
end

#set_body(req, tag, record) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_https_client.rb', line 90

# Fills the request body from +record+.
#
# When +@include_tag+ is set a 'tag' entry is added, and when
# +@include_timestamp+ is set a 'timestamp' entry holding the current epoch
# second is added. The result is serialized as JSON (via +set_json_body+) when
# +@serializer+ is +:json+, otherwise as form data.
#
# The additions are made on a copy: +record+ itself is left untouched, so the
# caller's hash does not pick up keys from this method.
#
# @param req [Net::HTTPRequest] request whose body is set
# @param tag [Object] value stored under 'tag' when +@include_tag+ is set
# @param record [Hash] data to send; not modified
# @return [Net::HTTPRequest] the same +req+ object
def set_body(req, tag, record)
  payload = record
  # Hash#merge returns a new hash, so the caller's record is never written to.
  payload = payload.merge('tag' => tag) if @include_tag
  payload = payload.merge('timestamp' => Time.now.to_i) if @include_timestamp

  if @serializer == :json
    set_json_body(req, payload)
  else
    req.set_form_data(payload)
  end
  req
end

#set_header(req) ⇒ Object



105
106
107
108
109
# File 'lib/fluent/plugin/out_https_client.rb', line 105

# Copies every entry of +@headers+ onto the request using +req[name] = value+.
#
# @param req [#[]=] request (or any object accepting keyed assignment)
# @return [Hash] the +@headers+ hash itself
def set_header(req)
  @headers.each_pair { |name, value| req[name] = value }
end

#set_json_body(req, data) ⇒ Object



111
112
113
114
# File 'lib/fluent/plugin/out_https_client.rb', line 111

# Serializes +data+ with Yajl, stores the result as the request body and marks
# the request as 'application/json'.
#
# @param req [#body=, #[]=] request to fill
# @param data [Object] value handed to +Yajl.dump+
# @return [String] the content type string that was assigned
def set_json_body(req, data)
  encoded = Yajl.dump(data)
  req.body = encoded
  req['Content-Type'] = 'application/json'
end

#shutdown ⇒ Object



82
83
84
# File 'lib/fluent/plugin/out_https_client.rb', line 82

# Shutdown hook. Adds no behavior of its own: it only delegates to the
# superclass implementation.
def shutdown
  super
end

#start ⇒ Object



78
79
80
# File 'lib/fluent/plugin/out_https_client.rb', line 78

# Start hook. Adds no behavior of its own: it only delegates to the
# superclass implementation.
def start
  super
end