Class: LogStash::Inputs::File
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::File
- Defined in:
- lib/logstash/inputs/file.rb
Instance Attribute Summary collapse
- #watcher ⇒ Object readonly
Class Method Summary collapse
Instance Method Summary collapse
-
#completely_stopped? ⇒ Boolean
Returns whether the input has completely stopped, i.e. #run has finished its shutdown steps.
- #handle_deletable_path(path) ⇒ Object
-
#listener_for(path) ⇒ Object
The WatchedFile calls back here as `observer.listener_for(@path)`.
- #log_line_received(path, line) ⇒ Object
-
#post_process_this(event, path) ⇒ Object
Adds path and host data to the event, decorates it and pushes it onto the queue.
- #queue ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #start_processing ⇒ Object
- #stop ⇒ Object
Instance Attribute Details
#watcher ⇒ Object (readonly)
255 256 257 |
# File 'lib/logstash/inputs/file.rb', line 255
#
# Read-only accessor for the watcher held in @watcher.
#
# @return [Object] the current value of @watcher (nil when it has not been set)
def watcher
  @watcher
end
Class Method Details
.old_validate_value ⇒ Object
242 |
# File 'lib/logstash/inputs/file.rb', line 242 alias_method :old_validate_value, :validate_value |
.validate_value(value, validator) ⇒ Object
244 245 246 247 248 249 250 251 |
# File 'lib/logstash/inputs/file.rb', line 244
#
# Validates a config value, adding support for "callable" validators.
#
# A validator that is a two-element Array whose first element responds to
# +call+ is treated as +[callable, units]+; every other validator is handed
# to the aliased original implementation.
#
# @param value [Object] the raw config value
# @param validator [Object] the validator declared for the setting
# @return [Object] the callable result converted with +to_a+, or whatever
#   old_validate_value returns
def validate_value(value, validator)
  callable_pair = validator.is_a?(Array) &&
                  validator.size == 2 &&
                  validator.first.respond_to?(:call)
  return old_validate_value(value, validator) unless callable_pair

  callable, units = validator
  # returns a ValidatedStruct having a `to_a` method suitable to return to the config mixin caller
  callable.call(value, units).to_a
end
Instance Method Details
#completely_stopped? ⇒ Boolean
Returns whether the input has completely stopped, i.e. #run has finished its shutdown steps.
336 337 338 339 |
# File 'lib/logstash/inputs/file.rb', line 336
#
# Reports the state of the @completely_stopped flag.
#
# Exists to synchronise after(:each) blocks in tests that remove the sincedb
# file before atomic_write completes.
#
# @return [Boolean] the result of calling +true?+ on @completely_stopped
def completely_stopped?
  @completely_stopped.true?
end
#handle_deletable_path(path) ⇒ Object
386 387 388 389 390 391 |
# File 'lib/logstash/inputs/file.rb', line 386
#
# Passes a path to every completed-file handler.
#
# Does nothing in tail mode or when no handlers are registered.
#
# @param path [Object] the path handed to each handler's +handle+
# @return [Array, nil] the handler list after iteration, or nil when skipped
def handle_deletable_path(path)
  return if tail_mode? || @completed_file_handlers.empty?

  @logger.debug(__method__.to_s, path: path) if @logger.debug?
  @completed_file_handlers.each do |handler|
    handler.handle(path)
  end
end
#listener_for(path) ⇒ Object
The WatchedFile calls back here as `observer.listener_for(@path)`.
343 344 345 |
# File 'lib/logstash/inputs/file.rb', line 343
#
# Builds the listener for one watched path; this input is passed along as
# the listener's second constructor argument.
#
# @param path [Object] the path the listener is created for
# @return [FileListener] a new listener built from +path+ and this input
def listener_for(path)
  FileListener.new(path, self)
end
#log_line_received(path, line) ⇒ Object
393 394 395 |
# File 'lib/logstash/inputs/file.rb', line 393
#
# Emits a debug log entry for a received line, only when debug logging is on.
#
# @param path [Object] logged under the :path key
# @param line [Object] logged under the :text key
# @return [Object] the falsy result of +@logger.debug?+ when debug is off,
#   otherwise whatever +@logger.debug+ returns
def log_line_received(path, line)
  debug_enabled = @logger.debug?
  debug_enabled && @logger.debug("Received line", path: path, text: line)
end
#post_process_this(event, path) ⇒ Object
Adds path and host data to the event, decorates it and pushes it onto the queue.
376 377 378 379 380 381 382 383 384 |
# File 'lib/logstash/inputs/file.rb', line 376
#
# Enriches an event with path and host information, decorates it and pushes
# it onto the current queue.
#
# @param event [Object] the event; must respond to +set+
# @param path [Object, nil] the source path; the source-path field is only
#   attempted when this is truthy
# @return [Object] the result of appending the event to the queue
def post_process_this(event, path)
  # Metadata is always written, even when path is nil.
  event.set("[@metadata][path]", path)
  event.set("[@metadata][host]", @host)

  attempt_set(event, @source_host_field, @host)
  path && attempt_set(event, @source_path_field, path)

  decorate(event)
  sink = @queue.get
  sink << event
end
#queue ⇒ Object
405 406 407 |
# File 'lib/logstash/inputs/file.rb', line 405
#
# Returns the object currently held by the @queue reference.
#
# @return [Object] the result of calling +get+ on @queue
def queue
  @queue.get
