Class: FileWatch::SincedbCollection

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable
Defined in:
lib/filewatch/sincedb_collection.rb

Overview

this KV collection has a watched_file storage_key (an InodeStruct) as the key and a SincedbValue as the value. the SincedbValues are built by reading the sincedb file.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(settings) ⇒ SincedbCollection

Returns a new instance of SincedbCollection.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/filewatch/sincedb_collection.rb', line 14

def initialize(settings)
  @settings = settings
  @sincedb_last_write = 0
  @sincedb = {}
  @serializer = SincedbRecordSerializer.new(@settings.sincedb_expiry_duration)
  @path = Pathname.new(@settings.sincedb_path)
  @write_method = LogStash::Environment.windows? || @path.chardev? || @path.blockdev? ? method(:non_atomic_write) : method(:atomic_write)
  @full_path = @path.to_path
  FileUtils.touch(@full_path)
  @write_requested = false
end

Instance Attribute Details

#pathObject (readonly)

Returns the value of attribute path.



11
12
13
# File 'lib/filewatch/sincedb_collection.rb', line 11

def path
  @path
end

#serializer=(value) ⇒ Object (writeonly)

Sets the attribute serializer

Parameters:

  • value

    the value to set the attribute serializer to.



12
13
14
# File 'lib/filewatch/sincedb_collection.rb', line 12

def serializer=(value)
  @serializer = value
end

Instance Method Details

#associate(watched_file) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/filewatch/sincedb_collection.rb', line 63

