Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler
Instance Attribute Summary collapse
-
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
-
#ino ⇒ Object
readonly
Returns the value of attribute ino.
-
#line_buffer_timer_flusher ⇒ Object
readonly
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
-
#watchers ⇒ Object
readonly
Returns the value of attribute watchers.
Instance Method Summary collapse
- #close ⇒ Object
- #detach(shutdown_start_time = nil) ⇒ Object
- #eof? ⇒ Boolean
-
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #io_handler ⇒ Object
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #register_watcher(watcher) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
Constructor Details
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
Returns a new instance of TailWatcher.
825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 |
# File 'lib/fluent/plugin/in_tail.rb', line 825 def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @ino = target_info.ino @pe = pe || MemoryPositionEntry.new @read_from_head = read_from_head @follow_inodes = follow_inodes @update_watcher = update_watcher @log = log @rotate_handler = RotateHandler.new(log, &method(:on_rotate)) @line_buffer_timer_flusher = line_buffer_timer_flusher @io_handler = nil @io_handler_build = io_handler_build @metrics = metrics @watchers = [] end |
Instance Attribute Details
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
846 847 848 |
# File 'lib/fluent/plugin/in_tail.rb', line 846 def group_watcher @group_watcher end |
#ino ⇒ Object (readonly)
Returns the value of attribute ino.
841 842 843 |
# File 'lib/fluent/plugin/in_tail.rb', line 841 def ino @ino end |
#line_buffer_timer_flusher ⇒ Object (readonly)
Returns the value of attribute line_buffer_timer_flusher.
843 844 845 |
# File 'lib/fluent/plugin/in_tail.rb', line 843 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
841 842 843 |
# File 'lib/fluent/plugin/in_tail.rb', line 841 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
842 843 844 |
# File 'lib/fluent/plugin/in_tail.rb', line 842 def pe @pe end |
#unwatched ⇒ Object
This is used for removing position entry from PositionFile
844 845 846 |
# File 'lib/fluent/plugin/in_tail.rb', line 844 def unwatched @unwatched end |
#watchers ⇒ Object (readonly)
Returns the value of attribute watchers.
845 846 847 |
# File 'lib/fluent/plugin/in_tail.rb', line 845 def watchers @watchers end |
Instance Method Details
#close ⇒ Object
864 865 866 867 868 869 |
# File 'lib/fluent/plugin/in_tail.rb', line 864 def close if @io_handler @io_handler.close @io_handler = nil end end |
#detach(shutdown_start_time = nil) ⇒ Object
856 857 858 859 860 861 862 |
# File 'lib/fluent/plugin/in_tail.rb', line 856 def detach(shutdown_start_time = nil) if @io_handler @io_handler.ready_to_shutdown(shutdown_start_time) @io_handler.on_notify end @line_buffer_timer_flusher&.close(self) end |
#eof? ⇒ Boolean
871 872 873 |
# File 'lib/fluent/plugin/in_tail.rb', line 871 def eof? @io_handler.nil? || @io_handler.eof? end |
#io_handler ⇒ Object
969 970 971 |
# File 'lib/fluent/plugin/in_tail.rb', line 969 def io_handler @io_handler_build.call(self, @path) end |
#on_notify ⇒ Object
875 876 877 878 879 880 881 882 883 884 885 886 |
# File 'lib/fluent/plugin/in_tail.rb', line 875 def on_notify begin stat = Fluent::FileWrapper.stat(@path) rescue Errno::ENOENT, Errno::EACCES # moved or deleted stat = nil end @rotate_handler.on_notify(stat) if @rotate_handler @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher @io_handler.on_notify if @io_handler end |
#on_rotate(stat) ⇒ Object
888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 |
# File 'lib/fluent/plugin/in_tail.rb', line 888 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case of a and b, seek to the saved position # c) file was once renamed, truncated and then backed # in this case, consider it truncated @pe.update(inode, 0) if fsize < @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = io_handler else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true # Handle the old log file before renewing TailWatcher [fluentd#1055] @io_handler.on_notify end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end if watcher_needs_update if @follow_inodes # If stat is nil (file not present), NEED to stop and discard this watcher. # When the file is disappeared but is resurrected soon, then `#refresh_watcher` # can't recognize this TailWatcher needs to be stopped. # This can happens when the file is rotated. # If a notify comes before the new file for the path is created during rotation, # then it appears as if the file was resurrected once it disappeared. # Don't want to swap state because we need latest read offset in pos file even after rotate_wait @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" @io_handler = io_handler end @metrics.rotated.inc end end |
#register_watcher(watcher) ⇒ Object
852 853 854 |
# File 'lib/fluent/plugin/in_tail.rb', line 852 def register_watcher(watcher) @watchers << watcher end |
#swap_state(pe) ⇒ Object
973 974 975 976 977 978 979 |
# File 'lib/fluent/plugin/in_tail.rb', line 973 def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe pe # This pe will be updated in on_rotate after TailWatcher is initialized end |
#tag ⇒ Object
848 849 850 |
# File 'lib/fluent/plugin/in_tail.rb', line 848 def tag @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '') end |