Class: Fluent::OutputFieldMultiFlattenJson

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_field_flatten_json.rb

Instance Method Summary

Instance Method Details

#emit(tag, es, chain) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/fluent/plugin/out_field_flatten_json.rb', line 11

# Re-emits every event of the incoming stream under a rewritten tag.
#
# The tag is rewritten once via update_tag; each record is passed through
# flatten_field before being handed to Engine.emit. After the whole stream
# has been processed the chain is advanced with chain.next.
#
# @param tag [String] tag of the incoming event stream
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain to advance once emission is done
# @return [Object] whatever chain.next returns
def emit(tag, es, chain)
  rewritten_tag = update_tag(tag)
  es.each do |time, record|
    flattened = flatten_field(record)
    Engine.emit(rewritten_tag, time, flattened)
  end
  chain.next
end

#flatten(json, prefix) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_field_flatten_json.rb', line 33

# Flattens a nested hash into a single level whose keys are dot-joined paths.
#
# Every key is prefixed with +prefix+ (joined by '.') unless +prefix+ is
# empty. Hash values are flattened recursively; any other value (including
# arrays) is stored as-is under its full path. A nested hash that is empty
# contributes no keys.
#
# The flattened result is assembled in a separate hash and only then swapped
# into +json+. Writing dotted keys straight into the hash being walked lets a
# freshly written key collide with an original key that has not been visited
# yet, which overwrites that value and then prefixes it a second time.
#
# @param json [Hash] hash to flatten; it is modified in place
# @param prefix [String] path prefix for every key, or '' for none
# @return [Hash] the same +json+ object, now holding the flattened pairs
def flatten(json, prefix)
  flat = {}
  json.each do |key, value|
    full_path = prefix.empty? ? key : [prefix, key].join('.')
    if value.is_a?(Hash)
      flat.merge!(flatten(value, full_path))
    else
      flat[full_path] = value
    end
  end
  # clear + merge! (rather than replace) keeps the receiver's identity and
  # any default it carries, matching the previous in-place contract.
  json.clear
  json.merge!(flat)
end

#flatten_field(record) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_field_flatten_json.rb', line 54

# Expands the JSON object stored under +flatten_key+ into dotted top-level
# keys on the record.
#
# The record is returned untouched when +flatten_key+ is nil, the record is
# nil, or the record has no such key. Otherwise the field is parsed as JSON,
# flattened with +flatten_key+ as the path prefix, and each resulting pair is
# written onto the record. The original field itself is left in place.
#
# @param record [Hash, nil] event record; modified in place
# @return [Hash, nil] the same +record+ object
# @raise [JSON::ParserError] if the field is a string that is not valid JSON
def flatten_field(record)
  return record if flatten_key.nil? || record.nil? || !record.key?(flatten_key)

  value = record[flatten_key]
  # Hash#to_s produces Ruby inspect syntax, not JSON, so a field that is
  # already structured is serialised properly first. The round trip also
  # yields a private copy, so flatten's in-place rewrite cannot touch the
  # hash still stored in the record.
  source = value.is_a?(Hash) ? JSON.generate(value) : value.to_s
  parsed = JSON.parse(source)
  # Only JSON objects can be flattened; arrays and scalars have no keys.
  return record unless parsed.is_a?(Hash)

  flatten(parsed, flatten_key).each { |key, val| record[key] = val }
  record
end

#update_tag(tag) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_field_flatten_json.rb', line 19

# Rewrites a tag according to remove_tag_prefix and add_tag_prefix.
#
# When remove_tag_prefix is set: a tag equal to it becomes '', and a tag
# starting with "<remove_tag_prefix>." loses that leading segment and the
# dot. Any other tag is left alone. When add_tag_prefix is set it is then
# prepended with a '.' separator, or used alone if the tag is nil or empty.
#
# @param tag [String, nil] incoming tag
# @return [String, nil] the rewritten tag
def update_tag(tag)
  strip = remove_tag_prefix
  if strip
    if strip == tag
      tag = ''
    elsif tag.to_s.start_with?(strip + '.')
      # Skip the prefix and the dot that follows it.
      tag = tag[(strip.length + 1)..-1]
    end
  end

  lead = add_tag_prefix
  return tag unless lead

  (tag && !tag.empty?) ? "#{lead}.#{tag}" : lead
end