def associate(watched_file)
  logger.trace? && logger.trace("associate: finding", :path => watched_file.path, :inode => watched_file.sincedb_key.inode)
  sincedb_value = find(watched_file)
  if sincedb_value.nil?
    # sincedb has no record of this inode
    # and due to the window handling of many files
    # this file may not be opened in this session.
    # a new value will be added when the file is opened
    logger.trace("associate: unmatched", :filename => watched_file.filename)
    return true
  end
  logger.trace? && logger.trace("associate: found sincedb record", :filename => watched_file.filename,
                                :sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
  if sincedb_value.watched_file.nil? # not associated
    if sincedb_value.path_in_sincedb.nil?
      handle_association(sincedb_value, watched_file)
      logger.trace? && logger.trace("associate: inode matched but no path in sincedb", :filename => watched_file.filename)
      return true
    end
    if sincedb_value.path_in_sincedb == watched_file.path
      # the path on disk is the same as discovered path and the inode is the same.
      handle_association(sincedb_value, watched_file)
      logger.trace? && logger.trace("associate: inode and path matched", :filename => watched_file.filename)
      return true
    end
    # the path on disk is different from discovered unassociated path but they have the same key (inode)
    # treat as a new file, a new value will be added when the file is opened
    sincedb_value.clear_watched_file
    delete(watched_file.sincedb_key)
    logger.trace? && logger.trace("associate: matched but allocated to another", :filename => watched_file.filename)
    return true
  end
  if sincedb_value.watched_file.equal?(watched_file) # pointer equals
    logger.trace? && logger.trace("associate: already associated", :filename => watched_file.filename)
    return true
  end
  # sincedb_value.watched_file is not this discovered watched_file but they have the same key (inode)
  # this means that the filename path was changed during this session.
  # renamed file can be discovered...
  #   before the original is detected as deleted: state is `active`
  #   after the original is detected as deleted but before it is actually deleted: state is `delayed_delete`
  #   after the original is deleted
  # are not yet in the delete phase, let this play out
  existing_watched_file = sincedb_value.watched_file
  logger.trace? && logger.trace("associate: found sincedb_value has a watched_file - this is a rename",
                                :this_watched_file => watched_file.details, :existing_watched_file => existing_watched_file.details)
  watched_file.rotation_in_progress
  true
end

#clearObject



167
168
169
# File 'lib/filewatch/sincedb_collection.rb', line 167

def clear
  @sincedb.clear
end

#clear_watched_file(key) ⇒ Object



159
160
161
# File 'lib/filewatch/sincedb_collection.rb', line 159

def clear_watched_file(key)
  @sincedb[key].clear_watched_file
end

#delete(key) ⇒ Object



130
131
132
# File 'lib/filewatch/sincedb_collection.rb', line 130

def delete(key)
  @sincedb.delete(key)
end

#find(watched_file) ⇒ Object



113
114
115
# File 'lib/filewatch/sincedb_collection.rb', line 113

def find(watched_file)
  get(watched_file.sincedb_key)
end

#flush_at_intervalObject



180
181
182
183
184
185
186
187
# File 'lib/filewatch/sincedb_collection.rb', line 180

def flush_at_interval
  now = Time.now
  delta = now.to_i - @sincedb_last_write
  if delta >= @settings.sincedb_write_interval
    logger.debug("writing sincedb (delta since last write = #{delta})")
    sincedb_write(now)
  end
end

#get(key) ⇒ Object



121
122
123
# File 'lib/filewatch/sincedb_collection.rb', line 121

def get(key)
  @sincedb[key]
end

#increment(key, amount) ⇒ Object



142
143
144
# File 'lib/filewatch/sincedb_collection.rb', line 142

def increment(key, amount)
  @sincedb[key].increment_position(amount)
end

#keysObject



171
172
173
# File 'lib/filewatch/sincedb_collection.rb', line 171

def keys
  @sincedb.keys
end

#last_read(key) ⇒ Object



134
135
136
# File 'lib/filewatch/sincedb_collection.rb', line 134

def last_read(key)
  @sincedb[key].position
end

#member?(key) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/filewatch/sincedb_collection.rb', line 117

def member?(key)
  @sincedb.member?(key)
end

#openObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/filewatch/sincedb_collection.rb', line 46

def open
  @time_sdb_opened = Time.now.to_f
  begin
    path.open do |file|
      logger.debug("open: reading from #{path}")
      @serializer.deserialize(file) do |key, value|
        logger.trace? && logger.trace("open: importing #{key.inspect} => #{value.inspect}")
        set_key_value(key, value)
      end
    end
    logger.trace("open: count of keys read: #{@sincedb.keys.size}")
  rescue => e
    #No existing sincedb to load
    logger.debug("open: error opening #{path}", :exception => e.class, :message => e.message)
  end
end

#reading_completed(key) ⇒ Object



163
164
165
# File 'lib/filewatch/sincedb_collection.rb', line 163

def reading_completed(key)
  @sincedb[key].reading_completed
end

#request_disk_flushObject



30
31
32
33
# File 'lib/filewatch/sincedb_collection.rb', line 30

def request_disk_flush
  @write_requested = true
  flush_at_interval
end

#rewind(key) ⇒ Object



138
139
140
# File 'lib/filewatch/sincedb_collection.rb', line 138

def rewind(key)
  @sincedb[key].update_position(0)
end

#set(key, value) ⇒ Object



125
126
127
128
# File 'lib/filewatch/sincedb_collection.rb', line 125

def set(key, value)
  @sincedb[key] = value
  value
end

#set_watched_file(key, watched_file) ⇒ Object



146
147
148
# File 'lib/filewatch/sincedb_collection.rb', line 146

def set_watched_file(key, watched_file)
  @sincedb[key].set_watched_file(watched_file)
end

#store_last_read(key, pos) ⇒ Object



155
156
157
# File 'lib/filewatch/sincedb_collection.rb', line 155

def store_last_read(key, pos)
  @sincedb[key].update_position(pos)
end

#watched_file_deleted(watched_file) ⇒ Object



150
151
152
153
# File 'lib/filewatch/sincedb_collection.rb', line 150

def watched_file_deleted(watched_file)
  value = @sincedb[watched_file.sincedb_key]
  value.unset_watched_file if value
end

#watched_file_unset?(key) ⇒ Boolean

Returns:

  • (Boolean)


175
176
177
178
# File 'lib/filewatch/sincedb_collection.rb', line 175

def watched_file_unset?(key)
  return false unless member?(key)
  get(key).watched_file.nil?
end

#write(reason = nil) ⇒ Object



41
42
43
44
# File 'lib/filewatch/sincedb_collection.rb', line 41

def write(reason=nil)
  logger.trace("caller requested sincedb write (#{reason})")
  sincedb_write
end

#write_if_requestedObject



35
36
37
38
39
# File 'lib/filewatch/sincedb_collection.rb', line 35

def write_if_requested
  if write_requested?
    flush_at_interval
  end
end

#write_requested?Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/filewatch/sincedb_collection.rb', line 26

def write_requested?
  @write_requested
end