Class: LogStash::Outputs::S3::Uploader
- Inherits: Object
- Defined in: lib/logstash/outputs/s3/uploader.rb
Constant Summary
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new({ :min_threads => 1, :max_threads => 8, :max_queue => 1, :fallback_policy => :caller_runs })
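The defaults are deliberately tight: with :max_queue => 1 and :fallback_policy => :caller_runs, a #post to a saturated pool runs the upload on the calling thread instead of queueing it, which acts as backpressure. A custom Concurrent::ThreadPoolExecutor can be passed to the constructor instead; a minimal sketch (pool sizes here are illustrative, not recommendations):

require "concurrent"

# Illustrative custom pool; sizes are arbitrary.
custom_pool = Concurrent::ThreadPoolExecutor.new(
  :min_threads     => 2,
  :max_threads     => 4,
  :max_queue       => 1,
  :fallback_policy => :caller_runs
)
# uploader = LogStash::Outputs::S3::Uploader.new(bucket, logger, custom_pool)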
Instance Attribute Summary
- #bucket ⇒ Object (readonly)
  Returns the value of attribute bucket.
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
- #upload_options ⇒ Object (readonly)
  Returns the value of attribute upload_options.
Instance Method Summary
- #initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1) ⇒ Uploader (constructor)
  A new instance of Uploader.
- #stop ⇒ Object
- #upload(file, options = {}) ⇒ Object
  Uploads a TemporaryFile to S3.
- #upload_async(file, options = {}) ⇒ Object
Constructor Details
#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1) ⇒ Uploader
Returns a new instance of Uploader.
# File 'lib/logstash/outputs/s3/uploader.rb', line 19

def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1)
  @bucket = bucket
  @workers_pool = threadpool
  @logger = logger
  @retry_count = retry_count
  @retry_delay = retry_delay
end
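A minimal construction sketch. The bucket argument is assumed to be an Aws::S3::Bucket resource (#upload below calls bucket.object(...).upload_file(...), the aws-sdk-s3 resource API), and the logger stands in for whatever the output plugin normally passes; the bucket name and retry values below are placeholders, not defaults.

require "aws-sdk-s3"   # assumed SDK gem providing the Aws::S3::Bucket resource

bucket = Aws::S3::Resource.new(region: "us-east-1").bucket("my-logstash-bucket")  # placeholder bucket
logger = LogStash::Logging::Logger.new("logstash.outputs.s3")                     # assumed; normally the plugin's @logger

uploader = LogStash::Outputs::S3::Uploader.new(
  bucket, logger,
  retry_count: 5,   # illustrative; the default retries forever (Float::INFINITY)
  retry_delay: 1
)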
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

def bucket
  @bucket
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

def logger
  @logger
end
#upload_options ⇒ Object (readonly)
Returns the value of attribute upload_options.
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

def upload_options
  @upload_options
end
Instance Method Details
#stop ⇒ Object
# File 'lib/logstash/outputs/s3/uploader.rb', line 68

def stop
  @workers_pool.shutdown
  @workers_pool.wait_for_termination(nil) # block until it's done
end
#upload(file, options = {}) ⇒ Object
Uploads a TemporaryFile to S3.
# File 'lib/logstash/outputs/s3/uploader.rb', line 35

def upload(file, options = {})
  upload_options = options.fetch(:upload_options, {})

  tries = 0
  begin
    obj = bucket.object(file.key)
    obj.upload_file(file.path, upload_options)
  rescue Errno::ENOENT => e
    logger.error("File doesn't exist! Unrecoverable error.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
  rescue => e
    # Getting here usually means S3 already retried on its own (the SDK default is 3 attempts).
    # When that limit is reached, or another error happens, we wait and retry ourselves.
    #
    # The thread might be stuck here, but that's better than losing anything:
    # either the error is transient or something really bad happened.
    if tries < @retry_count
      tries += 1
      logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
      sleep @retry_delay
      retry
    else
      logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    end
  end

  begin
    options[:on_complete].call(file) unless options[:on_complete].nil?
  rescue => e
    logger.error("An error occurred in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    raise e # reraise it since we don't deal with it now
  end
end
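A hedged usage sketch, assuming temp_file is a LogStash::Outputs::S3::TemporaryFile (all #upload needs from it is #key and #path). The :upload_options hash is handed straight to Aws::S3::Object#upload_file, and :on_complete, when present, is called with the file once the attempt finishes.

uploader.upload(temp_file,
  :upload_options => { :server_side_encryption => "AES256" },  # forwarded to Aws::S3::Object#upload_file
  :on_complete    => ->(file) { File.delete(file.path) }       # placeholder cleanup; any callable works
)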
#upload_async(file, options = {}) ⇒ Object
# File 'lib/logstash/outputs/s3/uploader.rb', line 27

def upload_async(file, options = {})
  @workers_pool.post do
    LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
    upload(file, options)
  end
end
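A sketch of asynchronous use with a graceful shutdown, under the same assumptions as above. Each call posts the upload to the worker pool (with the default pool, a saturated queue makes the caller run the upload itself), and #stop blocks until every pending upload has finished.

temp_files.each do |file|   # temp_files: hypothetical collection of TemporaryFile objects
  uploader.upload_async(file, :upload_options => { :storage_class => "STANDARD_IA" })
end

uploader.stop   # shut the pool down and wait for outstanding uploads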