Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher

Returns a new instance of TailWatcher.



813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
# File 'lib/fluent/plugin/in_tail.rb', line 813

# Sets up a watcher for one tailed file.
#
# @param target_info [#path, #ino] supplies the path and inode stored on this watcher
# @param pe [Object, nil] position entry; a fresh MemoryPositionEntry is used when falsy
# @param log [Object] logger, also handed to the RotateHandler
# @param read_from_head [Boolean] consulted by #on_rotate when choosing the initial read position
# @param follow_inodes [Boolean] consulted by #on_rotate when delegating to update_watcher
# @param update_watcher [#call] callback invoked by #on_rotate with (watcher, position entry, inode)
# @param line_buffer_timer_flusher [Object, nil] optional flusher notified from #on_notify / #detach
# @param io_handler_build [#call] factory invoked by #io_handler with (watcher, path)
# @param metrics [Object] metrics holder; #on_rotate increments its `rotated` counter
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  # Identity of the watched file and where reading resumes.
  @path, @ino = target_info.path, target_info.ino
  @pe = pe || MemoryPositionEntry.new

  # Behaviour switches and collaborators supplied by the caller.
  @read_from_head, @follow_inodes = read_from_head, follow_inodes
  @update_watcher = update_watcher
  @log = log

  # Rotation events are routed back into #on_rotate.
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher

  # The IO handler is created lazily by #on_rotate through the builder.
  @io_handler = nil
  @io_handler_build = io_handler_build

  @metrics = metrics
  @watchers = []
end

Instance Attribute Details

#group_watcher ⇒ Object

Returns the value of attribute group_watcher.



834
835
836
# File 'lib/fluent/plugin/in_tail.rb', line 834

# Returns the current value of @group_watcher.
# NOTE(review): nothing in the visible code assigns @group_watcher, so it is
# presumably set through a writer defined elsewhere — confirm.
def group_watcher
  @group_watcher
end

#ino ⇒ Object (readonly)

Returns the value of attribute ino.



829
830
831
# File 'lib/fluent/plugin/in_tail.rb', line 829

# Returns the inode captured from target_info.ino when the watcher was constructed.
def ino
  @ino
end

#line_buffer_timer_flusher ⇒ Object (readonly)

Returns the value of attribute line_buffer_timer_flusher.



831
832
833
# File 'lib/fluent/plugin/in_tail.rb', line 831

# Returns the line-buffer timer flusher passed to the constructor (may be nil;
# #on_notify and #detach both guard against that).
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#path ⇒ Object (readonly)

Returns the value of attribute path.



829
830
831
# File 'lib/fluent/plugin/in_tail.rb', line 829

# Returns the path captured from target_info.path when the watcher was constructed.
def path
  @path
end

#pe ⇒ Object (readonly)

Returns the value of attribute pe.



830
831
832
# File 'lib/fluent/plugin/in_tail.rb', line 830

# Returns the position entry currently used by this watcher: the one given to
# the constructor (or a MemoryPositionEntry when none was given), unless
# #swap_state has since replaced it with an in-memory copy.
def pe
  @pe
end

#unwatched ⇒ Object

This is used for removing the position entry from the PositionFile.



832
833
834
# File 'lib/fluent/plugin/in_tail.rb', line 832

# Returns the current value of @unwatched, which is used for removing the
# position entry from PositionFile.
# NOTE(review): nothing in the visible code assigns @unwatched, so it is
# presumably set through a writer defined elsewhere — confirm.
def unwatched
  @unwatched
end

#watchers ⇒ Object (readonly)

Returns the value of attribute watchers.



833
834
835
# File 'lib/fluent/plugin/in_tail.rb', line 833

# Returns the array of watchers registered via #register_watcher (starts empty).
# The array itself is returned, not a copy.
def watchers
  @watchers
end

Instance Method Details

#close ⇒ Object



852
853
854
855
856
857
# File 'lib/fluent/plugin/in_tail.rb', line 852

# Closes the current IO handler, if any, and drops the reference to it.
# Does nothing when no handler is attached. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end

#detach(shutdown_start_time = nil) ⇒ Object



844
845
846
847
848
849
850
# File 'lib/fluent/plugin/in_tail.rb', line 844

# Prepares the watcher for removal: lets the IO handler (if any) know that
# shutdown is starting and gives it one more notification, then closes this
# watcher's entry on the line-buffer timer flusher (if one is configured).
#
# @param shutdown_start_time [Object, nil] forwarded to the IO handler's #ready_to_shutdown
# @return [Object, nil] the flusher's #close result, or nil when there is no flusher
def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end

  return nil if @line_buffer_timer_flusher.nil?

  @line_buffer_timer_flusher.close(self)
end

#eof? ⇒ Boolean

Returns:

  • (Boolean)


