Class: LogStash::Filters::Hashid
- Inherits: LogStash::Filters::Base
  - Object
  - LogStash::Filters::Base
  - LogStash::Filters::Hashid
- Defined in: lib/logstash/filters/hashid.rb
Overview
This filter allows you to generate predictable, string-encoded hashed keys based on event contents and timestamp. This can be used to avoid getting duplicate records indexed into Elasticsearch.
Hashed keys can be generated from full or partial hashes, and the filter can prefix these keys based on the event timestamp in order to make them largely ordered by timestamp, which tends to lead to increased indexing performance for event-based use cases where data is being indexed in near real time.
When used with the timestamp prefix enabled, it should ideally be run after the date filter has run and populated the @timestamp field.
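As a rough sketch of the scheme (not part of the plugin; the event, field list, key and digest choice below are invented for the example), the raw bytes that end up in the key can be reproduced with plain OpenSSL: an HMAC over the sorted source fields, optionally prefixed by the event's epoch seconds as four big-endian bytes, then run through the sortable encoding shown in #encode_to_sortable_string below.

require 'openssl'
require 'time'

# Hypothetical event and settings, purely for illustration.
event  = { 'message' => 'GET /index.html 200', '@timestamp' => Time.parse('2017-01-01T00:00:00Z') }
fields = ['message']   # stands in for the plugin's configured source fields
secret = 'hashid'      # stands in for the plugin's HMAC key

hmac = OpenSSL::HMAC.new(secret, OpenSSL::Digest::SHA256.new)
fields.sort.each { |k| hmac.update("|#{k}|#{event[k]}") }

epoch = event['@timestamp'].to_i
bytes = [epoch].pack('N').unpack('C*') + hmac.digest.unpack('C*')
# `bytes` is what #encode_to_sortable_string (below) turns into the final key.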
Constant Summary
- CHARS = '-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'.chars.to_a.freeze
- SHIFTS = [18, 12, 6, 0].freeze
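Worth noting (an observation on the constants, not generated documentation): the alphabet has 64 characters listed in strictly ascending ASCII order ('-' < digits < uppercase < '_' < lowercase), which is what lets #encode_to_sortable_string produce strings whose lexical order matches the numeric order of the input bytes. A quick check of that property:

CHARS = '-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'.chars.to_a.freeze

CHARS.size                                # => 64
CHARS.each_cons(2).all? { |a, b| a < b }  # => true (strictly ascending ASCII order)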
Instance Method Summary
- #encode_to_sortable_string(data) ⇒ Object
- #filter(event) ⇒ Object
- #register ⇒ Object
- #select_digest(method) ⇒ Object
Instance Method Details
#encode_to_sortable_string(data) ⇒ Object
# File 'lib/logstash/filters/hashid.rb', line 95

def encode_to_sortable_string(data)
  idxes = []
  to_take = 0
  data.each_slice(3) do |part0, part1, part2|
    to_take = 0
    if part1.nil?
      part1 = part2 = 0
      to_take = 2
    end
    if part2.nil?
      part2 = 0
      to_take = 1
    end
    group24 = (part0 << 16) | (part1 << 8) | part2
    idxes.concat(SHIFTS.map{|n| (group24 >> n) & 0x3f })
  end
  CHARS.values_at(*idxes.take(idxes.size - to_take)).join
end
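The method is, in effect, an unpadded Base64 variant with a sort-preserving alphabet: each group of three input bytes becomes four output characters, and to_take trailing characters are dropped when the final group is short. The following standalone sketch (a copy of the logic for experimentation outside Logstash, not plugin API) shows what it produces for a few inputs:

CHARS  = '-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'.chars.to_a.freeze
SHIFTS = [18, 12, 6, 0].freeze

# Standalone equivalent of #encode_to_sortable_string, for experimentation only.
def encode(data)
  idxes = []
  to_take = 0
  data.each_slice(3) do |b0, b1, b2|
    to_take = b2.nil? ? (b1.nil? ? 2 : 1) : 0
    group24 = (b0 << 16) | ((b1 || 0) << 8) | (b2 || 0)
    idxes.concat(SHIFTS.map { |n| (group24 >> n) & 0x3f })
  end
  CHARS.values_at(*idxes.take(idxes.size - to_take)).join
end

encode([0, 0, 1])   # => "---0"
encode([0, 0, 2])   # => "---1"  (lexical order follows byte order)
encode([255, 255])  # => "zzw"   (short final group, one trailing character dropped)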
#filter(event) ⇒ Object
# File 'lib/logstash/filters/hashid.rb', line 50

def filter(event)
  hmac = OpenSSL::HMAC.new(@key.value, @digest.new)

  @source.sort.each do |k|
    hmac.update("|#{k}|#{event.get(k)}")
  end

  hash = hmac.digest

  if !@hash_bytes_used.nil? && @hash_bytes_used > 0 && hash.length > @hash_bytes_used
    hash = hash[(-1 * @hash_bytes_used), @hash_bytes_used]
  end

  epoch_array = []
  if @add_timestamp_prefix
    epoch = event.get('@timestamp').to_i
    epoch_array.push(epoch >> 24)
    epoch_array.push((epoch >> 16) % 256)
    epoch_array.push((epoch >> 8) % 256)
    epoch_array.push(epoch % 256)
  end

  binary_array = epoch_array + hash.unpack('C*')

  event.set(@target, encode_to_sortable_string(binary_array).force_encoding(Encoding::UTF_8))
end
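Two points worth spelling out (observations on the code above, not additional documented behaviour): when a hash byte limit is set, the last @hash_bytes_used bytes of the digest are kept rather than the first, and the timestamp prefix is simply the event's epoch seconds written as four big-endian bytes; the shift/modulo sequence is equivalent to a pack('N'). For example, with a fixed timestamp:

epoch = Time.utc(2017, 1, 1).to_i   # => 1483228800
[epoch >> 24, (epoch >> 16) % 256, (epoch >> 8) % 256, epoch % 256]
# => [88, 104, 70, 128]
[epoch].pack('N').unpack('C*')
# => [88, 104, 70, 128]  (same four bytes, so keys sort by timestamp first)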
#register ⇒ Object
# File 'lib/logstash/filters/hashid.rb', line 44

def register
  # convert to symbol for faster comparisons
  @method = @method.to_sym
  @digest = select_digest(@method)
end
#select_digest(method) ⇒ Object
# File 'lib/logstash/filters/hashid.rb', line 77

def select_digest(method)
  case method
  when :SHA1
    OpenSSL::Digest::SHA1
  when :SHA256
    OpenSSL::Digest::SHA256
  when :SHA384
    OpenSSL::Digest::SHA384
  when :SHA512
    OpenSSL::Digest::SHA512
  when :MD5
    OpenSSL::Digest::MD5
  else
    # we really should never get here
    raise(LogStash::ConfigurationError, "Unknown digest for method=#{method.to_s}")
  end
end
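Note that the case statement returns the digest class itself, not an instance; #register stores it in @digest and #filter instantiates it when building the HMAC. A minimal sketch of that handoff, with an obviously made-up key:

require 'openssl'

digest_class = OpenSSL::Digest::SHA256                 # what select_digest(:SHA256) returns
hmac = OpenSSL::HMAC.new('secret', digest_class.new)   # as done in #filter
hmac.update('|field|value')
hmac.digest.bytesize                                   # => 32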