Class: LogStash::Filters::Hashid

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/hashid.rb

Overview

This filter allows you to generate predictable, string-encoded hashed keys based on event contents and timestamp. This can be used to avoid getting duplicate records indexed into Elasticsearch.

Hashed keys can be generated based on full or partial hashes, and the filter has the ability to prefix these keys based on the event timestamp in order to make them largely ordered by timestamp, which tends to lead to increased indexing performance for event-based use cases where data is being indexed in near real time.

When used with the timestamp prefix enabled, it should ideally be run after the date filter has run and populated the @timestamp field.

Constant Summary collapse

CHARS =
'-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'.chars.to_a.freeze
SHIFTS =
[18, 12, 6, 0].freeze

Instance Method Summary collapse

Instance Method Details

#encode_to_sortable_string(data) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/logstash/filters/hashid.rb', line 95

# Encodes an array of byte values as a string drawn from the CHARS alphabet.
#
# Bytes are consumed three at a time and packed into one 24-bit word, which is
# then cut into four 6-bit indexes (one per entry in SHIFTS) that select
# characters from CHARS. A trailing group of one or two bytes is zero-padded,
# and the characters that carry only padding are dropped from the result.
#
# @param data [Array<Integer>] byte values to encode (assumed 0..255 — the
#   callers visible here pass unpacked bytes and masked epoch parts)
# @return [String] the encoded text; empty when +data+ is empty
def encode_to_sortable_string(data)
  sextets = []
  padding = 0
  data.each_slice(3) do |high, mid, low|
    # Number of bytes missing from this group; only the final group can be short.
    padding =
      if mid.nil?
        2
      elsif low.nil?
        1
      else
        0
      end
    mid = 0 if padding == 2
    low = 0 unless padding.zero?

    word = (high << 16) | (mid << 8) | low
    SHIFTS.each { |shift| sextets << ((word >> shift) & 0x3f) }
  end
  # Discard the trailing indexes that were produced purely by zero padding.
  CHARS.values_at(*sextets.first(sextets.size - padding)).join
end

#filter(event) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/filters/hashid.rb', line 50

# Computes a keyed hash over the configured source fields and stores its
# encoded form on the event under @target.
#
# The HMAC is fed "|<field>|<value>" for every name in @source, visited in
# sorted order. When @hash_bytes_used is a positive number smaller than the
# digest length, only that many trailing digest bytes are kept. When
# @add_timestamp_prefix is set, four bytes derived from the integer value of
# the event's '@timestamp' field (most significant first) are placed in front
# of the hash bytes before encoding.
#
# @param event [#get, #set] the event to read fields from and write the key to
#   (presumably a LogStash::Event — confirm against the plugin base class)
# @return [Object] whatever +event.set+ returns
def filter(event)
  mac = OpenSSL::HMAC.new(@key.value, @digest.new)
  @source.sort.each { |field| mac.update("|#{field}|#{event.get(field)}") }
  digest_bytes = mac.digest

  # Optionally keep only the last @hash_bytes_used bytes of the digest.
  limit = @hash_bytes_used
  truncate = !limit.nil? && limit > 0 && digest_bytes.length > limit
  digest_bytes = digest_bytes[-1 * limit, limit] if truncate

  prefix = []
  if @add_timestamp_prefix
    seconds = event.get('@timestamp').to_i
    # Big-endian split so the encoded key starts with the timestamp.
    prefix = [seconds >> 24, (seconds >> 16) % 256, (seconds >> 8) % 256, seconds % 256]
  end

  encoded = encode_to_sortable_string(prefix + digest_bytes.unpack('C*'))
  event.set(@target, encoded.force_encoding(Encoding::UTF_8))
end

#register ⇒ Object



44
45
46
47
48
# File 'lib/logstash/filters/hashid.rb', line 44

# Prepares the filter for use: normalizes the configured digest name in
# @method to a symbol and resolves it to a digest class stored in @digest.
#
# @return [Object] the value returned by +select_digest+
# @raise whatever +select_digest+ raises for an unrecognized name
def register
  # Symbols keep the later comparisons in select_digest cheap.
  symbolized = @method.to_sym
  @method = symbolized
  @digest = select_digest(symbolized)
end

#select_digest(method) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/logstash/filters/hashid.rb', line 77

# Maps a digest name to the matching OpenSSL digest class.
#
# @param method [Symbol] one of :SHA1, :SHA256, :SHA384, :SHA512 or :MD5
# @return [Class] the corresponding class under OpenSSL::Digest
# @raise [LogStash::ConfigurationError] when +method+ is not one of the
#   supported symbols (strings are not accepted)
def select_digest(method)
  supported = %i[SHA1 SHA256 SHA384 SHA512 MD5]
  unless supported.include?(method)
    # we really should never get here
    raise(LogStash::ConfigurationError, "Unknown digest for method=#{method.to_s}")
  end

  # Each supported symbol is also the constant's name inside OpenSSL::Digest.
  OpenSSL::Digest.const_get(method)
end