Class: FileWatch::SincedbValue
- Inherits: Object
  - Object
    - FileWatch::SincedbValue
- Defined in: lib/filewatch/sincedb_value.rb
Overview
Tracks the position and expiry of the offset of a file of interest.

NOTE: `watched_file.bytes_read` and this `sincedb_value.position` can diverge. Whenever `watched_file.bytes_read` is greater than `sincedb_value.position`, the difference accounts for bytes still held in `watched_file.buffer`. In Tail mode, if we quit, the buffer is not flushed and we restart from `sincedb_value.position` (the end of the last line read). In Read mode, the buffer is flushed as a final line, so both values should be the same.
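The divergence described in the note can be illustrated with a minimal sketch. `TailReaderSketch` and its fields are hypothetical stand-ins, not the FileWatch classes; the sketch only assumes that delivered lines advance the position while a trailing partial line stays buffered.

```ruby
# Hypothetical stand-in for a tail-mode reader; not part of FileWatch.
class TailReaderSketch
  attr_reader :bytes_read, :position, :buffer

  def initialize
    @bytes_read = 0 # plays the role of watched_file.bytes_read
    @position = 0   # plays the role of sincedb_value.position
    @buffer = +""   # plays the role of watched_file.buffer
  end

  # Accept a chunk, deliver complete lines, keep any partial line buffered.
  def read_chunk(chunk)
    @bytes_read += chunk.bytesize
    @buffer << chunk
    lines = []
    while (idx = @buffer.index("\n"))
      line = @buffer.slice!(0..idx)
      @position += line.bytesize # only delivered lines advance the position
      lines << line
    end
    lines
  end
end

reader = TailReaderSketch.new
reader.read_chunk("line one\nline two\npart")
# bytes_read is ahead of position by exactly the buffered bytes
puts reader.bytes_read - reader.position == reader.buffer.bytesize
```

In this sketch, restarting from `position` would re-read the buffered partial line from the file, which is the Tail mode behavior the note describes.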
Instance Attribute Summary
- #last_changed_at ⇒ Object (readonly): Returns the value of attribute last_changed_at.
- #path_in_sincedb ⇒ Object (readonly): Returns the value of attribute path_in_sincedb.
- #position ⇒ Object (readonly): Returns the value of attribute position.
- #watched_file ⇒ Object (readonly): Returns the value of attribute watched_file.
Instance Method Summary
- #add_path_in_sincedb(path) ⇒ Object
- #clear_watched_file ⇒ Object
- #increment_position(pos) ⇒ Object
- #initialize(position, last_changed_at = nil, watched_file = nil) ⇒ SincedbValue (constructor): A new instance of SincedbValue.
- #last_changed_at_expires(duration) ⇒ Object
- #reading_completed ⇒ Object
- #set_watched_file(watched_file) ⇒ Object
- #to_s ⇒ Object
- #touch ⇒ Object
- #unset_watched_file ⇒ Object
- #update_position(pos) ⇒ Object
Constructor Details
#initialize(position, last_changed_at = nil, watched_file = nil) ⇒ SincedbValue
Returns a new instance of SincedbValue.
    # File 'lib/filewatch/sincedb_value.rb', line 14
    def initialize(position, last_changed_at = nil, watched_file = nil)
      @position = position # this is the value read from disk
      @last_changed_at = last_changed_at
      @watched_file = watched_file
      touch if @last_changed_at.nil? || @last_changed_at.zero?
    end
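As a rough illustration of the timestamp default in the constructor, the sketch below copies only `#initialize` and `#touch` into a hypothetical `ConstructorSketch` class (the name is an assumption, and the `watched_file` argument is left out for brevity).

```ruby
# Hypothetical stand-in that copies only the constructor and #touch.
class ConstructorSketch
  attr_reader :position, :last_changed_at

  def initialize(position, last_changed_at = nil)
    @position = position
    @last_changed_at = last_changed_at
    # A missing or zero timestamp is replaced with the current time.
    touch if @last_changed_at.nil? || @last_changed_at.zero?
  end

  def touch
    @last_changed_at = Time.now.to_f
  end
end

fresh    = ConstructorSketch.new(0)                    # no timestamp given
zeroed   = ConstructorSketch.new(128, 0)               # zero timestamp
restored = ConstructorSketch.new(128, 1_700_000_000.5) # explicit timestamp is kept

puts fresh.last_changed_at > 0
puts zeroed.last_changed_at > 0
puts restored.last_changed_at == 1_700_000_000.5
```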
Instance Attribute Details
#last_changed_at ⇒ Object (readonly)
Returns the value of attribute last_changed_at.
    # File 'lib/filewatch/sincedb_value.rb', line 12
    def last_changed_at
      @last_changed_at
    end
#path_in_sincedb ⇒ Object (readonly)
Returns the value of attribute path_in_sincedb.
    # File 'lib/filewatch/sincedb_value.rb', line 12
    def path_in_sincedb
      @path_in_sincedb
    end
#position ⇒ Object (readonly)
Returns the value of attribute position.
    # File 'lib/filewatch/sincedb_value.rb', line 12
    def position
      @position
    end
#watched_file ⇒ Object (readonly)
Returns the value of attribute watched_file.
    # File 'lib/filewatch/sincedb_value.rb', line 12
    def watched_file
      @watched_file
    end
Instance Method Details
#add_path_in_sincedb(path) ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 21
    def add_path_in_sincedb(path)
      @path_in_sincedb = path # can be nil
      self
    end
#clear_watched_file ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 65
    def clear_watched_file
      @watched_file = nil
    end
#increment_position(pos) ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 37
    def increment_position(pos)
      # called when actual lines are sent to the observer listener
      # this gets serialized as its a more true indication of position than
      # chunk read size
      touch
      @position += pos
    end
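Because `#increment_position` adds its argument to the stored position, the argument acts as a byte delta rather than an absolute offset. A minimal sketch, assuming a caller that reports each delivered line's byte size (`IncrementSketch` is a hypothetical stand-in, not the real class):

```ruby
# Hypothetical stand-in that copies only #touch and #increment_position.
class IncrementSketch
  attr_reader :position, :last_changed_at

  def initialize(position, last_changed_at)
    @position = position
    @last_changed_at = last_changed_at
  end

  def touch
    @last_changed_at = Time.now.to_f
  end

  def increment_position(pos)
    touch            # every delivered line refreshes the timestamp
    @position += pos # pos is added to the current position
  end
end

value = IncrementSketch.new(0, 1.0)
["alpha\n", "beta\n"].each { |line| value.increment_position(line.bytesize) }
puts value.position # 6 + 5 = 11
```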
#last_changed_at_expires(duration) ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 26
    def last_changed_at_expires(duration)
      @last_changed_at + duration
    end
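`#last_changed_at_expires` only computes the expiry time; deciding whether an entry has expired is left to the caller. The sketch below shows one way a caller might do that comparison. `ExpirySketch` and the `expired?` helper are hypothetical and are not taken from FileWatch.

```ruby
# Hypothetical stand-in that copies only #last_changed_at_expires.
class ExpirySketch
  attr_reader :last_changed_at

  def initialize(last_changed_at)
    @last_changed_at = last_changed_at
  end

  def last_changed_at_expires(duration)
    @last_changed_at + duration
  end
end

# Hypothetical helper: an entry counts as expired once its expiry time has passed.
def expired?(value, duration, now)
  value.last_changed_at_expires(duration) < now
end

value = ExpirySketch.new(1_000.0)
puts value.last_changed_at_expires(60) # 1060.0
puts expired?(value, 60, 1_030.0)      # false
puts expired?(value, 60, 1_100.0)      # true
```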
#reading_completed ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 69
    def reading_completed
      touch
      @path_in_sincedb = @watched_file.path
      @position = @watched_file.bytes_read
    end
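`#reading_completed` dereferences `@watched_file` without a nil check, so it expects a watched file to be attached. A minimal sketch of both cases, using a hypothetical `CompletedFileStub` struct in place of a real watched file and a hypothetical `CompletedSketch` class that copies only the relevant methods:

```ruby
# Hypothetical stub exposing the two readers that #reading_completed uses.
CompletedFileStub = Struct.new(:path, :bytes_read)

# Hypothetical stand-in that copies only #touch and #reading_completed.
class CompletedSketch
  attr_reader :position, :path_in_sincedb

  def initialize(position, watched_file = nil)
    @position = position
    @watched_file = watched_file
  end

  def touch
    @last_changed_at = Time.now.to_f
  end

  def reading_completed
    touch
    @path_in_sincedb = @watched_file.path
    @position = @watched_file.bytes_read
  end
end

done = CompletedSketch.new(0, CompletedFileStub.new("/tmp/batch.log", 512))
done.reading_completed
puts done.path_in_sincedb # /tmp/batch.log
puts done.position        # 512

begin
  CompletedSketch.new(0).reading_completed # no watched file attached
rescue NoMethodError
  puts "requires an attached watched_file"
end
```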
#set_watched_file(watched_file) ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 45
    def set_watched_file(watched_file)
      touch
      @watched_file = watched_file
    end
#to_s ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 54
    def to_s
      # consider serializing the watched_file state as well
      "#{position} #{last_changed_at}".tap do |s|
        if @watched_file.nil?
          s.concat(" ").concat(@path_in_sincedb) unless @path_in_sincedb.nil?
        else
          s.concat(" ").concat(@watched_file.path)
        end
      end
    end
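The three output shapes of `#to_s` can be seen in a small sketch. `ToSSketch` is a hypothetical stand-in that copies `#to_s` and `#add_path_in_sincedb`, and `PathStub` is a hypothetical struct that stands in for a watched file; the paths and numbers are made up for illustration.

```ruby
# Hypothetical stub exposing only the #path reader that #to_s uses.
PathStub = Struct.new(:path)

# Hypothetical stand-in that copies #add_path_in_sincedb and #to_s.
class ToSSketch
  attr_reader :position, :last_changed_at

  def initialize(position, last_changed_at, watched_file = nil)
    @position = position
    @last_changed_at = last_changed_at
    @watched_file = watched_file
    @path_in_sincedb = nil
  end

  def add_path_in_sincedb(path)
    @path_in_sincedb = path
    self # returning self allows chaining
  end

  def to_s
    "#{position} #{last_changed_at}".tap do |s|
      if @watched_file.nil?
        s.concat(" ").concat(@path_in_sincedb) unless @path_in_sincedb.nil?
      else
        s.concat(" ").concat(@watched_file.path)
      end
    end
  end
end

# No watched file and no stored path: position and timestamp only.
puts ToSSketch.new(42, 1234.5).to_s
# Stored path only: the stored path is appended.
puts ToSSketch.new(42, 1234.5).add_path_in_sincedb("/var/log/app.log").to_s
# Watched file attached: its path wins over the stored path.
stub = PathStub.new("/var/log/renamed.log")
puts ToSSketch.new(42, 1234.5, stub).add_path_in_sincedb("/var/log/app.log").to_s
```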
#touch ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 50
    def touch
      @last_changed_at = Time.now.to_f
    end
#unset_watched_file ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 75
    def unset_watched_file
      # called in read mode only because we flushed any remaining bytes as a final line.
      # cache the position
      # we don't cache the path here because we know we are done with this file.
      # either due via the `delete` handling
      # or when read mode is done with a file.
      # in the case of `delete` if the file was renamed then @watched_file is the
      # watched_file of the previous path and the new path will be discovered and
      # it should have the same inode as before.
      # The key from the new watched_file should then locate this entry and we
      # can resume from the cached position
      return if @watched_file.nil?
      wf = @watched_file
      @watched_file = nil
      @position = wf.bytes_read
    end
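The caching step in `#unset_watched_file` can be sketched as follows. `UnsetSketch` is a hypothetical stand-in that copies only this method, and `BytesReadStub` is a hypothetical struct in place of a real watched file.

```ruby
# Hypothetical stub exposing only the #bytes_read reader.
BytesReadStub = Struct.new(:bytes_read)

# Hypothetical stand-in that copies only #unset_watched_file.
class UnsetSketch
  attr_reader :position, :watched_file

  def initialize(position, watched_file = nil)
    @position = position
    @watched_file = watched_file
  end

  def unset_watched_file
    return if @watched_file.nil? # nothing attached, nothing to cache
    wf = @watched_file
    @watched_file = nil
    @position = wf.bytes_read    # cache the file's final offset
  end
end

value = UnsetSketch.new(100, BytesReadStub.new(250))
value.unset_watched_file
puts value.position          # 250
puts value.watched_file.nil? # true

value.unset_watched_file     # second call is a no-op
puts value.position          # still 250
```

Unlike `#clear_watched_file`, which only drops the reference, this method copies `bytes_read` into the position first, so the cached offset survives after the watched file is detached.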
#update_position(pos) ⇒ Object
    # File 'lib/filewatch/sincedb_value.rb', line 30
    def update_position(pos)
      # called when we reset the position to bof or eof on shrink or file read complete
      touch
      @position = pos
      @watched_file.update_bytes_read(pos) unless @watched_file.nil?
    end
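In contrast to `#increment_position`, `#update_position` sets an absolute offset and pushes it to the attached watched file, if there is one. A minimal sketch, where `UpdateSketch` copies only the relevant methods and `ResettableFileStub` is a hypothetical stub that implements just `update_bytes_read`:

```ruby
# Hypothetical stub that records the offset pushed to it.
class ResettableFileStub
  attr_reader :bytes_read

  def initialize(bytes_read)
    @bytes_read = bytes_read
  end

  def update_bytes_read(total)
    @bytes_read = total
  end
end

# Hypothetical stand-in that copies only #touch and #update_position.
class UpdateSketch
  attr_reader :position

  def initialize(position, watched_file = nil)
    @position = position
    @watched_file = watched_file
  end

  def touch
    @last_changed_at = Time.now.to_f
  end

  def update_position(pos)
    touch
    @position = pos # absolute offset, not a delta
    @watched_file.update_bytes_read(pos) unless @watched_file.nil?
  end
end

file = ResettableFileStub.new(4096)
value = UpdateSketch.new(4096, file)
value.update_position(0) # e.g. reset to the beginning after the file shrinks
puts value.position      # 0
puts file.bytes_read     # 0

UpdateSketch.new(10).update_position(20) # no watched file: only the position changes
```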