end
#register ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/logstash/inputs/file.rb', line 257
#
# Validates the plugin settings and prepares the state later used by #run.
#
# In order, it: records the local hostname, assembles the filewatch
# configuration hash, rejects relative paths, resolves the sincedb path
# (generating one when none was configured), makes sure the completed-file
# log exists when the completed action involves logging, selects the watcher
# class for the current mode, wraps the codec and creates the atomic
# stopped-flag and queue reference.
#
# @raise [ArgumentError] when a configured path is relative, when
#   sincedb_path is a directory, when file_completed_log_path is missing or
#   cannot be created, or when exit_after_read is set in tail mode
# @return [Object] the selected source-path field name (value of the last assignment)
def register
  require "addressable/uri"
  require "digest/md5"
  @logger.trace("Registering file input", path: @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)
  # This check is Logstash 5 specific. If the class does not exist, and it
  # won't in older versions of Logstash, then we need to set it to nil.
  settings = LogStash::SETTINGS if defined?(LogStash::SETTINGS)

  @filewatch_config = {
    exclude: @exclude,
    stat_interval: @stat_interval,
    discover_interval: @discover_interval,
    sincedb_write_interval: @sincedb_write_interval,
    delimiter: @delimiter,
    ignore_older: @ignore_older,
    close_older: @close_older,
    max_open_files: @max_open_files,
    sincedb_clean_after: @sincedb_clean_after,
    file_chunk_count: @file_chunk_count,
    file_chunk_size: @file_chunk_size,
    file_sort_by: @file_sort_by,
    file_sort_direction: @file_sort_direction,
    exit_after_read: @exit_after_read,
    check_archive_validity: @check_archive_validity,
  }

  # Fail on the first relative path encountered.
  @path.each do |candidate|
    next unless Pathname.new(candidate).relative?

    raise ArgumentError, "File paths must be absolute, relative path specified: #{candidate}"
  end

  if @sincedb_path.nil?
    sincedb_base = build_sincedb_base_from_settings(settings) || build_sincedb_base_from_env
    @sincedb_path = build_random_sincedb_filename(sincedb_base)
    @logger.info('No sincedb_path set, generating one based on the "path" setting', sincedb_path: @sincedb_path.to_s, path: @path)
  else
    @sincedb_path = Pathname.new(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"" if @sincedb_path.directory?
  end

  @filewatch_config[:sincedb_path] = @sincedb_path
  @filewatch_config[:start_new_files_at] = @start_position.to_sym

  if @file_completed_action.include?('log')
    if @file_completed_log_path.nil?
      raise ArgumentError, 'The "file_completed_log_path" setting must be provided when the "file_completed_action" is set to "log" or "log_and_delete"'
    end

    @file_completed_log_path = Pathname.new(@file_completed_log_path)
    unless @file_completed_log_path.exist?
      begin
        FileUtils.touch(@file_completed_log_path)
      rescue StandardError
        raise ArgumentError, "The \"file_completed_log_path\" file can't be created: #{@file_completed_log_path}"
      end
    end
  end

  if tail_mode?
    raise ArgumentError, 'The "exit_after_read" setting only works when the "mode" is set to "read"' if @exit_after_read

    @watcher_class = FileWatch::ObservingTail
  else
    @watcher_class = FileWatch::ObservingRead
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
  @completely_stopped = Concurrent::AtomicBoolean.new
  @queue = Concurrent::AtomicReference.new
  @source_host_field = ecs_select[disabled: 'host', v1:'[host][name]']
  @source_path_field = ecs_select[disabled: 'path', v1:'[log][file][path]']
end
#run(queue) ⇒ Object
367 368 369 370 371 372 373 374 |
# File 'lib/logstash/inputs/file.rb', line 367
#
# Main loop of the input: starts processing, publishes the queue and
# subscribes to the watcher, then finalises once the subscription returns.
#
# @param queue [Object] the queue events are pushed to; stored in the @queue reference
# @return [Object] the result of marking @completely_stopped as true
def run(queue)
  start_processing
  @queue.set(queue)

  # halts here until quit is called;
  # last action of the subscribe call is to write the sincedb
  @watcher.subscribe(self)

  exit_flush
  @completely_stopped.make_true
end
#start_processing ⇒ Object
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/logstash/inputs/file.rb', line 347
#
# Creates a fresh watcher, registers the completed-file handlers that apply
# in read mode and asks the watcher to watch every configured path.
#
# @return [Array] the configured paths (the receiver of the final +each+)
def start_processing
  # if the pipeline restarts this input,
  # make sure previous files are closed
  stop

  @watcher = @watcher_class.new(@filewatch_config)
  @completed_file_handlers = []

  if read_mode?
    @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path) if @file_completed_action.include?('log')
    @completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch) if @file_completed_action.include?('delete')
  end

  @path.each do |watched_path|
    @watcher.watch_this(watched_path)
  end
end
#stop ⇒ Object
397 398 399 400 401 402 |
# File 'lib/logstash/inputs/file.rb', line 397
#
# Shuts the input down: closes the codec, then tells the watcher to quit.
# Does nothing when no watcher has been created yet.
#
# @return [Object, nil] the result of +@watcher.quit+, or nil without a watcher
def stop
  return if @watcher.nil?

  @codec.close
  @watcher.quit
end