Class: FileWatch::WatchedFile

Inherits:
Object
  • Object
show all
Defined in:
lib/filewatch/watched_file.rb

Constant Summary collapse

PATH_BASED_STAT =
0
IO_BASED_STAT =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pathname, stat, settings) ⇒ WatchedFile

This class represents a file that has been discovered. A path-based stat is taken at discovery.



16
17
18
19
20
21
22
23
24
25
# File 'lib/filewatch/watched_file.rb', line 16

# Builds the watched-file record for a discovered path.
#
# @param pathname [String, Pathname] location of the file; wrapped in a Pathname either way
# @param stat stat object handed to full_state_reset (which takes a fresh
#   PathStatClass stat itself when given nil)
# @param settings settings object; the visible methods read delimiter, file_chunk_count,
#   file_chunk_size, close_older, ignore_older and start_new_files_at from it
def initialize(pathname, stat, settings)
  @settings = settings
  @pathname = Pathname.new(pathname) # given arg pathname might be a string or a Pathname object
  @path = @pathname.to_path.freeze
  @filename = @pathname.basename.to_s
  # resets counters/history and (because @state is still nil) enters :watched
  full_state_reset(stat)
  # second transition to :watched; set_state records the previous :watched in @recent_states
  watch
  set_standard_read_loop
  set_accessed_at
end

Instance Attribute Details

#accessed_atObject (readonly)

Returns the value of attribute accessed_at.



9
10
11
# File 'lib/filewatch/watched_file.rb', line 9

# Last access time as float epoch seconds (see set_accessed_at); rotate_from may copy it from another watched file.
def accessed_at
  @accessed_at
end

#bufferObject (readonly)

Returns the value of attribute buffer.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# Line tokenizer created lazily by file_add_opened; nil until the file has been opened once.
def buffer
  @buffer
end

#bytes_readObject (readonly)

Returns the value of attribute bytes_read.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# Number of bytes consumed so far (read from the open file, or seeded via update_bytes_read / rotation).
def bytes_read
  @bytes_read
end

#bytes_unreadObject (readonly)

Returns the value of attribute bytes_unread.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# Number of bytes not yet read from the file; zeroed by full_state_reset, rotate_as_file and reset_bytes_unread.
def bytes_unread
  @bytes_unread
end

#fileObject (readonly)

Returns the value of attribute file.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# The opened file handle stored by file_add_opened; set back to nil by file_close.
def file
  @file
end

#filenameObject (readonly)

Returns the value of attribute filename.



9
10
11
# File 'lib/filewatch/watched_file.rb', line 9

# Base name of the watched path as a String (computed once in the constructor).
def filename
  @filename
end

#last_open_warning_atObject

Returns the value of attribute last_open_warning_at.



12
13
14
# File 'lib/filewatch/watched_file.rb', line 12

# Value cleared to nil by full_state_reset and rotate_as_file.
# NOTE(review): presumably the time an "unable to open" warning was last logged - confirm with callers.
def last_open_warning_at
  @last_open_warning_at
end

#listenerObject (readonly)

Returns the value of attribute listener.



10
11
12
# File 'lib/filewatch/watched_file.rb', line 10

# Listener obtained from an observer in set_listener; nil when none is attached.
def listener
  @listener
end

#loop_count_modeObject (readonly)

Returns the value of attribute loop_count_mode.



11
12
13
# File 'lib/filewatch/watched_file.rb', line 11

# Returns @loop_count_mode.
# NOTE(review): no method shown on this page assigns this variable - confirm where it is set.
def loop_count_mode
  @loop_count_mode
end

#loop_count_typeObject (readonly)

Returns the value of attribute loop_count_type.



11
12
13
# File 'lib/filewatch/watched_file.rb', line 11

# Returns @loop_count_type.
# NOTE(review): no method shown on this page assigns this variable - confirm where it is set.
def loop_count_type
  @loop_count_type
end

#pathObject (readonly)

Returns the value of attribute path.



9
10
11
# File 'lib/filewatch/watched_file.rb', line 9

# Frozen String form of the watched path (Pathname#to_path taken in the constructor).
def path
  @path
end

#pathnameObject (readonly)

Returns the value of attribute pathname.



9
10
11
# File 'lib/filewatch/watched_file.rb', line 9

# The watched path as a Pathname object.
def pathname
  @pathname
end

