Class: Fluent::FlowCounterSimpleOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::FlowCounterSimpleOutput
- Defined in:
- lib/fluent/plugin/out_flowcounter_simple.rb
Instance Attribute Summary collapse
-
#last_checked ⇒ Object
Returns the value of attribute last_checked.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #countup(count) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush_emit(step) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_watch ⇒ Object
- #watch ⇒ Object
Instance Attribute Details
#last_checked ⇒ Object
Returns the value of attribute last_checked.
8 9 10 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 8 def last_checked @last_checked end |
Instance Method Details
#configure(conf) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 10 def configure(conf) super @indicator_proc = case @indicator when 'num' then Proc.new {|record| 1 } when 'byte' then Proc.new {|record| record.to_msgpack.size } else raise Fluent::ConfigError, "flowcounter-simple count allows num/byte" end @unit = case @unit when 'second' then :second when 'minute' then :minute when 'hour' then :hour when 'day' then :day else raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day" end @tick = case @unit when :second then 1 when :minute then 60 when :hour then 3600 when :day then 86400 else raise RuntimeError, "@unit must be one of second/minute/hour/day" end @output_proc = if @comment Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" } else Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" } end @count = 0 @mutex = Mutex.new end |
#countup(count) ⇒ Object
61 62 63 64 65 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 61 def countup(count) @mutex.synchronize { @count = (@count || 0) + count } end |
#emit(tag, es, chain) ⇒ Object
92 93 94 95 96 97 98 99 100 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 92 def emit(tag, es, chain) count = 0 es.each {|time,record| count += @indicator_proc.call(record) } countup(count) chain.next end |
#flush_emit(step) ⇒ Object
67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 67 def flush_emit(step) count, @count = @count, 0 if count > 0 $log.info @output_proc.call(count) end end |
#shutdown ⇒ Object
55 56 57 58 59 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 55 def shutdown super @watcher.terminate @watcher.join end |
#start ⇒ Object
50 51 52 53 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 50 def start super start_watch end |
#start_watch ⇒ Object
74 75 76 77 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 74 def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end |
#watch ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 79 def watch # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now while true sleep 0.1 if Fluent::Engine.now - @last_checked >= @tick now = Fluent::Engine.now flush_emit(now - @last_checked) @last_checked = now end end end |