Class: Fluent::GCSOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcs.rb

Constant Summary collapse

MAX_HEX_RANDOM_LENGTH =
32

Instance Method Summary collapse

Constructor Details

#initializeGCSOutput

Returns a new instance of GCSOutput.



12
13
14
15
# File 'lib/fluent/plugin/out_gcs.rb', line 12

def initialize
  super
  require "google/cloud/storage"
end

Instance Method Details

#configure(conf) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_gcs.rb', line 56

def configure(conf)
  super

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less."
  end

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = {
    encryption_key: @encryption_key,
  }

  if @object_metadata
    @object_metadata_hash = @object_metadata.map {|m| [m.key, m.value] }.to_h
  end

  @formatter = Fluent::Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)
end

#format(tag, time, record) ⇒ Object



91
92
93
# File 'lib/fluent/plugin/out_gcs.rb', line 91

def format(tag, time, record)
  @formatter.format(tag, time, record)
end

#startObject



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/out_gcs.rb', line 78

def start
  @gcs = Google::Cloud::Storage.new(
    project: @project,
    keyfile: @keyfile,
    retries: @client_retries,
    timeout: @client_timeout
  )
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end

#write(chunk) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_gcs.rb', line 95

def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" }
    @gcs_bucket.upload_file(obj.path, path, opts)
  end
end