#read_chunk_sizeObject (readonly)

Returns the value of attribute read_chunk_size.



10
11
12
# File 'lib/filewatch/watched_file.rb', line 10

# Bytes requested per read; assigned by set_standard_read_loop or set_maximum_read_loop.
def read_chunk_size
  @read_chunk_size
end

#read_loop_countObject (readonly)

Returns the value of attribute read_loop_count.



10
11
12
# File 'lib/filewatch/watched_file.rb', line 10

# Number of read iterations per loop; assigned by set_standard_read_loop or set_maximum_read_loop.
def read_loop_count
  @read_loop_count
end

#recent_statesObject (readonly)

Returns the value of attribute recent_states.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# Array of earlier states, oldest first; set_state trims it once it holds 8 entries.
def recent_states
  @recent_states
end

#statObject (readonly)

Returns the value of attribute stat.



10
11
12
# File 'lib/filewatch/watched_file.rb', line 10

# Stat object the visible methods query for size, modified_at, inode_struct and restat.
# NOTE(review): presumably stored by set_stat, which is not shown here - confirm.
def stat
  @stat
end

#stateObject (readonly)

Returns the value of attribute state.



8
9
10
# File 'lib/filewatch/watched_file.rb', line 8

# Current state symbol as last assigned by set_state (e.g. :watched, :active, :ignored, :closed).
def state
  @state
end

Instance Method Details

#activateObject



282
283
284
# File 'lib/filewatch/watched_file.rb', line 282

# Transitions this watched file to the :active state.
def activate
  set_state(:active)
end

#active?Boolean

Returns:

  • (Boolean)


319
320
321
# File 'lib/filewatch/watched_file.rb', line 319

# True when the current state is :active.
def active?
  @state == :active
end

#all_read?Boolean

Returns:

  • (Boolean)


161
162
163
# File 'lib/filewatch/watched_file.rb', line 161

# True when the bytes read so far reach (or exceed) the last recorded size.
def all_read?
  @bytes_read >= @size
end

#buffer_extract(data) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/filewatch/watched_file.rb', line 247

# Feeds +data+ to the tokenizer and wraps the outcome in a BufferExtractResult.
#
# When the tokenizer yields no complete lines, the result also carries a warning
# message and a hash of diagnostic details; otherwise both are empty.
#
# @param data [String] chunk just read from the file
# @return [BufferExtractResult] built from (lines, warning, additional)
def buffer_extract(data)
  warning = ""
  additional = {}
  lines = @buffer.extract(data)
  if lines.empty?
    warning << "buffer_extract: a delimiter can't be found in current chunk"
    warning << ", maybe there are no more delimiters or the delimiter is incorrect"
    warning << " or the text before the delimiter, a 'line', is very large"
    warning << ", if this message is logged often try increasing the `file_chunk_size` setting."
    additional.update(
      "delimiter" => @settings.delimiter,
      "read_position" => @bytes_read,
      "bytes_read_count" => data.bytesize,
      "last_known_file_size" => last_stat_size,
      "file_path" => @path
    )
  end
  BufferExtractResult.new(lines, warning, additional)
end

#closeObject



295
296
297
# File 'lib/filewatch/watched_file.rb', line 295

# Transitions this watched file to the :closed state (state only; see file_close for the handle).
def close
  set_state(:closed)
end

#closed?Boolean

Returns:

  • (Boolean)


331
332
333
# File 'lib/filewatch/watched_file.rb', line 331

# True when the current state is :closed.
def closed?
  @state == :closed
end

#compressed?Boolean

Returns:

  • (Boolean)


197
198
199
# File 'lib/filewatch/watched_file.rb', line 197

# True when the path ends in one of the gzip extensions (.gz or .gzip).
def compressed?
  @path.end_with?(*%w[.gz .gzip])
end

#current_sizeObject



143
144
145
# File 'lib/filewatch/watched_file.rb', line 143

# Size recorded at the last non-rotating restat! (or forced equal by other methods); compare last_stat_size.
def current_size
  @size
end

#delay_deleteObject



307
308
309
# File 'lib/filewatch/watched_file.rb', line 307

# Transitions this watched file to the :delayed_delete state.
def delay_delete
  set_state(:delayed_delete)
end

#delayed_delete?Boolean

Returns:

  • (Boolean)


323
324
325
# File 'lib/filewatch/watched_file.rb', line 323

