Class: Fluent::Plugin::InfluxdbOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_influxdb.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
FORMATTED_RESULT_FOR_INVALID_RECORD =
''.freeze

Instance Method Summary collapse

Constructor Details

#initializeInfluxdbOutput

Returns a new instance of InfluxdbOutput.



62
63
64
65
66
# File 'lib/fluent/plugin/out_influxdb.rb', line 62

def initialize
  super
  @seq = 0
  @prev_timestamp = nil
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


68
69
70
71
72
73
# File 'lib/fluent/plugin/out_influxdb.rb', line 68

def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @time_precise = time_precise_lambda()
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
end

#format(tag, time, record) ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_influxdb.rb', line 103

def format(tag, time, record)
  # TODO: Use tag based chunk separation for more reliability
  if record.empty? || record.has_value?(nil)
    FORMATTED_RESULT_FOR_INVALID_RECORD
  else
    [precision_time(time), record].to_msgpack
  end
end

#formatted_to_msgpack_binaryObject



117
118
119
# File 'lib/fluent/plugin/out_influxdb.rb', line 117

def formatted_to_msgpack_binary
  true
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/fluent/plugin/out_influxdb.rb', line 121

def multi_workers_ready?
  true
end

#precision_time(time) ⇒ Object



215
216
217
218
219
# File 'lib/fluent/plugin/out_influxdb.rb', line 215

def precision_time(time)
  # nsec is supported from v0.14
  nstime = time * (10 ** 9) + (time.is_a?(Integer) ? 0 : time.nsec)
  @time_precise.call(nstime)
end

#shutdownObject



112
113
114
115
# File 'lib/fluent/plugin/out_influxdb.rb', line 112

def shutdown
  super
  @influxdb.stop!
end

#startObject



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_influxdb.rb', line 75

def start
  super

  log.info "Connecting to database: #{@dbname}, host: #{@host}, port: #{@port}, username: #{@user}, use_ssl = #{@use_ssl}, verify_ssl = #{@verify_ssl}"

  # ||= for testing.
  @influxdb ||= InfluxDB::Client.new @dbname, hosts: @host.split(','),
                                              port: @port,
                                              username: @user,
                                              password: @password,
                                              async: false,
                                              retry: @retry,
                                              time_precision: @time_precision,
                                              use_ssl: @use_ssl,
                                              verify_ssl: @verify_ssl

  begin
    existing_databases = @influxdb.list_databases.map { |x| x['name'] }
    unless existing_databases.include? @dbname
      raise Fluent::ConfigError, 'Database ' + @dbname + ' doesn\'t exist. Create it first, please. Existing databases: ' + existing_databases.join(',')
    end
  rescue InfluxDB::AuthenticationError, InfluxDB::Error
    log.info "skip database presence check because '#{@user}' user doesn't have admin privilege. Check '#{@dbname}' exists on influxdb"
  end
end

#time_precise_lambdaObject



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fluent/plugin/out_influxdb.rb', line 195

def time_precise_lambda()
  case @time_precision.to_sym
  when :h then
    lambda{|nstime| nstime / (10 ** 9) / (60 ** 2) }
  when :m then
    lambda{|nstime| nstime / (10 ** 9) / 60 }
  when :s then
    lambda{|nstime| nstime / (10 ** 9) }
  when :ms then
    lambda{|nstime| nstime / (10 ** 6) }
  when :u then
    lambda{|nstime| nstime / (10 ** 3) }
  when :ns then
    lambda{|nstime| nstime }
  else
    raise Fluent::ConfigError, 'time_precision ' + @time_precision + ' is invalid.' +
      'should specify either either hour (h), minutes (m), second (s), millisecond (ms), microsecond (u), or nanosecond (ns)'
  end
end

#write(chunk) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/plugin/out_influxdb.rb', line 125

def write(chunk)
  points = []
  tag = chunk..tag
  chunk.msgpack_each do |time, record|
    timestamp = record.delete(@time_key) || time
    if tag_keys.empty? && !@auto_tags
      values = record
      tags = {}
    else
      values = {}
      tags = {}
      record.each_pair do |k, v|
        if (@auto_tags && v.is_a?(String)) || @tag_keys.include?(k)
          # If the tag value is not nil, empty, or a space, add the tag
          if v.to_s.strip != ''
            tags[k] = v
          end
        else
          values[k] = v
        end
      end
    end
    if @sequence_tag
      if @prev_timestamp == timestamp
        @seq += 1
      else
        @seq = 0
      end
      tags[@sequence_tag] = @seq
      @prev_timestamp = timestamp
    end

    if values.empty?
        log.warn "Skip record '#{record}', because InfluxDB requires at least one value in raw"
        next
    end

    point = {
      timestamp: timestamp,
      series: @measurement || tag,
      values: values,
      tags: tags,
    }
    retention_policy = @default_retention_policy
    unless @retention_policy_key.nil?
      retention_policy = record.delete(@retention_policy_key) || @default_retention_policy
      unless points.nil?
        if retention_policy != @default_retention_policy
          # flush the retention policy first
          @influxdb.write_points(points, nil, @default_retention_policy)
          points = nil
        end
      end
    end
    if points.nil?
      @influxdb.write_points([point], nil, retention_policy)
    else
      points << point
    end
  end

  unless points.nil?
    if @default_retention_policy.nil?
      @influxdb.write_points(points)
    else
      @influxdb.write_points(points, nil, @default_retention_policy)
    end
  end
end