Class: LogStash::Filters::Multiline

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/multiline.rb

Overview

following line.

Constant Summary collapse

MULTILINE_TAG =
"multiline"
@@patterns_path =

Register default pattern paths

Set.new

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ Multiline

Returns a new instance of Multiline.



124
125
126
127
128
129
130
131
132
133
134
# File 'lib/logstash/filters/multiline.rb', line 124

# Builds the filter and sets up its per-instance state.
#
# @param config [Hash] plugin settings, handed unchanged to the parent
#   constructor via the bare `super`
def initialize(config = {})
  super

  # Message order cannot be guaranteed across threads (line #2 could be
  # processed before line #1), so this filter must never be parallelized.
  @threadsafe = false

  # State carried between calls: this filter has to remember pending events.
  @pending = {}
end

Instance Method Details

#close ⇒ Object



210
211
212
# File 'lib/logstash/filters/multiline.rb', line 210

# Shutdown hook; intentionally a no-op.
#
# @return [nil]
def close
  nil
end

#filter(event) ⇒ Object



167
168
169
170
171
172
173
174
175
176
# File 'lib/logstash/filters/multiline.rb', line 167

# Tests one event against the compiled pattern and passes the outcome to the
# multiline strategy.
#
# When the source field holds an Array, only its first element is matched.
# The match result is inverted when @negate is set.
#
# @param event [Object] the event to inspect; must respond to `get` and
#   `cancelled?`
# @return [Object, nil] the result of `filter_matched`, or nil when the event
#   was cancelled by the multiline strategy
def filter(event)
  raw = event.get(@source)
  subject = raw.is_a?(Array) ? raw.first : raw
  hit = @grok.match(subject)

  # add negate option: a hit counts only when not negated, a miss only when negated
  match = hit ? !@negate : @negate

  if @logger.debug?
    @logger.debug("Multiline", :pattern => @pattern, :message => raw, :match => match, :negate => @negate)
  end

  multiline_filter!(event, match)

  return if event.cancelled?

  filter_matched(event)
end

#flush(options = {}) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/logstash/filters/multiline.rb', line 184

# Emits pending event groups that have waited long enough.
#
# A group is due when its first event's "@timestamp" is at least @max_age
# seconds old, or unconditionally when options[:final] is set. Each due group
# is removed from @pending, merged into one event, uncancelled and passed to
# `filter_matched`.
#
# No locking is done here: the filter is declared not thread safe, so it is
# not run in multiple filterworker threads and flushing is called by the same
# thread.
#
# @param options [Hash] flush options; only :final is read
# @return [Array] the merged events, one per due group
def flush(options = {})
  # collect the due groups into a separate hash so @pending can be modified below
  due = @pending.each_with_object({}) do |(key, events), selected|
    next if events.empty?

    age = Time.now - events.first.get("@timestamp").time
    selected[key] = events if (age >= @max_age) || options[:final]
  end

  # return list of uncancelled expired events
  due.map do |key, events|
    @pending.delete(key)
    merge(events).tap do |merged|
      merged.uncancel
      filter_matched(merged)
    end
  end
end

#register ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logstash/filters/multiline.rb', line 137

# Prepares the filter for use: loads grok pattern files, compiles @pattern and
# binds `multiline_filter!` to the strategy selected by @what.
#
# @raise [ArgumentError] when @what is neither "previous" nor "next"
def register
  require "grok-pure" # rubygem 'jls-grok'

  @grok = Grok.new

  # class-wide default pattern paths come first, then the configured ones
  @patterns_dir = @@patterns_path.to_a + @patterns_dir
  @patterns_dir.each do |path|
    # a directory contributes every entry directly inside it
    glob = File.directory?(path) ? File.join(path, "*") : path
    Dir.glob(glob).each do |file|
      @logger.info("Grok loading patterns from file", :path => file)
      @grok.add_patterns_from_file(file)
    end
  end

  @grok.compile(@pattern)

  strategy =
    case @what
    when "previous" then :previous_filter!
    when "next" then :next_filter!
    else
      # we should never get here since @what is validated at config
      raise(ArgumentError, "Unknown multiline 'what' value")
    end

  # alias on the singleton class so only this instance is bound to the strategy
  singleton_class.send(:alias_method, :multiline_filter!, strategy)

  @logger.debug("Registered multiline plugin", :type => @type, :config => @config)
end