# True when the current state is :delayed_delete.
def delayed_delete?
  @state == :delayed_delete
end

#detailsObject



418
419
420
421
422
423
# File 'lib/filewatch/watched_file.rb', line 418

# Verbose one-line description for logging: filename, state, state history,
# byte counters, sizes, open flag, initial flag and sincedb key.
#
# @return [String]
def details
  summary = "@filename='#{@filename}', @state=#{@state.inspect}, @recent_states=#{@recent_states.inspect}, "
  summary << "@bytes_read=#{@bytes_read}, @bytes_unread=#{@bytes_unread}, current_size=#{current_size}, "
  summary << "last_stat_size=#{last_stat_size}, file_open?=#{file_open?}, @initial=#{@initial}"
  "<FileWatch::WatchedFile: #{summary}, sincedb_key='#{sincedb_key}'>"
end

#expiry_close_enabled?Boolean

Returns:

  • (Boolean)


343
344
345
# File 'lib/filewatch/watched_file.rb', line 343

# True when the close_older setting has a value (is not nil).
def expiry_close_enabled?
  !@settings.close_older.nil?
end

#expiry_ignore_enabled?Boolean

Returns:

  • (Boolean)


347
348
349
# File 'lib/filewatch/watched_file.rb', line 347

# True when the ignore_older setting has a value (is not nil).
def expiry_ignore_enabled?
  !@settings.ignore_older.nil?
end

#file_add_opened(rubyfile) ⇒ Object



212
213
214
215
# File 'lib/filewatch/watched_file.rb', line 212

# Stores an already opened file handle and makes sure a tokenizer exists.
#
# @param rubyfile the opened file object to read from
# @return the newly created tokenizer, or nil when one already existed
def file_add_opened(rubyfile)
  @file = rubyfile
  # keep an existing tokenizer (and whatever it has buffered) across re-opens
  return unless @buffer.nil?
  @buffer = BufferedTokenizer.new(@settings.delimiter)
end

#file_at_path_found_againObject



165
166
167
# File 'lib/filewatch/watched_file.rb', line 165

# Reverts to the most recent previous state (delegates to restore_previous_state).
def file_at_path_found_again
  restore_previous_state
end

#file_can_close?Boolean

Returns:

  • (Boolean)


413
414
415
416
# File 'lib/filewatch/watched_file.rb', line 413

# True when close_older expiry is enabled and the file has not been accessed
# for more than close_older seconds. Floats are used on both sides.
def file_can_close?
  if expiry_close_enabled?
    idle_seconds = Time.now.to_f - @accessed_at
    idle_seconds > @settings.close_older
  else
    false
  end
end

#file_closable?Boolean

Returns:

  • (Boolean)


401
402
403
# File 'lib/filewatch/watched_file.rb', line 401

# True when the close_older idle time has elapsed and everything has been read.
def file_closable?
  file_can_close? && all_read?
end

#file_closeObject



217
218
219
220
221
# File 'lib/filewatch/watched_file.rb', line 217

# Closes the underlying file handle, if there is an open one, and forgets it.
# A handle that is already closed is left in place untouched.
#
# @return [nil]
def file_close
  handle = @file
  return if handle.nil? || handle.closed?
  handle.close
  @file = nil
end

#file_ignorable?Boolean

Returns:

  • (Boolean)


405
406
407
408
409
410
411
# File 'lib/filewatch/watched_file.rb', line 405

# True when ignore_older expiry is enabled and the file was last modified
# more than ignore_older seconds ago.
def file_ignorable?
  if expiry_ignore_enabled?
    # work in floats from the start: under JRuby, (Time.now - stat.mtime)
    # converts between int and float before subtracting and returns a float
    (Time.now.to_f - modified_at) > @settings.ignore_older
  else
    false
  end
end

#file_open?Boolean

Returns:

  • (Boolean)


232
233
234
# File 'lib/filewatch/watched_file.rb', line 232

# True when a file handle is present and it has not been closed.
def file_open?
  !(@file.nil? || @file.closed?)
end

#file_read(amount = nil) ⇒ Object



227
228
229
230
# File 'lib/filewatch/watched_file.rb', line 227

# Reads from the open file with sysread and refreshes the access timestamp.
#
# @param amount [Integer, nil] bytes to request; falls back to the current read chunk size
# @return [String] the data read
def file_read(amount = nil)
  set_accessed_at
  length = amount ? amount : @read_chunk_size
  @file.sysread(length)
