Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Overview
Subclass of Fluent Plugin Output
Defined Under Namespace
Classes: LogPostError
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
'memory'
Instance Attribute Summary collapse
-
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
Instance Method Summary collapse
- #client_cert_configured? ⇒ Boolean
-
#configure(conf) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity.
- #generic_to_loki(chunk) ⇒ Object
- #http_request_opts(uri) ⇒ Object
- #load_client_cert ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #validate_client_cert_key ⇒ Object
-
#write(chunk) ⇒ Object
Flushes a chunk to Loki.
Instance Attribute Details
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
34 35 36 |
# File 'lib/fluent/plugin/out_loki.rb', line 34

# Reader for the @record_accessors instance variable.
#
# @return [Object, nil] whatever is currently stored in @record_accessors
#   (nil while the variable has not been assigned)
def record_accessors
  @record_accessors
end
Instance Method Details
#client_cert_configured? ⇒ Boolean
128 129 130 |
# File 'lib/fluent/plugin/out_loki.rb', line 128

# Tells whether both halves of the client certificate pair are present.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def client_cert_configured?
  !(@key.nil? || @cert.nil?)
end
#configure(conf) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/out_loki.rb', line 81

# Validates the plugin configuration and prepares the state used later on.
#
# In order, this method:
# 1. converts buffer compatibility parameters and delegates to the parent
#    implementation via `super`;
# 2. builds @uri from @url plus the '/loki/api/v1/push' path and rejects
#    anything that is not an HTTP/HTTPS URI;
# 3. fills @record_accessors with one accessor per key/value pair of every
#    `label` element (an empty value falls back to the key itself);
# 4. fills @remove_keys_accessors with one accessor per @remove_keys entry;
# 5. loads and validates the client certificate pair when both are set;
# 6. reads the bearer token file (once) into @auth_token_bearer;
# 7. checks that the CA certificate file exists when @ca_cert is set.
#
# @param conf [Object] configuration object; `conf.elements` is enumerated
#   and each `label` element must respond to `each_pair` and `has_key?`
# @return [nil]
# @raise [Fluent::ConfigError] when the URL is not an HTTP/HTTPS URL
# @raise [RuntimeError] when the bearer token file is missing or empty, or
#   when the CA certificate file is missing
def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
  compat_parameters_convert(conf, :buffer)
  super

  @uri = URI.parse(@url + '/loki/api/v1/push')
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
  end

  @record_accessors = {}
  conf.elements.select { |element| element.name == 'label' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      # An empty value means "use the label name as the record key".
      accessor_path = v.empty? ? k : v
      @record_accessors[k] = record_accessor_create(accessor_path)
    end
  end

  @remove_keys_accessors = @remove_keys.map { |key| record_accessor_create(key) }

  # If configured, load and validate client certificate (and corresponding key)
  if client_cert_configured?
    load_client_cert
    validate_client_cert_key
  end

  # Single existence check; everything below may assume the file is there.
  raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)

  @auth_token_bearer = nil
  unless @bearer_token_file.nil?
    # Read the file once, assume long-lived authentication token.
    @auth_token_bearer = File.read(@bearer_token_file)
    raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

    log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
  end

  raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end
#generic_to_loki(chunk) ⇒ Object
201 202 203 204 205 206 |
# File 'lib/fluent/plugin/out_loki.rb', line 201

# Turns a chunk into a Loki payload by passing it through chunk_to_loki and
# feeding the resulting streams to payload_builder.
#
# @param chunk [Object] the chunk handed on to chunk_to_loki
# @return [Object] whatever payload_builder returns for those streams
def generic_to_loki(chunk)
  # log.debug("GenericToLoki: converting #{chunk}")
  payload_builder(chunk_to_loki(chunk))
end
#http_request_opts(uri) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/fluent/plugin/out_loki.rb', line 171

# Assembles the option hash for an HTTP request to the given URI.
#
# Keys are added in this order when applicable: :use_ssl (always),
# :verify_mode, :cert and :key, :ca_file.
#
# @param uri [#scheme] target URI; SSL is enabled when its scheme is 'https'
# @return [Hash{Symbol => Object}] the assembled options
def http_request_opts(uri)
  opts = { use_ssl: uri.scheme == 'https' }

  # Optionally disable server certificate verification.
  opts[:verify_mode] = OpenSSL::SSL::VERIFY_NONE if @insecure_tls

  # Optionally present client certificate (only when both parts are set).
  cert_pair_present = !@cert.nil? && !@key.nil?
  if cert_pair_present
    opts[:cert] = @cert
    opts[:key] = @key
  end

  # For server certificate verification: set custom CA bundle.
  # Only takes effect when `insecure_tls` is not set.
  opts[:ca_file] = @ca_cert unless @ca_cert.nil?

  opts
end
#load_client_cert ⇒ Object
132 133 134 135 |
# File 'lib/fluent/plugin/out_loki.rb', line 132

# Replaces the file paths held in @cert and @key with the parsed OpenSSL
# objects read from those files. A falsy @cert or @key is left untouched.
#
# @return [Object, nil] the loaded key when @key was set, otherwise nil
def load_client_cert
  if @cert
    cert_pem = File.read(@cert)
    @cert = OpenSSL::X509::Certificate.new(cert_pem)
  end

  if @key
    key_pem = File.read(@key)
    @key = OpenSSL::PKey.read(key_pem)
  end
end
#multi_workers_ready? ⇒ Boolean
143 144 145 |
# File 'lib/fluent/plugin/out_loki.rb', line 143

# Unconditionally answers true.
# NOTE(review): presumably the hook Fluentd consults before running this
# output under several workers; confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#validate_client_cert_key ⇒ Object
137 138 139 140 141 |
# File 'lib/fluent/plugin/out_loki.rb', line 137

# Ensures the loaded client key in @key is an RSA or DSA key.
#
# The error message is built from @key, the same variable the type check
# inspects, so the method does not depend on a separate `key` accessor.
#
# @return [nil] when the key type is accepted
# @raise [RuntimeError] when @key is neither an OpenSSL::PKey::RSA nor an
#   OpenSSL::PKey::DSA instance
def validate_client_cert_key
  return if @key.is_a?(OpenSSL::PKey::RSA) || @key.is_a?(OpenSSL::PKey::DSA)

  raise "Unsupported private key type #{@key.class}"
end
#write(chunk) ⇒ Object
Flushes a chunk to Loki.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/fluent/plugin/out_loki.rb', line 148

# Flushes a chunk to Loki.
#
# Builds a `{ 'streams' => payload }` body from the chunk, resolves the
# tenant placeholders when @tenant is set, and posts the body through
# loki_http_request. A successful response is logged at debug level. Any
# other response is logged as a warning (status code, status message and
# response body) together with a debug dump of the request body.
#
# @param chunk [Object] the chunk passed to generic_to_loki and, when a
#   tenant is configured, to extract_placeholders
# @return [nil]
# @raise [LogPostError] when the response is a 429 or a 5xx
#   (NOTE(review): presumably this is what makes Fluentd retry the chunk)
def write(chunk)
  # streams by label
  payload = generic_to_loki(chunk)
  body = { 'streams' => payload }

  tenant = extract_placeholders(@tenant, chunk) if @tenant

  # add ingest path to loki url
  res = loki_http_request(body, tenant)

  if res.is_a?(Net::HTTPSuccess)
    log.debug "POST request was responded to with status code #{res.code}"
    return
  end

  res_summary = "#{res.code} #{res.message} #{res.body}"
  log.warn "failed to write post to #{@uri} (#{res_summary})"
  log.debug Yajl.dump(body)

  # Only retry 429 and 500s
  raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError)
end