Class: S3Downloader
- Inherits:
-
Object
- Object
- S3Downloader
- Defined in:
- lib/logstash/inputs/s3/downloader.rb
Instance Method Summary collapse
- #cleanup_local_object(record) ⇒ Object
- #cleanup_s3object(record) ⇒ Object
- #copy_s3object_to_disk(record) ⇒ Object
-
#initialize(logger, stop_semaphore, options) ⇒ S3Downloader
constructor
A new instance of S3Downloader.
- #stop? ⇒ Boolean
Constructor Details
#initialize(logger, stop_semaphore, options) ⇒ S3Downloader
Returns a new instance of S3Downloader.
7 8 9 10 11 12 13 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 7 def initialize(logger, stop_semaphore, ) @logger = logger @stopped = stop_semaphore @factory = [:s3_client_factory] @delete_on_success = [:delete_on_success] @include_object_properties = [:include_object_properties] end |
Instance Method Details
#cleanup_local_object(record) ⇒ Object
36 37 38 39 40 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 36 def cleanup_local_object(record) FileUtils.remove_entry_secure(record[:local_file], true) if ::File.exists?(record[:local_file]) rescue Exception => e @logger.warn("Could not delete file", :file => record[:local_file], :error => e) end |
#cleanup_s3object(record) ⇒ Object
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 42 def cleanup_s3object(record) return unless @delete_on_success begin @factory.get_s3_client(record[:bucket]) do |s3| s3.delete_object(bucket: record[:bucket], key: record[:key]) end rescue Exception => e @logger.warn("Failed to delete s3 object", :record => record, :error => e) end end |
#copy_s3object_to_disk(record) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 15 def copy_s3object_to_disk(record) # (from docs) WARNING: # yielding data to a block disables retries of networking errors! begin @factory.get_s3_client(record[:bucket]) do |s3| response = s3.get_object( bucket: record[:bucket], key: record[:key], response_target: record[:local_file] ) record[:s3_data] = response.to_h.keep_if { |key| @include_object_properties.include?(key) } end rescue Aws::S3::Errors::ServiceError => e @logger.error("Unable to download file. Requeuing the message", :error => e, :record => record) # prevent sqs message deletion throw :skip_delete end throw :skip_delete if stop? return true end |
#stop? ⇒ Boolean
53 54 55 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 53 def stop? @stopped.value end |