end

#file_seek(amount, whence = IO::SEEK_SET) ⇒ Object



223
224
225
# File 'lib/filewatch/watched_file.rb', line 223

# Repositions the open file with sysseek.
#
# @param amount [Integer] offset passed to sysseek
# @param whence [Integer] seek mode; defaults to IO::SEEK_SET (absolute position)
def file_seek(amount, whence = IO::SEEK_SET)
  @file.sysseek(amount, whence)
end

#full_state_reset(this_stat = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/filewatch/watched_file.rb', line 27

# Resets read progress, handle, listener and state history so the path is treated as a fresh file.
#
# @param this_stat stat object to adopt; when nil a new PathStatClass stat is taken for pathname,
#   and if the file is missing (Errno::ENOENT) the file is moved to delayed_delete and nothing else is reset
def full_state_reset(this_stat = nil)
  if this_stat.nil?
    begin
      this_stat = PathStatClass.new(pathname)
    rescue Errno::ENOENT
      delay_delete
      return
    end
  end
  @bytes_read = 0 # tracks bytes read from the open file or initialized from a matched sincedb_value off disk.
  @bytes_unread = 0 # tracks bytes not yet read from the open file. So we can warn on shrink when unread bytes are seen.
  file_close
  set_stat(this_stat)
  @listener = nil
  @last_open_warning_at = nil
  # initial as true means we have not associated this watched_file with a previous sincedb value yet.
  # and we should read from the beginning if necessary
  @initial = true
  @recent_states = [] # keep last 8 states, managed in set_state
  # (re)enter :watched, but only from :active or when no state has been set yet
  watch if active? || @state.nil?
end

#grown?Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/filewatch/watched_file.rb', line 151

# True when the recorded size is larger than the bytes read so far.
def grown?
  @size > @bytes_read
end

#has_listener?Boolean

Returns:

  • (Boolean)


177
178
179
# File 'lib/filewatch/watched_file.rb', line 177

# True when a listener is currently attached.
def has_listener?
  !@listener.nil?
end

#ignoreObject



286
287
288
# File 'lib/filewatch/watched_file.rb', line 286

# Transitions this watched file to the :ignored state.
def ignore
  set_state(:ignored)
end

#ignore_as_unreadObject



290
291
292
293
# File 'lib/filewatch/watched_file.rb', line 290

# Moves to :ignored and marks the whole recorded size as already read,
# so only later growth counts as unread.
def ignore_as_unread
  ignore
  @bytes_read = @size
end

#ignored?Boolean

Returns:

  • (Boolean)


327
328
329
# File 'lib/filewatch/watched_file.rb', line 327

# True when the current state is :ignored.
def ignored?
  @state == :ignored
end

#increment_bytes_read(delta) ⇒ Object



264
265
266
267
268
269
# File 'lib/filewatch/watched_file.rb', line 264

# Adds +delta+ to the bytes-read counter and refreshes the unread counter.
#
# @param delta [Integer, nil] bytes just consumed; nil is ignored
# @return [Integer, nil] the new bytes-read total, or nil when delta was nil
def increment_bytes_read(delta)
  unless delta.nil?
    @bytes_read += delta
    update_bytes_unread
    @bytes_read
  end
end

#initial?Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/filewatch/watched_file.rb', line 193

# Value of the @initial flag: true after full_state_reset, false after initial_completed or a rotation.
def initial?
  @initial
end

#initial_completedObject



185
186
187
# File 'lib/filewatch/watched_file.rb', line 185

# Clears the @initial flag.
def initial_completed
  @initial = false
end

#inspectObject



425
426
427
# File 'lib/filewatch/watched_file.rb', line 425

# Short description: filename, state, current size and sincedb key (see details for the verbose form).
def inspect
  "<FileWatch::WatchedFile: @filename='#{@filename}', @state=#{@state.inspect}, current_size=#{current_size}, sincedb_key='#{sincedb_key}'>"
end

#last_stat_sizeObject



139
140
141
# File 'lib/filewatch/watched_file.rb', line 139

# Size reported by the stat object itself, as opposed to the cached @size returned by current_size.
def last_stat_size
  @stat.size
end

#loop_control_adjusted_for_stat_sizeObject



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/filewatch/watched_file.rb', line 365

