Class: LogStash::Outputs::S3::Uploader

Inherits: Object
Defined in:
lib/logstash/outputs/s3/uploader.rb

Constant Summary collapse

# Shared pool used by #upload_async when no threadpool is injected.
# NOTE(review): max_queue of 1 plus :caller_runs means that once all 8
# workers are busy and one task is queued, further uploads execute on the
# calling thread — this applies back-pressure to the producer instead of
# growing an unbounded queue.
DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new({
  :min_threads => 1,
  :max_threads => 8,
  :max_queue => 1,
  :fallback_policy => :caller_runs
})

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1) ⇒ Uploader

Returns a new instance of Uploader.



21
22
23
24
25
26
27
# File 'lib/logstash/outputs/s3/uploader.rb', line 21

# Builds a new Uploader.
#
# @param bucket [Object] S3 bucket resource; must respond to #object (see #upload)
# @param logger [Object] logger used for upload warnings/errors
# @param threadpool [Concurrent::ThreadPoolExecutor] pool that runs async uploads
# @param retry_count [Numeric] maximum retry attempts per upload (unbounded by default)
# @param retry_delay [Numeric] seconds slept between retry attempts
def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1)
  @bucket       = bucket
  @logger       = logger
  @workers_pool = threadpool
  @retry_count  = retry_count
  @retry_delay  = retry_delay
end

Instance Attribute Details

#bucketObject (readonly)

Returns the value of attribute bucket.



19
20
21
# File 'lib/logstash/outputs/s3/uploader.rb', line 19

# @return [Object] the S3 bucket resource uploads are sent to
def bucket = @bucket

#loggerObject (readonly)

Returns the value of attribute logger.



19
20
21
# File 'lib/logstash/outputs/s3/uploader.rb', line 19

# @return [Object] the logger used to report upload progress and failures
def logger = @logger

#upload_optionsObject (readonly)

Returns the value of attribute upload_options.



19
20
21
# File 'lib/logstash/outputs/s3/uploader.rb', line 19

# @return [Object] the stored upload options
#   NOTE(review): @upload_options is never assigned in this class's visible
#   code — this reader returns nil unless set elsewhere; confirm it is still used.
def upload_options = @upload_options

Instance Method Details

#stopObject



74
75
76
77
# File 'lib/logstash/outputs/s3/uploader.rb', line 74

# Stops accepting new uploads and blocks until every queued/in-flight
# upload has finished.
def stop
  pool = @workers_pool
  pool.shutdown
  # nil timeout => wait indefinitely for the pool to drain
  pool.wait_for_termination(nil)
end

#upload(file, options = {}) ⇒ Object

Uploads a TemporaryFile to S3 (compressing it with Zstandard first).



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logstash/outputs/s3/uploader.rb', line 37

# Compresses a temporary file with Zstandard and uploads the result to the
# configured S3 bucket, retrying transient failures up to @retry_count times.
#
# @param file [Object] temporary file to upload; must respond to #path
# @param options [Hash]
#   :upload_options [Hash] passed through to the S3 object's #upload_file
#   :on_complete [#call] invoked with the compressed file object after the
#     upload attempt finishes (success or not); errors it raises are re-raised
# @return [Object] result of the :on_complete callback, or nil
def upload(file, options = {})
  # Renamed from `upload_options` so the local does not shadow the
  # `upload_options` attr_reader on this class.
  s3_options = options.fetch(:upload_options, {})

  # Unique name for the compressed artifact (created in the current working
  # directory). NOTE(review): cleanup of this file appears to be the
  # :on_complete callback's job — if no callback is passed it is left on
  # disk; confirm all callers supply one.
  zstd_compressed_file = "#{SecureRandom.uuid}.json.zst"

  compressed = Zstd.compress_file(file.path, zstd_compressed_file)

  tries = 0
  begin
    obj = bucket.object(compressed.key)
    obj.upload_file(compressed.path, s3_options)
  rescue Errno::ENOENT => e
    # The file being uploaded is the compressed one, so log *its* path
    # (previously this logged the source file's path, which was misleading).
    logger.error("File doesn't exist! Unrecoverable error.", :exception => e.class, :message => e.message, :path => compressed.path, :backtrace => e.backtrace)
  rescue => e
    # Getting here usually means the S3 client already exhausted its own
    # internal retries (default is 3). Wait and retry ourselves.
    #
    # The worker thread may be stuck here for a long time, but that is
    # preferred over losing data: the error is either transient or
    # something genuinely bad happened.
    if tries < @retry_count
      tries += 1
      logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => compressed.path, :backtrace => e.backtrace)
      sleep @retry_delay
      retry
    else
      logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => compressed.path, :backtrace => e.backtrace)
    end
  end

  begin
    options[:on_complete].call(compressed) unless options[:on_complete].nil?
  rescue => e
    logger.error("An error occurred in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => compressed.path, :backtrace => e.backtrace)
    raise e # re-raise: a failing callback must not be silently swallowed
  end
end

#upload_async(file, options = {}) ⇒ Object



29
30
31
32
33
34
# File 'lib/logstash/outputs/s3/uploader.rb', line 29

# Schedules #upload on the worker pool and returns immediately.
# The worker thread is renamed so the file being uploaded shows up in
# thread listings.
def upload_async(file, options = {})
  @workers_pool.post do
    thread_label = "S3 output uploader, file: #{file.path}"
    LogStash::Util.set_thread_name(thread_label)
    upload(file, options)
  end
end