Class: Fluent::FlowCounterSimpleOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::FlowCounterSimpleOutput
- Defined in: lib/fluent/plugin/out_flowcounter_simple.rb
Instance Attribute Summary collapse
-
#last_checked ⇒ Object
Returns the value of attribute last_checked.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #countup(count) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush_emit(step) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_watch ⇒ Object
- #watch ⇒ Object
Instance Attribute Details
#last_checked ⇒ Object
Returns the value of attribute last_checked.
13 14 15 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 13
#
# Reader for the @last_checked instance variable.
#
# @return [Object] the current value of @last_checked (nil until it is assigned)
def last_checked
  @last_checked
end
Instance Method Details
#configure(conf) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 15
#
# Validates the plugin settings and prepares the counting state.
#
# Reads @indicator, @unit and @comment (presumably populated by the parent
# class's configure from +conf+ -- confirm against the parameter
# declarations) and derives:
#
# * @indicator_proc -- maps one record to its contribution to the counter
#   (1 for 'num', the size of record.to_msgpack for 'byte')
# * @unit           -- the unit name converted to a Symbol
# * @tick           -- the length of one unit in seconds
# * @output_proc    -- formats a count into the tab-separated log line
# * @count, @mutex  -- the counter (reset to 0) and the lock guarding it
#
# @param conf [Object] configuration object, forwarded to the parent class
# @return [Mutex] the newly created mutex (value of the last assignment)
# @raise [Fluent::ConfigError] if @indicator is not num/byte or @unit is not
#   second/minute/hour/day
def configure(conf)
  super

  @indicator_proc =
    case @indicator
    when 'num'  then proc { |_record| 1 }
    when 'byte' then proc { |record| record.to_msgpack.size }
    else
      raise Fluent::ConfigError, "flowcounter-simple count allows num/byte"
    end

  # Single source of truth for the accepted units and their length in seconds,
  # so validation and tick selection cannot drift apart.
  seconds_per_unit = { 'second' => 1, 'minute' => 60, 'hour' => 3600, 'day' => 86400 }
  @tick = seconds_per_unit.fetch(@unit) do
    raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day"
  end
  @unit = @unit.to_sym

  # Whether a comment field is appended is decided once, at configure time;
  # the instance variables themselves are interpolated when the proc is called.
  with_comment = @comment ? true : false
  @output_proc = proc do |count|
    line = "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}"
    with_comment ? "#{line}\tcomment:#{@comment}" : line
  end

  @count = 0
  @mutex = Mutex.new
end
#countup(count) ⇒ Object
66 67 68 69 70 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 66
#
# Adds +count+ to the running total while holding @mutex.
# A nil @count is treated as 0.
#
# @param count [Numeric] amount to add to the counter
# @return [Numeric] the updated total
def countup(count)
  @mutex.synchronize do
    current = @count || 0
    @count = current + count
  end
end
#emit(tag, es, chain) ⇒ Object
97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 97
#
# Sums @indicator_proc over every record in the event stream, adds the sum to
# the counter via #countup, then calls chain.next.
#
# @param tag [Object] event tag (not used by this method)
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] object whose #next is invoked after counting
# @return [Object] whatever chain.next returns
def emit(tag, es, chain)
  total = 0
  es.each do |_time, record|
    total += @indicator_proc.call(record)
  end
  countup(total)
  chain.next
end
#flush_emit(step) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 72
#
# Takes the accumulated count, resets the counter to 0, and logs the formatted
# count at info level when it is greater than zero.
#
# The read-and-reset happens under @mutex, the same lock #countup holds while
# incrementing, so an increment cannot slip in between reading the total and
# zeroing it. A nil @count is treated as 0, matching #countup. Logging is done
# after the lock is released.
#
# @param step [Numeric] elapsed time passed by the caller; not used here
# @return [Object, nil] the result of log.info, or nil when nothing was logged
def flush_emit(step)
  count = @mutex.synchronize do
    pending = @count || 0
    @count = 0
    pending
  end
  log.info @output_proc.call(count) if count > 0
end
#shutdown ⇒ Object
60 61 62 63 64 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 60
#
# Runs the parent class's shutdown, then terminates the watcher thread and
# waits for it to finish.
#
# Does nothing further when no watcher thread exists (for example when
# shutdown is called without a preceding start), instead of raising on nil.
#
# @return [Thread, nil] the joined watcher thread, or nil if none was running
def shutdown
  super
  return unless @watcher

  @watcher.terminate
  @watcher.join
end
#start ⇒ Object
55 56 57 58 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 55
#
# Runs the parent class's start, then launches the watcher via #start_watch.
#
# @return [Object] whatever #start_watch returns
def start
  super
  start_watch
end
#start_watch ⇒ Object
79 80 81 82 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 79
#
# Spawns a thread running #watch and stores it in @watcher.
# Intended for internal use and for tests only.
#
# @return [Thread] the newly started watcher thread
def start_watch
  @watcher = Thread.new { watch }
end
#watch ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/fluent/plugin/out_flowcounter_simple.rb', line 84
#
# Endless polling loop: sleeps 0.1 between checks and, once at least @tick has
# elapsed since @last_checked (measured with Fluent::Engine.now), calls
# #flush_emit with the elapsed time and records the new check time.
#
# This method does not return on its own.
def watch
  # Held in an instance variable, and publicly accessible, so tests can read it.
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.1
    next unless Fluent::Engine.now - @last_checked >= @tick

    checked_at = Fluent::Engine.now
    flush_emit(checked_at - @last_checked)
    @last_checked = checked_at
  end
end