Class: LogStash::Outputs::S3

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/outputs/s3.rb,
lib/logstash/outputs/s3/uploader.rb,
lib/logstash/outputs/s3/path_validator.rb,
lib/logstash/outputs/s3/temporary_file.rb,
lib/logstash/outputs/s3/file_repository.rb,
lib/logstash/outputs/s3/size_rotation_policy.rb,
lib/logstash/outputs/s3/time_rotation_policy.rb,
lib/logstash/outputs/s3/temporary_file_factory.rb,
lib/logstash/outputs/s3/writable_directory_validator.rb,
lib/logstash/outputs/s3/size_and_time_rotation_policy.rb,
lib/logstash/outputs/s3/write_bucket_permission_validator.rb

Overview

INFORMATION:

This plugin batches and uploads logstash events into Amazon Simple Storage Service (Amazon S3).

Requirements:

  • Amazon S3 Bucket and S3 Access Permissions (Typically access_key_id and secret_access_key)

  • S3 PutObject permission

S3 outputs create temporary files into the OS’ temporary directory, you can specify where to save them using the ‘temporary_directory` option.

S3 output files have the following format

ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt

|======= | ls.s3 | indicate logstash plugin s3 | | 312bc026-2f5d-49bc-ae9f-5940cf4ad9a6 | a new, random uuid per file. | | 2013-04-18T10.00 | represents the time whenever you specify time_file. | | tag_hello | this indicates the event’s tag. | | part0 | this means if you indicate size_file then it will generate more parts if you file.size > size_file. When a file is full it will be pushed to the bucket and then deleted from the temporary directory. If a file is empty, it is simply deleted. Empty files will not be pushed | |=======

Crash Recovery:

  • This plugin will recover and upload temporary log files after crash/abnormal termination when using ‘restore` set to true

Note regarding time_file and size_file

:

Both time_file and size_file settings can trigger a log “file rotation” A log rotation pushes the current log “part” to s3 and deleted from local temporary storage.

If you specify BOTH size_file and time_file then it will create file for each tag (if specified). When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered.

If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created. When time_file minutes elapses, a log rotation will be triggered.

If you ONLY specify size_file, but NOT time_file, one files for each tag (if specified) will be created. When size of log file part > size_file, a log rotation will be triggered.

If NEITHER size_file nor time_file is specified, ONLY one file for each tag (if specified) will be created. WARNING: Since no log rotation is triggered, S3 Upload will only occur when logstash restarts.

#### Usage: This is an example of logstash config:

source,ruby

output {

s3{
  access_key_id => "crazy_key"             (required)
  secret_access_key => "monkey_access_key" (required)
  region => "eu-west-1"                    (optional, default = "us-east-1")
  bucket => "your_bucket"                  (required)
  size_file => 2048                        (optional) - Bytes
  time_file => 5                           (optional) - Minutes
  codec => "plain"                         (optional)
  canned_acl => "private"                  (optional. Options are "private", "public-read", "public-read-write", "authenticated-read", "aws-exec-read", "bucket-owner-read", "bucket-owner-full-control", "log-delivery-write". Defaults to "private" )
}

Defined Under Namespace

Classes: FileRepository, PathValidator, SizeAndTimeRotationPolicy, SizeRotationPolicy, TemporaryFile, TemporaryFileFactory, TimeRotationPolicy, Uploader, WritableDirectoryValidator, WriteBucketPermissionValidator

Constant Summary collapse

PREFIX_KEY_NORMALIZE_CHARACTER =
"_"
PERIODIC_CHECK_INTERVAL_IN_SECONDS =
15
CRASH_RECOVERY_THREADPOOL =
Concurrent::ThreadPoolExecutor.new({
  :min_threads => 1,
  :max_threads => 2,
  :fallback_policy => :caller_runs
})
GZIP_ENCODING =
"gzip"

Instance Method Summary collapse

Instance Method Details

#closeObject



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/logstash/outputs/s3.rb', line 264

def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug("Uploading current workspace")

  @file_repository.shutdown # stop stale sweeps

  # The plugin has stopped receiving new events, but we still have
  # data on disk, lets make sure it get to S3.
  # If Logstash get interrupted, the `restore_from_crash` (when set to true) method will pickup
  # the content in the temporary directly and upload it.
  # This will block the shutdown until all upload are done or the use force quit.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @uploader.stop # wait until all the current upload are complete
  @crash_uploader.stop if @restore # we might have still work to do for recovery so wait until we are done
end

#full_optionsObject



284
285
286
287
288
# File 'lib/logstash/outputs/s3.rb', line 284

def full_options
  options = aws_options_hash || {}
  options[:signature_version] = @signature_version if @signature_version
  symbolized_settings.merge(options)
end

#multi_receive_encoded(events_and_encoded) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/logstash/outputs/s3.rb', line 243

def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
    rescue Errno::ENOSPC => e
      @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end

#normalize_key(prefix_key) ⇒ Object



309
310
311
# File 'lib/logstash/outputs/s3.rb', line 309

def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end

#registerObject



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/logstash/outputs/s3.rb', line 201

def register
  # I've move the validation of the items into custom classes
  # to prepare for the new config validation that will be part of the core so the core can
  # be moved easily.
  unless @prefix.empty?
    if !PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
    end
  end

  if !WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
  end

  if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.new(@logger).valid?(bucket_resource, upload_options)
    raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check your credentials or your permissions."
  end

  if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0
    raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0"
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
                                                  :max_threads => @upload_workers_count,
                                                  :max_queue => @upload_queue_size,
                                                  :fallback_policy => :caller_runs })

  @uploader = Uploader.new(bucket_resource, @logger, executor, retry_count: @retry_count, retry_delay: @retry_delay)

  # Restoring from crash will use a new threadpool to slowly recover
  # New events should have more priority.
  restore_from_crash if @restore

  # If we need time based rotation we need to do periodic check on the file
  # to take care of file that were not updated recently
  start_periodic_check if @rotation.needs_periodic?
end

#symbolize_keys_and_cast_true_false(hash) ⇒ Object



294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/logstash/outputs/s3.rb', line 294

def symbolize_keys_and_cast_true_false(hash)
  case hash
  when Hash
    symbolized = {}
    hash.each { |key, value| symbolized[key.to_sym] = symbolize_keys_and_cast_true_false(value) }
    symbolized
  when 'true'
    true
  when 'false'
    false
  else
    hash
  end
end

#symbolized_settingsObject



290
291
292
# File 'lib/logstash/outputs/s3.rb', line 290

def symbolized_settings
  @symbolized_settings ||= symbolize_keys_and_cast_true_false(@additional_settings)
end

#upload_optionsObject



313
314
315
316
317
318
319
320
321
322
# File 'lib/logstash/outputs/s3.rb', line 313

def upload_options
  {
    :acl => @canned_acl,
    :server_side_encryption => @server_side_encryption ? @server_side_encryption_algorithm : nil,
    :ssekms_key_id => @server_side_encryption_algorithm == "aws:kms" ? @ssekms_key_id : nil,
    :storage_class => @storage_class,
    :content_encoding => @encoding == GZIP_ENCODING ? GZIP_ENCODING : nil,
    :multipart_threshold => @upload_multipart_threshold
  }
end