Class: Fluent::HistogramOutput

Inherits:
Output
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_histogram.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HistogramOutput

fluentd output plugin’s methods



30
31
32
# File 'lib/fluent/plugin/out_histogram.rb', line 30

# Builds the plugin instance. No plugin-specific state is set up here; the
# call is forwarded unchanged to the superclass initializer.
def initialize
  super
end

Instance Attribute Details

#flush_interval ⇒ Object

Returns the value of attribute flush_interval.



23
24
25
# File 'lib/fluent/plugin/out_histogram.rb', line 23

# Reader for @flush_interval. #watch compares the elapsed engine time against
# this value to decide when to flush.
def flush_interval
  @flush_interval
end

#hists ⇒ Object

Returns the value of attribute hists.



24
25
26
# File 'lib/fluent/plugin/out_histogram.rb', line 24

# Reader for @hists, the Hash of per-tag bin arrays that #increment updates
# and #flush replaces with freshly zeroed arrays.
def hists
  @hists
end

#remove_prefix_string ⇒ Object

Returns the value of attribute remove_prefix_string.



26
27
28
# File 'lib/fluent/plugin/out_histogram.rb', line 26

# Reader for @remove_prefix_string. #configure sets it to
# input_tag_remove_prefix followed by '.', and only when that option is given;
# otherwise it stays nil.
def remove_prefix_string
  @remove_prefix_string
end

#zero_hist ⇒ Object

Returns the value of attribute zero_hist.



25
26
27
# File 'lib/fluent/plugin/out_histogram.rb', line 25

# Reader for @zero_hist, the all-zero Array of @bin_num entries built in
# #configure and duplicated whenever a new per-tag histogram is needed.
def zero_hist
  @zero_hist
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_histogram.rb', line 34

# Validates the plugin configuration and derives the state used at runtime.
#
# After the superclass has processed +conf+, this method:
# * rejects a non-positive bin_num and a sampling_rate below 1,
# * warns when bin_num is smaller than 100,
# * turns sampling on only when 'sampling_rate' is present in +conf+,
# * precomputes the dotted tag prefix/suffix/remove-prefix strings,
# * builds the zero histogram template and an empty histogram table,
# * sets the per-hit increment (@tick) and the alpha rescaling factor.
#
# @param conf the configuration element handed over by fluentd
# @raise [Fluent::ConfigError] if bin_num <= 0 or sampling_rate < 1
def configure(conf)
  super

  raise Fluent::ConfigError, 'bin_num must be > 0' if @bin_num <= 0
  raise Fluent::ConfigError, 'sampling_rate must be >= 1' if @sampling_rate < 1
  if @bin_num < 100
    $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome]
  end
  # Sampling is only active when the option was written explicitly.
  @sampling = true if conf['sampling_rate']

  @tag_prefix_string = "#{@tag_prefix}." if @tag_prefix
  @tag_suffix_string = ".#{@tag_suffix}" if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = "#{@input_tag_remove_prefix}."
    @remove_prefix_length = @remove_prefix_string.length
  end

  @zero_hist = Array.new(@bin_num, 0)
  @hists = initialize_hists
  @sampling_counter = 0

  # Each sampled hit stands in for sampling_rate events.
  if @sampling
    @tick = @sampling_rate.to_i
  else
    @tick = 1
  end

  # With alpha > 0 every hit adds (alpha + 1)**2 ticks in total (see
  # #increment), so the aggregated sum/avg are divided by that factor later.
  if @alpha > 0
    @revalue = (@alpha + 1)**2
  else
    @disable_revalue = true
  end

  @mutex = Mutex.new
end

#emit(tag, es, chain) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_histogram.rb', line 115

# Consumes an event stream and feeds every counted key into the histogram.
#
# The chain is advanced first. For each record the value stored under
# @count_key is read; it may be a single key or an arbitrarily nested array of
# keys, which is flattened. Without sampling every key is counted. With
# sampling a shared counter is bumped per key and only the key that brings the
# counter up to @sampling_rate is counted, after which the counter restarts.
#
# @param tag   the tag of the incoming events
# @param es    the event stream; yields (time, record) pairs
# @param chain the output chain, advanced via #next
def emit(tag, es, chain)
  chain.next

  es.each do |_time, record|
    [record[@count_key]].flatten.each do |key|
      if @sampling
        @sampling_counter += 1
        next unless @sampling_counter >= @sampling_rate

        increment(tag, key)
        @sampling_counter = 0
      else
        increment(tag, key)
      end
    end
  end
end

#flush ⇒ Object



178
179
180
181
# File 'lib/fluent/plugin/out_histogram.rb', line 178

# Summarises the histograms collected so far and starts a new window.
#
# The current table is turned into per-tag statistics, then @hists is replaced
# by zeroed histograms for the same set of tags, and finally the statistics
# are re-keyed with their output tags.
#
# @return [Hash] output tag => statistics hash (see #generate_output)
def flush
  summary = generate_output(@hists)
  known_tags = @hists.keys.dup
  @hists = initialize_hists(known_tags)
  tagging(summary)
end

#flush_emit(now) ⇒ Object



183
184
185
186
187
188
# File 'lib/fluent/plugin/out_histogram.rb', line 183

# Flushes the current window and emits one event per output tag.
#
# @param now the timestamp attached to every emitted event
# @return [Hash] the flushed data (output tag => statistics)
def flush_emit(now)
  flush.each do |out_tag, stats|
    Fluent::Engine.emit(out_tag, now, stats)
  end
end

