Class: LogStash::Outputs::S3::Uploader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/s3/uploader.rb

Constant Summary collapse

# Worker pool used when the constructor is not handed one explicitly:
# between 1 and 8 threads with a work queue of a single slot.
# NOTE(review): per the concurrent-ruby docs, :caller_runs makes the posting
# thread execute a task the pool cannot accept; presumably this is meant as
# back-pressure on the producer -- confirm.
DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({
  min_threads: 1,
  max_threads: 8,
  max_queue: 1,
  fallback_policy: :caller_runs
})

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1) ⇒ Uploader

Returns a new instance of Uploader.



19
20
21
22
23
24
25
# File 'lib/logstash/outputs/s3/uploader.rb', line 19

# Builds an uploader bound to a bucket, a logger and a worker pool.
#
# @param bucket [#object] object on which #upload calls `object(key)`
# @param logger [#warn, #error] receives retry warnings and failure reports
# @param threadpool [#post, #shutdown, #wait_for_termination] executor used by
#   #upload_async and #stop; defaults to DEFAULT_THREADPOOL
# @param retry_count [Numeric] how many times #upload retries a failed
#   transfer; the default (infinity) retries forever
# @param retry_delay [Numeric] value handed to `sleep` between two attempts
def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1)
  # Collaborators.
  @logger = logger
  @bucket = bucket
  @workers_pool = threadpool

  # Retry policy consumed by #upload.
  @retry_count = retry_count
  @retry_delay = retry_delay
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



17
18
19
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

# Reader for @bucket, which the constructor assigns from its `bucket`
# argument. #upload calls `object(key)` on this value to obtain the upload
# target.
def bucket
  @bucket
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



17
18
19
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

# Reader for @logger, which the constructor assigns from its `logger`
# argument. #upload reports retries through `warn` and failures through
# `error` on this value.
def logger
  @logger
end

#upload_options ⇒ Object (readonly)

Returns the value of attribute upload_options.



17
18
19
# File 'lib/logstash/outputs/s3/uploader.rb', line 17

# Reader for @upload_options.
# NOTE(review): the constructor shown in this file never assigns
# @upload_options, so this returns nil unless it is set somewhere not visible
# here. #upload does not use this reader; it takes its per-call settings from
# the `:upload_options` key of its `options` argument.
def upload_options
  @upload_options
end

Instance Method Details

#stop ⇒ Object



68
69
70
71
# File 'lib/logstash/outputs/s3/uploader.rb', line 68

# Shuts the worker pool down and waits for it to finish.
#
# @return [Object] whatever the pool's `wait_for_termination` returns
def stop
  pool = @workers_pool
  pool.shutdown
  # A nil timeout means: block until the pool is done, however long that takes.
  pool.wait_for_termination(nil)
end

#upload(file, options = {}) ⇒ Object

Uploads a TemporaryFile to S3, retrying on failure, and then invokes the optional :on_complete callback.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/logstash/outputs/s3/uploader.rb', line 35

# Uploads a TemporaryFile to S3, retrying on failure, then fires the optional
# completion callback.
#
# Note that the callback is invoked whether or not the transfer succeeded:
# neither the "file missing" branch nor the "retries exhausted" branch returns
# early.
#
# @param file [#key, #path] the file to send; `key` names the remote object and
#   `path` is the local file handed to `upload_file`
# @param options [Hash] recognised keys:
#   :upload_options - Hash forwarded to `upload_file` (defaults to {})
#   :on_complete    - callable invoked with `file` once the upload phase is over
# @return [Object, nil] the callback's result, or nil when no callback was given
# @raise [StandardError] whatever the :on_complete callback raised (logged first)
def upload(file, options = {})
  transfer_options = options.fetch(:upload_options, {})
  attempts = 0

  begin
    bucket.object(file.key).upload_file(file.path, transfer_options)
  rescue Errno::ENOENT => error
    # Nothing to retry: the local file is gone.
    logger.error(
      "File doesn't exist! Unrecoverable error.",
      exception: error.class, message: error.message, path: file.path, backtrace: error.backtrace
    )
  rescue => error
    # Reaching this point usually means the S3 client already exhausted its
    # own internal retries (3 by default), or some other error occurred, so
    # we wait a little and start over.
    #
    # With an unlimited retry count the thread can stay stuck here; that is
    # preferred over dropping data, since the cause is either transient or
    # something seriously wrong.
    if attempts < @retry_count
      attempts += 1
      logger.warn(
        "Uploading failed, retrying (##{attempts} of #{@retry_count})",
        exception: error.class, message: error.message, path: file.path, backtrace: error.backtrace
      )
      sleep @retry_delay
      retry
    else
      logger.error(
        "Failed to upload file (retried #{@retry_count} times).",
        exception: error.class, message: error.message, path: file.path, backtrace: error.backtrace
      )
    end
  end

  begin
    callback = options[:on_complete]
    callback.call(file) unless callback.nil?
  rescue => error
    logger.error(
      "An error occurred in the `on_complete` uploader",
      exception: error.class, message: error.message, path: file.path, backtrace: error.backtrace
    )
    raise error # not handled here, so hand it back to the caller
  end
end

#upload_async(file, options = {}) ⇒ Object



27
28
29
30
31
32
# File 'lib/logstash/outputs/s3/uploader.rb', line 27

# Queues an upload of +file+ on the worker pool instead of running it inline.
# The queued task first renames its thread after the file being sent, then
# delegates to #upload with the same arguments.
#
# @param file [#path] file to upload; passed through to #upload
# @param options [Hash] passed through to #upload unchanged
# @return [Object] whatever the pool's `post` returns
def upload_async(file, options = {})
  job = proc do
    LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
    upload(file, options)
  end

  @workers_pool.post(&job)
end