# Works out how many reads of what size the next read loop should perform,
# based on how many bytes remain between the bytes read and the current size.
#
# @return [LoopControlResult] built from (iteration count, bytes per read, more-to-read flag)
def loop_control_adjusted_for_stat_size
  remaining = current_size - @bytes_read
  if remaining < 1
    # nothing to read
    LoopControlResult.new(0, 0, false)
  elsif remaining < @read_chunk_size
    # less than one chunk: a single read of exactly what is left
    LoopControlResult.new(1, remaining, false)
  elsif remaining < @standard_loop_max_bytes
    # Reading the configured count would overrun and hit EOF, so only do the
    # whole chunks that fit and flag that more remains.
    # e.g. remaining 100 with 4 x 30 configured -> 3 x 30 now, the rest next time
    LoopControlResult.new(remaining / @read_chunk_size, @read_chunk_size, true)
  else
    # at least a full loop's worth is available: use the configured values
    # and do not indicate more
    LoopControlResult.new(@read_loop_count, @read_chunk_size, false)
  end
end

#modified_at(update = false) ⇒ Object



115
116
117
118
119
120
121
# File 'lib/filewatch/watched_file.rb', line 115

# Cached modification time taken from the stat object.
#
# @param update [Boolean] when true, re-read the value from the stat object
# @return the cached value; it is (re)loaded when forced or when nothing is cached yet
def modified_at(update = false)
  return @modified_at unless update || @modified_at.nil?
  @modified_at = @stat.modified_at
end

#modified_at_changed?Boolean

Returns whether modified_at changed since it was last read.

Returns:

  • (Boolean)

    whether modified_at changed since it was last read

See Also:



125
126
127
# File 'lib/filewatch/watched_file.rb', line 125

# True when the cached modification time differs from what the stat object reports now.
def modified_at_changed?
  modified_at != @stat.modified_at
end

#openObject



208
209
210
# File 'lib/filewatch/watched_file.rb', line 208

# Opens the file at @path through FileOpener and registers the handle via file_add_opened.
def open
  file_add_opened(FileOpener.open(@path))
end

#position_for_new_sincedb_valueObject



129
130
131
132
133
134
135
136
137
# File 'lib/filewatch/watched_file.rb', line 129

# Start offset to record when a new sincedb entry is created for this file.
#
# @return [Integer] 0, or the last stat size when the file is still flagged
#   initial and start_new_files_at is anything other than :beginning
def position_for_new_sincedb_value
  # not initial: found after the first discovery, always read from the start
  return 0 unless @initial
  # initial: found in the first discovery, honour the start_new_files_at setting
  return 0 if @settings.start_new_files_at == :beginning
  last_stat_size
end

#read_extract_lines(amount) ⇒ Object



240
241
242
243
244
245
# File 'lib/filewatch/watched_file.rb', line 240

# Reads a chunk, extracts lines from it and then advances the bytes-read counter.
#
# @param amount [Integer, nil] bytes to request from file_read
# @return the result of buffer_extract for the chunk
def read_extract_lines(amount)
  chunk = file_read(amount)
  # extraction happens first; the counter is advanced afterwards
  buffer_extract(chunk).tap { increment_bytes_read(chunk.bytesize) }
end

#recent_state_historyObject



397
398
399
# File 'lib/filewatch/watched_file.rb', line 397

# Previous states followed by the current one, as a new array.
def recent_state_history
  @recent_states.dup.concat(Array(@state))
end

#reopenObject



201
202
203
204
205
206
# File 'lib/filewatch/watched_file.rb', line 201

# Closes and re-opens the file, but only when it is currently open.
#
# @return the result of open, or nil when the file was not open
def reopen
  return unless file_open?
  file_close
  open
end

#reset_bufferObject



236
237
238
# File 'lib/filewatch/watched_file.rb', line 236

# Flushes the tokenizer so that partially buffered data is discarded.
#
# The tokenizer is only created once the file has been opened (file_add_opened),
# so a watched file that was never opened has none; in that case this is a no-op.
#
# @return whatever the tokenizer's flush returns, or nil when there is no tokenizer
def reset_buffer
  @buffer.flush unless @buffer.nil?
end

#reset_bytes_unreadObject



386
387
388
389
# File 'lib/filewatch/watched_file.rb', line 386