#generate_output(flushed) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/fluent/plugin/out_histogram.rb', line 160

# Reduces each per-tag histogram to summary statistics.
#
# For every tag the bins are summed, averaged over the number of bins, and a
# standard deviation of the bin counts is computed. All divisions operate on
# the stored bin values as-is (integer division when the bins are Integers).
# Unless rescaling is disabled, :sum and :avg are divided by @revalue; :sd is
# always taken from the raw bins and truncated with #to_i.
#
# @param flushed [Hash] tag => Array of bin counts
# @return [Hash] tag => { :hist (only if @out_include_hist), :sum, :avg, :sd }
def generate_output(flushed)
  flushed.each_with_object({}) do |(tag, bins), result|
    total = bins.inject(:+)
    mean = total / bins.size
    squared_diffs = bins.map { |count| (mean - count)**2 }
    deviation = Math.sqrt(squared_diffs.inject(:+) / bins.size)

    stats = {}
    stats[:hist] = bins if @out_include_hist
    stats[:sum] = @disable_revalue ? total : total / @revalue
    stats[:avg] = @disable_revalue ? mean : mean / @revalue
    stats[:sd] = deviation.to_i
    result[tag] = stats
  end
end

#increment(tag, key) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_histogram.rb', line 101

# Records one observation of +key+ in the histogram kept for +tag+.
#
# The bin index is derived from the first 10 characters of the key: their
# codepoints are concatenated as decimal digits and reduced modulo @bin_num.
# The key is stringified first, so non-String values (Integers, nil, ...) no
# longer raise; nil becomes the empty string and therefore lands in bin 0.
#
# For each level 0..@alpha the bins from (id - level) to (id + level), wrapped
# around the histogram, are increased by @tick.
#
# @param tag the tag whose histogram is updated (created on first use)
# @param key the counted value; only its first 10 characters are significant
def increment(tag, key)
  # id = key.hash % @bin_num
  # #to_a keeps this working where String#codepoints returns an Enumerator.
  id = key.to_s[0..9].codepoints.to_a.join.to_i % @bin_num # attention to long key(length > 10)
  @mutex.synchronize do
    # Look the array up once: #flush reassigns @hists without taking this
    # mutex, and every update of this call should go to the same array.
    hist = (@hists[tag] ||= @zero_hist.dup)
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |al|
        hist[(id + al) % @bin_num] += @tick
      end
    end
  end
end

#initialize_hists(tags = nil) ⇒ Object

Histogram plugin’s method



91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_histogram.rb', line 91

# Builds a fresh histogram table.
#
# @param tags [Enumerable, nil] tags that should start with a zeroed
#   histogram; when omitted or nil the table is empty
# @return [Hash] tag => independent copy of @zero_hist
def initialize_hists(tags=nil)
  return {} unless tags

  tags.each_with_object({}) do |tag, table|
    table[tag] = @zero_hist.dup
  end
end

#shutdown ⇒ Object



82
83
84
85
86
# File 'lib/fluent/plugin/out_histogram.rb', line 82

# Stops the plugin: runs the superclass shutdown, then terminates the thread
# held in @watcher (created in #start) and waits for it to finish.
# NOTE(review): @watcher would be nil if #start was never called — confirm the
# framework always calls #start before #shutdown.
def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#start ⇒ Object



65
66
67
68
# File 'lib/fluent/plugin/out_histogram.rb', line 65

# Starts the plugin: after the superclass start hook has run, a background
# thread executing #watch is launched and kept in @watcher so that #shutdown
# can stop it.
def start
  super
  @watcher = Thread.new { watch }
end

#tagging(flushed) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin/out_histogram.rb', line 134

# Re-keys flushed statistics with the tags they will be emitted under.
#
# When @tag is configured every entry is keyed by that fixed tag (entries that
# end up under the same key overwrite each other, last one wins). Otherwise
# the output tag is derived from the input tag:
# 1. the input_tag_remove_prefix is cut off when the tag equals it or starts
#    with it followed by a dot,
# 2. the configured tag prefix is prepended and the tag suffix appended,
# 3. leading/trailing dots are stripped and runs of dots collapsed to one.
#
# @param flushed [Hash] input tag => statistics
# @return [Hash] output tag => statistics
def tagging(flushed)
  renamed = flushed.map do |tag, hist|
    next [@tag, hist] if @tag

    out_tag = tag.dup
    if @input_tag_remove_prefix
      nested = tag.start_with?(@remove_prefix_string) &&
               tag.length > @remove_prefix_length
      if nested || tag == @input_tag_remove_prefix
        out_tag = out_tag[@input_tag_remove_prefix.length..-1]
      end
    end

    out_tag = @tag_prefix_string + out_tag if @tag_prefix
    out_tag << @tag_suffix_string if @tag_suffix

    # Joining the pieces can leave stray or doubled dots; tidy them up.
    out_tag.gsub!(/(^\.+)|(\.+$)/, '')
    out_tag.gsub!(/(\.\.+)/, '.')

    [out_tag, hist]
  end
  Hash[renamed]
end

#watch ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_histogram.rb', line 70

# Body of the background thread started by #start. Never returns on its own.
#
# Wakes up every half second and, once at least @flush_interval has elapsed
# (measured with Fluent::Engine.now) since the last flush, emits the current
# statistics and remembers the emission time in @last_checked.
def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    next unless Fluent::Engine.now - @last_checked >= @flush_interval

    flushed_at = Fluent::Engine.now
    flush_emit(flushed_at)
    @last_checked = flushed_at
  end
end