Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher

Returns a new instance of TailWatcher.



825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
# File 'lib/fluent/plugin/in_tail.rb', line 825

def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  @path = target_info.path
  @ino = target_info.ino
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end

Instance Attribute Details

#group_watcherObject

Returns the value of attribute group_watcher.



846
847
848
# File 'lib/fluent/plugin/in_tail.rb', line 846

def group_watcher
  @group_watcher
end

#inoObject (readonly)

Returns the value of attribute ino.



841
842
843
# File 'lib/fluent/plugin/in_tail.rb', line 841

def ino
  @ino
end

#line_buffer_timer_flusherObject (readonly)

Returns the value of attribute line_buffer_timer_flusher.



843
844
845
# File 'lib/fluent/plugin/in_tail.rb', line 843

def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#pathObject (readonly)

Returns the value of attribute path.



841
842
843
# File 'lib/fluent/plugin/in_tail.rb', line 841

def path
  @path
end

#peObject (readonly)

Returns the value of attribute pe.



842
843
844
# File 'lib/fluent/plugin/in_tail.rb', line 842

def pe
  @pe
end

#unwatchedObject

This is used for removing position entry from PositionFile



844
845
846
# File 'lib/fluent/plugin/in_tail.rb', line 844

def unwatched
  @unwatched
end

#watchersObject (readonly)

Returns the value of attribute watchers.



845
846
847
# File 'lib/fluent/plugin/in_tail.rb', line 845

def watchers
  @watchers
end

Instance Method Details

#closeObject



864
865
866
867
868
869
# File 'lib/fluent/plugin/in_tail.rb', line 864

def close
  if @io_handler
    @io_handler.close
    @io_handler = nil
  end
end

#detach(shutdown_start_time = nil) ⇒ Object



856
857
858
859
860
861
862
# File 'lib/fluent/plugin/in_tail.rb', line 856

def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end
  @line_buffer_timer_flusher&.close(self)
end

#eof?Boolean

Returns:

  • (Boolean)


871
872
873
# File 'lib/fluent/plugin/in_tail.rb', line 871

def eof?
  @io_handler.nil? || @io_handler.eof?
end

#io_handlerObject



969
970
971
# File 'lib/fluent/plugin/in_tail.rb', line 969

def io_handler
  @io_handler_build.call(self, @path)
end

#on_notifyObject



875
876
877
878
879
880
881
882
883
884
885
886
# File 'lib/fluent/plugin/in_tail.rb', line 875

def on_notify
  begin
    stat = Fluent::FileWrapper.stat(@path)
  rescue Errno::ENOENT, Errno::EACCES
    # moved or deleted
    stat = nil
  end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
# File 'lib/fluent/plugin/in_tail.rb', line 888

def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # If stat is nil (file not present), NEED to stop and discard this watcher.
        #   When the file is disappeared but is resurrected soon, then `#refresh_watcher`
        #   can't recognize this TailWatcher needs to be stopped.
        #   This can happens when the file is rotated.
        #   If a notify comes before the new file for the path is created during rotation,
        #   then it appears as if the file was resurrected once it disappeared.
        # Don't want to swap state because we need latest read offset in pos file even after rotate_wait
        @update_watcher.call(self, @pe, stat&.ino)
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        @update_watcher.call(self, swap_state(@pe), stat&.ino)
      end
    else
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end

#register_watcher(watcher) ⇒ Object



852
853
854
# File 'lib/fluent/plugin/in_tail.rb', line 852

def register_watcher(watcher)
  @watchers << watcher
end

#swap_state(pe) ⇒ Object



973
974
975
976
977
978
979
# File 'lib/fluent/plugin/in_tail.rb', line 973

def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

#tagObject



848
849
850
# File 'lib/fluent/plugin/in_tail.rb', line 848

def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '')
end