Class: Fluent::Plugin::LokiOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_loki.rb

Overview

Subclass of Fluent Plugin Output

Defined Under Namespace

Classes: LogPostError

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#record_accessors ⇒ Object

Returns the value of attribute record_accessors.



34
35
36
# File 'lib/fluent/plugin/out_loki.rb', line 34

# Reader for the @record_accessors instance variable.
#
# `configure` fills it with a Hash keyed by the keys of each `<label>`
# section, whose values are the objects returned by `record_accessor_create`.
#
# @return [Object] the current value of @record_accessors
def record_accessors
  @record_accessors
end

Instance Method Details

#client_cert_configured? ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/fluent/plugin/out_loki.rb', line 128

# Tells whether a client certificate AND its private key have been supplied.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def client_cert_configured?
  missing_part = @key.nil? || @cert.nil?
  !missing_part
end

#configure(conf) ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/plugin/out_loki.rb', line 81

# Validates the plugin configuration and prepares the state derived from it.
#
# Steps, in order:
# 1. builds the push endpoint URI from @url and rejects non-HTTP(S) schemes;
# 2. creates a record accessor for every key of every `<label>` section and
#    for every entry of @remove_keys;
# 3. loads and validates the client certificate/key when both are configured;
# 4. reads the bearer token file (once) when one is configured;
# 5. checks that the configured CA certificate file exists.
#
# @param conf [Object] plugin configuration; its `elements` are scanned for
#   sections named 'label'
# @return [nil]
# @raise [Fluent::ConfigError] if the push URL is not HTTP/HTTPS
# @raise [RuntimeError] if the bearer token file is missing or empty, or if
#   the CA certificate file is missing
def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
  compat_parameters_convert(conf, :buffer)
  super
  @uri = URI.parse(@url + '/loki/api/v1/push')
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
  end

  @record_accessors = {}
  conf.elements.select { |element| element.name == 'label' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      # An empty value means "use the label name itself as the record path".
      v = k if v.empty?
      @record_accessors[k] = record_accessor_create(v)
    end
  end
  @remove_keys_accessors = @remove_keys.map { |key| record_accessor_create(key) }

  # If configured, load and validate client certificate (and corresponding key)
  if client_cert_configured?
    load_client_cert
    validate_client_cert_key
  end

  # Single existence check for the token file; everything below may rely on it.
  raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)

  @auth_token_bearer = nil
  unless @bearer_token_file.nil?
    # Read the file once, assume long-lived authentication token.
    @auth_token_bearer = File.read(@bearer_token_file)
    raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

    log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
  end

  raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end

#generic_to_loki(chunk) ⇒ Object



201
202
203
204
205
206
# File 'lib/fluent/plugin/out_loki.rb', line 201

# Converts a buffer chunk into the payload sent to Loki.
#
# The chunk is first turned into streams by `chunk_to_loki`, and those streams
# are then handed to `payload_builder`.
#
# @param chunk [Object] the chunk passed through to `chunk_to_loki`
# @return [Object] whatever `payload_builder` returns for the streams
def generic_to_loki(chunk)
  payload_builder(chunk_to_loki(chunk))
end

#http_request_opts(uri) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/fluent/plugin/out_loki.rb', line 171

# Builds the option Hash for the HTTP connection to the given URI.
#
# Keys are added in this order, each only when applicable:
# :use_ssl (always), :verify_mode, :cert, :key, :ca_file.
#
# @param uri [URI] target URI; only its `scheme` is inspected
# @return [Hash{Symbol => Object}] connection options
def http_request_opts(uri)
  options = { use_ssl: uri.scheme == 'https' }

  # Optionally disable server certificate verification.
  options[:verify_mode] = OpenSSL::SSL::VERIFY_NONE if @insecure_tls

  # Optionally present a client certificate; requires both halves of the pair.
  client_pair_present = !(@cert.nil? || @key.nil?)
  if client_pair_present
    options[:cert] = @cert
    options[:key] = @key
  end

  # For server certificate verification: set custom CA bundle.
  # Only takes effect when `insecure_tls` is not set.
  options[:ca_file] = @ca_cert unless @ca_cert.nil?

  options
end

#load_client_cert ⇒ Object



132
133
134
135
# File 'lib/fluent/plugin/out_loki.rb', line 132

# Replaces the file paths held in @cert and @key with the parsed objects.
#
# @cert becomes an OpenSSL::X509::Certificate built from the file contents and
# @key becomes whatever OpenSSL::PKey.read returns for the file contents.
# Either step is skipped when the corresponding variable is not set.
#
# @return [Object, nil] the loaded key, or nil when @key is not set
def load_client_cert
  if @cert
    certificate_data = File.read(@cert)
    @cert = OpenSSL::X509::Certificate.new(certificate_data)
  end

  return unless @key

  key_data = File.read(@key)
  @key = OpenSSL::PKey.read(key_data)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/fluent/plugin/out_loki.rb', line 143

# Always answers true.
#
# NOTE(review): presumably queried by Fluentd to decide whether this output
# may run under multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#validate_client_cert_key ⇒ Object



137
138
139
140
141
# File 'lib/fluent/plugin/out_loki.rb', line 137

def validate_client_cert_key
  if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
    raise "Unsupported private key type #{key.class}"
  end
end

#write(chunk) ⇒ Object

flush a chunk to loki

Raises:



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/out_loki.rb', line 148

# Flushes one chunk to Loki.
#
# The chunk is converted with `generic_to_loki`, wrapped as
# `{ 'streams' => ... }` and posted through `loki_http_request`. When @tenant
# is set, it is expanded with `extract_placeholders` for this chunk first.
#
# A successful response is logged at debug level. Any other response is
# logged at warn level (plus a debug dump of the body); only 429 and 5xx
# responses are then raised, all other failures return normally.
#
# @param chunk [Object] the buffer chunk to send
# @return [nil]
# @raise [LogPostError] when Loki answers 429 or a server error
def write(chunk)
  body = { 'streams' => generic_to_loki(chunk) }
  tenant = @tenant ? extract_placeholders(@tenant, chunk) : nil

  # add ingest path to loki url
  response = loki_http_request(body, tenant)

  if response.is_a?(Net::HTTPSuccess)
    log.debug "POST request was responded to with status code #{response.code}"
    return
  end

  failure = "#{response.code} #{response.message} #{response.body}"
  log.warn "failed to write post to #{@uri} (#{failure})"
  log.debug Yajl.dump(body)

  # Only retry 429 and 500s
  case response
  when Net::HTTPTooManyRequests, Net::HTTPServerError
    raise LogPostError, failure
  end
end