# Zeroes the unread-bytes counter.
def reset_bytes_unread
  # called from shrink
  @bytes_unread = 0
end

#restat!Object

Returns true if the file was modified since last stat.

Returns:

  • true if the file was modified since last stat



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/filewatch/watched_file.rb', line 101

# Refreshes the stat object and reports whether the file changed.
#
# When the refreshed stat no longer matches the sincedb key, the file is moved
# to the rotation_in_progress state and the cached size is left as it was.
#
# @return [Boolean] true on rotation, otherwise whether the modification time changed
def restat!
  modified_at # prime the cached value first so a change can always be detected
  @stat.restat
  if rotation_detected?
    # switch to the new state right away
    rotation_in_progress
    return true
  end
  @size = @stat.size
  update_bytes_unread
  modified_at_changed?
end

#restore_previous_stateObject



311
312
313
# File 'lib/filewatch/watched_file.rb', line 311

# Goes back to the most recently recorded previous state.
# Note: with an empty history, pop yields nil and the state becomes nil.
def restore_previous_state
  previous = @recent_states.pop
  set_state(previous)
end

#rotate_as_file(bytes_read = 0) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/filewatch/watched_file.rb', line 76

# Re-initialises this watched file after a rotation where a sincedb record exists
# for the new inode but there is no watched file to rotate from. Probably caused by
# a deletion detected in the middle of the rename cascade; rare because of
# delayed_delete - there would have to be a large time span between the renames.
#
# @param bytes_read [Integer] position to resume from (defaults to 0)
def rotate_as_file(bytes_read = 0)
  @recent_states = [] # history starts over; set_state keeps the last 8
  # not initial: this file is being associated with an existing sincedb position
  @initial = false
  @last_open_warning_at = nil
  @bytes_unread = 0 # nothing known to be unread yet
  @bytes_read = bytes_read
  # take a fresh stat for the path, re-open the handle if one was open, then watch
  set_stat(PathStatClass.new(pathname))
  reopen
  watch
end

#rotate_from(other) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/filewatch/watched_file.rb', line 50

# Takes over the read progress of +other+ (another watched file) after a rotation,
# re-stats this path and leaves this file in the :ignored state.
#
# @param other [WatchedFile] the watched file whose bytes_read, bytes_unread,
#   state history and access time are adopted
def rotate_from(other)
  # move all state from other to this one
  set_standard_read_loop
  file_close
  @bytes_read = other.bytes_read
  @bytes_unread = other.bytes_unread
  @listener = nil
  @initial = false
  # Copy the history instead of sharing the array: when other is in delayed_delete
  # it is not reset below and keeps its array, so the final `ignore` (which appends
  # to @recent_states) would otherwise also rewrite the other file's history.
  @recent_states = other.recent_states.dup
  @accessed_at = other.accessed_at
  # a delayed_delete file is left untouched: we don't know yet whether a file
  # exists at other.path, so it is only reset when it is not pending deletion
  unless other.delayed_delete?
    other.full_state_reset
  end
  set_stat PathStatClass.new(pathname)
  ignore
end

#rotation_detected?Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/filewatch/watched_file.rb', line 96

# True when the key derived from the current stat differs from the stored sincedb key.
def rotation_detected?
  stat_sincedb_key != sincedb_key
end

#rotation_in_progressObject



278
279
280
# File 'lib/filewatch/watched_file.rb', line 278

# Transitions this watched file to the :rotation_in_progress state.
def rotation_in_progress
  set_state(:rotation_in_progress)
end

#rotation_in_progress?Boolean

Returns:

  • (Boolean)


315
316
317
# File 'lib/filewatch/watched_file.rb', line 315

# True when the current state is :rotation_in_progress.
def rotation_in_progress?
  @state == :rotation_in_progress
end

#set_accessed_atObject



189
190
191
# File 'lib/filewatch/watched_file.rb', line 189

# Records "now" as the last access time, as float epoch seconds.
def set_accessed_at
  @accessed_at = Time.now.to_f
end

#set_listener(observer) ⇒ Object



169
170
171
# File 'lib/filewatch/watched_file.rb', line 169

# Asks +observer+ for the listener responsible for this path and keeps it.
#
# @param observer object responding to listener_for(path)
def set_listener(observer)
  @listener = observer.listener_for(@path)
end

#set_maximum_read_loopObject



358
359
360
361
362
363
# File 'lib/filewatch/watched_file.rb', line 358