859
860
861
# File 'lib/fluent/plugin/in_tail.rb', line 859

# Reports whether there is nothing left to read: true when no IO handler is
# attached, otherwise whatever the handler's own #eof? says.
#
# @return [Boolean]
def eof?
  return true if @io_handler.nil?

  @io_handler.eof?
end

#io_handler ⇒ Object



957
958
959
# File 'lib/fluent/plugin/in_tail.rb', line 957

# Builds a new IO handler for this watcher by invoking the factory given to
# the constructor with (self, @path). Each call invokes the factory again;
# the result is not cached here.
def io_handler
  builder = @io_handler_build
  builder.call(self, @path)
end

#on_notify ⇒ Object



863
864
865
866
867
868
869
870
871
872
873
874
# File 'lib/fluent/plugin/in_tail.rb', line 863

# Handles a change notification for the watched path: stats the file, then
# notifies, in order, the rotate handler (with the stat), the line-buffer
# timer flusher (with this watcher) and the IO handler. Each collaborator is
# skipped when it is not set.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT, Errno::EACCES
      # moved or deleted
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
# File 'lib/fluent/plugin/in_tail.rb', line 876

# Rotation callback; a reference to this method is passed to RotateHandler in
# #initialize (presumably invoked by it with the latest stat — confirm).
#
# Two phases:
# * No IO handler yet (first call): choose the starting position in @pe and
#   create the initial IO handler, or a NullIOHandler when the file is absent.
# * IO handler already present: decide between truncation, a file appearing
#   where none was opened, rotation to a new inode, or the file disappearing.
#   Either the IO handler is rebuilt in place, or the decision is delegated to
#   the @update_watcher callback. @metrics.rotated is incremented on every
#   call that takes this second phase.
#
# @param stat [#size, #ino, nil] stat of the file currently at @path, or nil when it does not exist
def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # The rotated file has the same inode number as the last file.
        # Assuming one of the following situations:
        #   a) the file was renamed and then moved back, or
        #   b) a symlink or hardlink to the same file was recreated
        # in either case (a or b), seek to the saved position
        #   c) the file was renamed, truncated and then moved back
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # This is a FilePositionEntry and fluentd has been started before.
        # Read data from the head of the rotated file.
        # Logs never duplicate because this file is a newly rotated file.
        @pe.update(inode, 0)
      else
        # This is a MemoryPositionEntry, or this is the first time fluentd started.
        # Seek to the end of any file (unless read_from_head is set).
        # Logs may duplicate without this seek because we cannot tell whether
        # the file is a pre-existing file or a newly rotated one.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # If stat is nil (file not present), we NEED to stop and discard this watcher.
        #   When the file disappears but is resurrected soon after, `#refresh_watcher`
        #   can't recognize that this TailWatcher needs to be stopped.
        #   This can happen when the file is rotated.
        #   If a notify comes before the new file for the path is created during rotation,
        #   then it appears as if the file was resurrected after it disappeared.
        # Don't swap state because we need the latest read offset in the pos file even after rotate_wait
        @update_watcher.call(self, @pe, stat&.ino)
      else
        # Permit handling when stat is nil (file not present).
        # If a file is mv-ed and a new file is created while
        # `#refresh_watchers` is running, `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        @update_watcher.call(self, swap_state(@pe), stat&.ino)
      end
    else
      # Truncation or first real file for this path: keep this watcher and
      # just build a fresh IO handler.
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end

#register_watcher(watcher) ⇒ Object



840
841
842
# File 'lib/fluent/plugin/in_tail.rb', line 840

# Appends +watcher+ to this watcher's list of registered watchers.
#
# @param watcher [Object] the watcher to remember
# @return [Array] the internal watcher list (the same object #watchers returns)
def register_watcher(watcher)
  @watchers.push(watcher)
end

#swap_state(pe) ⇒ Object



961
962
963
964
965
966
967
# File 'lib/fluent/plugin/in_tail.rb', line 961

# Replaces this watcher's position entry with an in-memory copy of +pe+
# (same inode and position) and hands the original entry back to the caller.
#
# @param pe [#read_inode, #read_pos] the position entry to copy from
# @return [Object] the +pe+ that was passed in, untouched
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end

#tag ⇒ Object



836
837
838
# File 'lib/fluent/plugin/in_tail.rb', line 836

# Derives a tag from the watched path: every '/' becomes '.', runs of dots are
# collapsed to one, and a dot at the start of a line is removed. The result is
# memoized in @parsed_tag, so later changes to @path do not affect it.
#
# @return [String]
def tag
  return @parsed_tag if @parsed_tag

  dotted = @path.tr('/', '.')
  collapsed = dotted.squeeze('.')
  @parsed_tag = collapsed.gsub(/^\./, '')
end