Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits: Object
- Inheritance chain:
  - Object
  - Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler
Instance Attribute Summary collapse
-
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
-
#ino ⇒ Object
readonly
Returns the value of attribute ino.
-
#line_buffer_timer_flusher ⇒ Object
readonly
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
-
#watchers ⇒ Object
readonly
Returns the value of attribute watchers.
Instance Method Summary collapse
- #close ⇒ Object
- #detach(shutdown_start_time = nil) ⇒ Object
- #eof? ⇒ Boolean
-
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #io_handler ⇒ Object
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #register_watcher(watcher) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
Constructor Details
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
Returns a new instance of TailWatcher.
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 |
# File 'lib/fluent/plugin/in_tail.rb', line 813
# Wires up a watcher for a single tailed file.
#
# target_info               - object answering #path and #ino for the watched file
# pe                        - position entry; a MemoryPositionEntry is created when
#                             none is given
# log                       - logger (also handed to the RotateHandler)
# read_from_head            - flag consulted by #on_rotate when choosing the start offset
# follow_inodes             - flag consulted by #on_rotate when handing over to a new watcher
# update_watcher            - callable invoked by #on_rotate when this watcher must be replaced
# line_buffer_timer_flusher - optional flusher notified from #on_notify / #detach
# io_handler_build          - callable used by #io_handler to build an IO handler
# metrics                   - metrics object; #on_rotate increments metrics.rotated
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  # Identity of the watched file and where reading should resume.
  @path, @ino = target_info.path, target_info.ino
  @pe = pe || MemoryPositionEntry.new

  # Collaborators.
  @log = log
  @metrics = metrics
  @update_watcher = update_watcher
  @io_handler_build = io_handler_build
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  # Rotation events are routed back into this object's #on_rotate.
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))

  # Behaviour switches.
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes

  # Mutable state: no IO handler until #on_rotate creates one, no watchers registered yet.
  @io_handler = nil
  @watchers = []
end
Instance Attribute Details
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
834 835 836 |
# File 'lib/fluent/plugin/in_tail.rb', line 834 def group_watcher @group_watcher end |
#ino ⇒ Object (readonly)
Returns the value of attribute ino.
829 830 831 |
# File 'lib/fluent/plugin/in_tail.rb', line 829 def ino @ino end |
#line_buffer_timer_flusher ⇒ Object (readonly)
Returns the value of attribute line_buffer_timer_flusher.
831 832 833 |
# File 'lib/fluent/plugin/in_tail.rb', line 831 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
829 830 831 |
# File 'lib/fluent/plugin/in_tail.rb', line 829 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
830 831 832 |
# File 'lib/fluent/plugin/in_tail.rb', line 830 def pe @pe end |
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
832 833 834 |
# File 'lib/fluent/plugin/in_tail.rb', line 832 def unwatched @unwatched end |
#watchers ⇒ Object (readonly)
Returns the value of attribute watchers.
833 834 835 |
# File 'lib/fluent/plugin/in_tail.rb', line 833 def watchers @watchers end |
Instance Method Details
#close ⇒ Object
852 853 854 855 856 857 |
# File 'lib/fluent/plugin/in_tail.rb', line 852
# Closes the current IO handler, if there is one, and forgets it so that a
# repeated call is a no-op. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
#detach(shutdown_start_time = nil) ⇒ Object
844 845 846 847 848 849 850 |
# File 'lib/fluent/plugin/in_tail.rb', line 844
# Prepares the watcher for removal.
#
# When an IO handler exists it is told shutdown is starting (receiving
# shutdown_start_time as-is) and then notified once more. Afterwards the
# line-buffer flusher, if present, is closed for this watcher.
#
# Returns the result of the flusher's #close, or nil when there is no flusher.
def detach(shutdown_start_time = nil)
  @io_handler&.tap do |handler|
    handler.ready_to_shutdown(shutdown_start_time)
    handler.on_notify
  end

  @line_buffer_timer_flusher&.close(self)
end
#eof? ⇒ Boolean
859 860 861 |
# File 'lib/fluent/plugin/in_tail.rb', line 859
# True when there is no IO handler at all; otherwise defers to the
# handler's own #eof?.
def eof?
  return true if @io_handler.nil?

  @io_handler.eof?
end
#io_handler ⇒ Object
957 958 959 |
# File 'lib/fluent/plugin/in_tail.rb', line 957
# Builds an IO handler by invoking the callable supplied to the constructor
# with this watcher and its path. Each call invokes the builder again; the
# result is not cached here.
def io_handler
  builder = @io_handler_build
  builder.call(self, @path)
end
#on_notify ⇒ Object
863 864 865 866 867 868 869 870 871 872 873 874 |
# File 'lib/fluent/plugin/in_tail.rb', line 863
# Handles one notification for the watched path.
#
# The path is stat-ed first; a missing or unreadable file (moved or deleted)
# is represented as a nil stat. The stat is passed to the rotate handler,
# then the line-buffer flusher and the IO handler are notified, each only
# if present.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT, Errno::EACCES
      # moved or deleted
      nil
    end

  @rotate_handler&.on_notify(stat)
  @line_buffer_timer_flusher&.on_notify(self)
  @io_handler&.on_notify
end
#on_rotate(stat) ⇒ Object
876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 |
# File 'lib/fluent/plugin/in_tail.rb', line 876 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case of a and b, seek to the saved position # c) file was once renamed, truncated and then backed # in this case, consider it truncated @pe.update(inode, 0) if fsize < @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = io_handler else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true # Handle the old log file before renewing TailWatcher [fluentd#1055] @io_handler.on_notify end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end if watcher_needs_update if @follow_inodes # If stat is nil (file not present), NEED to stop and discard this watcher. # When the file is disappeared but is resurrected soon, then `#refresh_watcher` # can't recognize this TailWatcher needs to be stopped. # This can happens when the file is rotated. 
# If a notify comes before the new file for the path is created during rotation, # then it appears as if the file was resurrected once it disappeared. # Don't want to swap state because we need latest read offset in pos file even after rotate_wait @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" @io_handler = io_handler end @metrics.rotated.inc end end |
#register_watcher(watcher) ⇒ Object
840 841 842 |
# File 'lib/fluent/plugin/in_tail.rb', line 840
# Appends watcher to the list exposed by #watchers.
# Returns that same array.
def register_watcher(watcher)
  @watchers.push(watcher)
end
#swap_state(pe) ⇒ Object
961 962 963 964 965 966 967 |
# File 'lib/fluent/plugin/in_tail.rb', line 961
# Detaches this watcher from the given position entry.
#
# The watcher continues with a temporary MemoryPositionEntry seeded with the
# inode and position read from pe, while pe itself is returned untouched so
# the caller can pass it on.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
#tag ⇒ Object
836 837 838 |
# File 'lib/fluent/plugin/in_tail.rb', line 836
# Derives a dotted tag from the watched path and memoizes it in @parsed_tag.
#
# Every "/" becomes ".", runs of "." are collapsed to a single ".", and the
# single leading "." (left by an absolute path) is removed.
#
# Returns a String, e.g. "/var/log/app.log" => "var.log.app.log".
def tag
  # \A anchors to the start of the whole string. The previous gsub(/^\./, '')
  # matched at the start of every line, so a path containing a newline
  # followed by a separator lost interior dots as well.
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end