# Switches the read loop to the library-wide maximums (FileWatch::MAX_ITERATIONS
# reads of FileWatch::FILE_READ_SIZE bytes); used to quickly fully read an open
# file when rotation is detected.
def set_maximum_read_loop
  loops = @read_loop_count = FileWatch::MAX_ITERATIONS
  chunk = @read_chunk_size = FileWatch::FILE_READ_SIZE
  @standard_loop_max_bytes = loops * chunk
end

#set_standard_read_loopObject



351
352
353
354
355
356
# File 'lib/filewatch/watched_file.rb', line 351

# Configures the read loop from the settings: file_chunk_count reads of
# file_chunk_size bytes each, plus their product as the per-loop byte ceiling.
def set_standard_read_loop
  loops = @read_loop_count = @settings.file_chunk_count
  chunk = @read_chunk_size = @settings.file_chunk_size
  # e.g. 1 * 10 bytes -> 10 or 256 * 65536 -> 16777216 or 140737488355327 * 32768 -> 4611686018427355136
  @standard_loop_max_bytes = loops * chunk
end

#set_state(value) ⇒ Object



391
392
393
394
395
# File 'lib/filewatch/watched_file.rb', line 391

# Moves to a new state, remembering the one being left.
# The history is trimmed from the front once it already holds 8 entries,
# and a nil current state is never recorded.
#
# @param value [Symbol] the state to enter
# @return the new state
def set_state(value)
  history = @recent_states
  history.shift if history.size == 8
  history << @state unless @state.nil?
  @state = value
end

#shrunk?Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/filewatch/watched_file.rb', line 147

# True when the recorded size is smaller than the bytes read so far.
def shrunk?
  @size < @bytes_read
end

#sincedb_keyObject



181
182
183
# File 'lib/filewatch/watched_file.rb', line 181

# Stored sincedb key for this file; restat! compares it with stat_sincedb_key to spot rotation.
# NOTE(review): @sdb_key_v1 is not assigned in any method shown here - presumably set_stat does it; confirm.
def sincedb_key
  @sdb_key_v1
end

#size_changed?Boolean

Returns:

  • (Boolean)


155
156
157
158
159
# File 'lib/filewatch/watched_file.rb', line 155

# True when the recorded size differs from the bytes read so far (larger or smaller).
def size_changed?
  # called from closed and ignored
  # before the last stat was taken file should be fully read.
  @size != @bytes_read
end

#stat_sincedb_keyObject



92
93
94
# File 'lib/filewatch/watched_file.rb', line 92

# Key taken from the current stat object (its inode_struct).
def stat_sincedb_key
  @stat.inode_struct
end

#to_sObject



429
430
431
# File 'lib/filewatch/watched_file.rb', line 429

# String form; identical to inspect.
def to_s
  inspect
end

#unset_listenerObject



173
174
175
# File 'lib/filewatch/watched_file.rb', line 173

# Detaches the current listener, if any.
def unset_listener
  @listener = nil
end

#unwatchObject



303
304
305
# File 'lib/filewatch/watched_file.rb', line 303

# Transitions this watched file to the :unwatched state.
def unwatch
  set_state(:unwatched)
end

#unwatched?Boolean

Returns:

  • (Boolean)


339
340
341
# File 'lib/filewatch/watched_file.rb', line 339

# True when the current state is :unwatched.
def unwatched?
  @state == :unwatched
end

#update_bytes_read(total_bytes_read) ⇒ Object



271
272
273
274
275
276
# File 'lib/filewatch/watched_file.rb', line 271

# Overwrites the bytes-read counter with an absolute value and refreshes the unread counter.
#
# @param total_bytes_read [Integer, nil] new absolute position; nil is ignored
# @return [Integer, nil] the stored value, or nil when the argument was nil
def update_bytes_read(total_bytes_read)
  unless total_bytes_read.nil?
    @bytes_read = total_bytes_read
    update_bytes_unread
    @bytes_read
  end
end

#watchObject



299
300
301
# File 'lib/filewatch/watched_file.rb', line 299

# Transitions this watched file to the :watched state.
def watch
  set_state(:watched)
end

#watched?Boolean

Returns:

  • (Boolean)


335
336
337
# File 'lib/filewatch/watched_file.rb', line 335

# True when the current state is :watched.
def watched?
  @state == :watched
end