Class: Fluent::GCSOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::GCSOutput
- Defined in:
- lib/fluent/plugin/out_gcs.rb
Constant Summary collapse
- MAX_HEX_RANDOM_LENGTH =
32
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ GCSOutput
constructor
A new instance of GCSOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GCSOutput
Returns a new instance of GCSOutput.
12 13 14 15 |
# File 'lib/fluent/plugin/out_gcs.rb', line 12 def initialize super require "google/cloud/storage" end |
Instance Method Details
#configure(conf) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_gcs.rb', line 56 def configure(conf) super if @hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." end # The customer-supplied, AES-256 encryption key that will be used to encrypt the file. @encryption_opts = { encryption_key: @encryption_key, } if @object_metadata @object_metadata_hash = @object_metadata.map {|m| [m.key, m.value] }.to_h end @formatter = Fluent::Plugin.new_formatter(@format) @formatter.configure(conf) @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding) end |
#format(tag, time, record) ⇒ Object
91 92 93 |
# File 'lib/fluent/plugin/out_gcs.rb', line 91 def format(tag, time, record) @formatter.format(tag, time, record) end |
#start ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/out_gcs.rb', line 78 def start @gcs = Google::Cloud::Storage.new( project: @project, keyfile: @keyfile, retries: @client_retries, timeout: @client_timeout ) @gcs_bucket = @gcs.bucket(@bucket) ensure_bucket super end |
#write(chunk) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/out_gcs.rb', line 95 def write(chunk) path = generate_path(chunk) @object_creator.create(chunk) do |obj| opts = { metadata: @object_metadata_hash, acl: @acl, storage_class: @storage_class, content_type: @object_creator.content_type, content_encoding: @object_creator.content_encoding, } opts.merge!(@encryption_opts) log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" } @gcs_bucket.upload_file(obj.path, path, opts) end end |