Class: LogStash::Filters::Multiline
- Inherits:
-
Base
- Object
- Base
- LogStash::Filters::Multiline
- Defined in:
- lib/logstash/filters/multiline.rb
Overview
following line.
Constant Summary collapse
- MULTILINE_TAG =
"multiline"
- @@patterns_path =
Register default pattern paths
Set.new
Instance Method Summary collapse
- #close ⇒ Object
- #filter(event) ⇒ Object
- #flush(options = {}) ⇒ Object
-
#initialize(config = {}) ⇒ Multiline
constructor
A new instance of Multiline.
- #register ⇒ Object
Constructor Details
#initialize(config = {}) ⇒ Multiline
Returns a new instance of Multiline.
124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/logstash/filters/multiline.rb', line 124 def initialize(config = {}) super # this filter cannot be parallelized because message order # cannot be garanteed across threads, line #2 could be processed # before line #1 @threadsafe = false # this filter needs to keep state @pending = Hash.new end |
Instance Method Details
#close ⇒ Object
210 211 212 |
# File 'lib/logstash/filters/multiline.rb', line 210 def close # nothing to do end |
#filter(event) ⇒ Object
167 168 169 170 171 172 173 174 175 176 |
# File 'lib/logstash/filters/multiline.rb', line 167 def filter(event) match = event.get(@source).is_a?(Array) ? @grok.match(event.get(@source).first) : @grok.match(event.get(@source)) match = (match && !@negate) || (!match && @negate) # add negate option @logger.debug? && @logger.debug("Multiline", :pattern => @pattern, :message => event.get(@source), :match => match, :negate => @negate) multiline_filter!(event, match) filter_matched(event) unless event.cancelled? end |
#flush(options = {}) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/logstash/filters/multiline.rb', line 184 def flush( = {}) # note that thread safety concerns are not necessary here because the multiline filter # is not thread safe thus cannot be run in multiple filterworker threads and flushing # is called by the same thread # select all expired events from the @pending hash into a new expired hash # if :final flush then select all events expired = @pending.inject({}) do |result, (key, events)| unless events.empty? age = Time.now - events.first.get("@timestamp").time result[key] = events if (age >= @max_age) || [:final] end result end # return list of uncancelled expired events expired.map do |key, events| @pending.delete(key) event = merge(events) event.uncancel filter_matched(event) event end end |
#register ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/logstash/filters/multiline.rb', line 137 def register require "grok-pure" # rubygem 'jls-grok' @grok = Grok.new @patterns_dir = @@patterns_path.to_a + @patterns_dir @patterns_dir.each do |path| path = File.join(path, "*") if File.directory?(path) Dir.glob(path).each do |file| @logger.info("Grok loading patterns from file", :path => file) @grok.add_patterns_from_file(file) end end @grok.compile(@pattern) case @what when "previous" class << self; alias_method :multiline_filter!, :previous_filter!; end when "next" class << self; alias_method :multiline_filter!, :next_filter!; end else # we should never get here since @what is validated at config raise(ArgumentError, "Unknown multiline 'what' value") end # case @what @logger.debug("Registered multiline plugin", :type => @type, :config => @config) end |