Class: LogStash::Filters::Hashtree
- Inherits: Base
- Inheritance chain: Object → Base → LogStash::Filters::Hashtree
- Defined in:
- lib/logstash/filters/hashtree.rb
Overview
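Combines the fingerprints of one field across consecutive messages: each event's field fingerprint is hashed together with the fingerprint stored by the previous event.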
Instance Method Summary
- #filter(event) ⇒ Object
- #fingerprint(data) ⇒ Object
- #register ⇒ Object
- #select_digest(method) ⇒ Object
Instance Method Details
#filter(event) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/logstash/filters/hashtree.rb', line 33

# Chains the fingerprint of the event's source field onto the fingerprint
# left in the state file by the previous event.
#
# The content read from the state file is stored on the event under
# @previous, the newly combined fingerprint is stored under @target, and the
# state file is then overwritten with the combined fingerprint.
#
# @param event [#get, #set] the event being filtered
# @return [Object] whatever filter_matched(event) returns
def filter(event)
  # Named differently from the #fingerprint method to avoid shadowing it.
  source_digest = fingerprint(event.get(@source))

  File.open(@file, File::RDWR | File::CREAT, 0644) do |f|
    # Hold an exclusive lock for the whole read-modify-write cycle; the lock
    # goes away when the block closes the file.
    f.flock(File::LOCK_EX)

    previous = f.read
    event.set(@previous, previous)

    combined = fingerprint(source_digest + previous)
    event.set(@target, combined)

    f.rewind
    f.write(combined)
    # Drop anything left over from a longer previous content (e.g. the digest
    # method was changed to one with a shorter output); otherwise the stale
    # tail would be read back as part of the next "previous" value.
    f.flush
    f.truncate(f.pos)
  end

  filter_matched(event)
end
#fingerprint(data) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/logstash/filters/hashtree.rb', line 45

# Computes the hex digest of data.to_s with the digest selected by @method
# and returns it tagged as UTF-8.
#
# @param data [#to_s] value to fingerprint
# @return [String] hex digest, encoding forced to UTF-8
def fingerprint(data)
  # OpenSSL::Digest instances aren't thread safe, so every pipeline worker
  # thread keeps its own instance in a thread-local slot. A pipeline may hold
  # several fingerprint filters, hence the filter id is part of the slot name
  # so that each filter gets a separate digest instance.
  slot = "digest-#{id}"
  hasher = (Thread.current[slot] ||= select_digest(@method))

  hex = hasher.hexdigest(data.to_s)
  # in JRuby 1.7.11 outputs as ASCII-8BIT
  hex.force_encoding(Encoding::UTF_8)
end
#register ⇒ Object
27 28 29 30 |
# File 'lib/logstash/filters/hashtree.rb', line 27

# Normalizes the configured digest method name held in @method.
#
# @return [Symbol] the symbolized method name now stored in @method
def register
  # convert to symbol for faster comparisons
  @method = @method.to_sym
end
#select_digest(method) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/logstash/filters/hashtree.rb', line 58

# Builds a fresh OpenSSL digest object for the given method symbol.
#
# @param method [Symbol] one of :SHA1, :SHA256, :SHA384, :SHA512, :MD5
# @return [OpenSSL::Digest] a new instance of the matching digest class
# @raise [LogStash::ConfigurationError] when the symbol is not one of the above
def select_digest(method)
  digest_class = {
    SHA1: OpenSSL::Digest::SHA1,
    SHA256: OpenSSL::Digest::SHA256,
    SHA384: OpenSSL::Digest::SHA384,
    SHA512: OpenSSL::Digest::SHA512,
    MD5: OpenSSL::Digest::MD5
  }[method]

  # we really should never get here
  raise(LogStash::ConfigurationError, "Unknown digest for method=#{method.to_s}") if digest_class.nil?

  digest_class.new
end