Class: Fluent::GoogleCloudStorageOut
- Inherits: TimeSlicedOutput
  - Object
  - TimeSlicedOutput
  - Fluent::GoogleCloudStorageOut
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_google_cloud_storage_out.rb
Constant Summary
- Storage = Google::Apis::StorageV1
- ServiceAccountCredentials = Google::Auth::ServiceAccountCredentials
- SUPPORTED_COMPRESS = { 'gz' => :gz, 'gzip' => :gzip, }
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ GoogleCloudStorageOut (constructor): A new instance of GoogleCloudStorageOut.
- #path_format(chunk_key) ⇒ Object
- #prepare_client ⇒ Object
- #send(path, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudStorageOut
Returns a new instance of GoogleCloudStorageOut.
52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 52
# Builds a new GoogleCloudStorageOut: runs the parent initializer first, then
# loads the libraries listed below.
def initialize
  super
  # The requires live inside the constructor, so the libraries are loaded
  # when an instance is created rather than when this file is loaded.
  %w[zlib net/http time mime-types].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 65
# Applies the plugin configuration.
#
# When conf['path'] contains a %S, %M or %H placeholder, conf['time_slice_format']
# is overwritten with the matching granularity before the parent class processes
# the configuration. %S takes precedence over %M, which takes precedence over %H.
# Afterwards the formatter named by @format is created and configured, and the
# storage client is prepared.
#
# @param conf the configuration object; read and written with string keys.
def configure(conf)
  path = conf['path']
  if path
    # Ordered finest-first so the first hit reproduces the %S > %M > %H precedence.
    _, slice_format = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |placeholder, _format| path.index(placeholder) }
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  prepare_client
end
#format(tag, time, record) ⇒ Object
102 103 104 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 102
# Serializes a single event by delegating to @formatter (assigned in #configure).
#
# @param tag    passed through to the formatter unchanged
# @param time   passed through to the formatter unchanged
# @param record passed through to the formatter unchanged
# @return whatever @formatter.format returns
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
#path_format(chunk_key) ⇒ Object
106 107 108 109 110 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 106
# Builds the object path for a chunk.
#
# The chunk key is parsed as a time using @time_slice_format, and that time is
# then used to expand the strftime placeholders in @path. The resulting path is
# logged at debug level and returned.
#
# @param chunk_key [String] time-slice key, parseable with @time_slice_format
# @return [String] the expanded path
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path).tap { |path| log.debug "GCS Path: #{path}" }
end
#prepare_client ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 83
# Creates the storage service client (@storage) and authorizes it with the
# service-account JSON key located at @service_account_json_key_path, requesting
# the cloud-platform and devstorage full-control scopes.
#
# Raises whatever File.open raises when the key file cannot be opened.
#
# @return the credentials object assigned to @storage.authorization
def prepare_client
  @storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]

  # Block form so the key file handle is closed as soon as the credentials are
  # built, instead of staying open for the life of the process.
  # NOTE(review): relies on make_creds reading the key IO eagerly — confirm
  # against the googleauth version in use.
  @storage.authorization = File.open(@service_account_json_key_path) do |key_io|
    ServiceAccountCredentials.make_creds(json_key_io: key_io, scope: scopes)
  end
end
#send(path, data) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 112
# Uploads +data+ to the bucket @bucket_id as the object named +path+.
#
# The content type is looked up from the path's extension. When no MIME type
# is registered for it, nil is passed so the upload does not fail on a nil
# lookup result. When @compress is a key of SUPPORTED_COMPRESS the payload is
# gzip-compressed in memory before being uploaded.
#
# NOTE: this method is named +send+, so it replaces Object#send on instances
# of this class; use __send__ or public_send for dynamic dispatch on them.
#
# @param path [String] destination object name
# @param data [String] payload to upload
# @return whatever @storage.insert_object returns
def send(path, data)
  mimetype = MIME::Types.type_for(path).first
  # type_for returns an empty list for unrecognised extensions, so guard the nil.
  content_type = mimetype ? mimetype.content_type : nil

  io =
    # NOTE(review): Hash#include? tests the keys, which are the strings
    # 'gz'/'gzip' — confirm @compress holds the raw string, not the mapped symbol.
    if SUPPORTED_COMPRESS.include?(@compress)
      buffer = StringIO.new(String.new) # mutable, binary-encoded scratch buffer
      writer = Zlib::GzipWriter.new(buffer)
      writer.write(data)
      writer.finish # completes the gzip stream but leaves the buffer open
      buffer.rewind
      buffer
    else
      StringIO.new(data)
    end

  @storage.insert_object(@bucket_id, upload_source: io, name: path, content_type: content_type)
end
#shutdown ⇒ Object
98 99 100 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 98
# Shutdown hook; adds nothing of its own and defers entirely to the parent class.
def shutdown
  super
end
#start ⇒ Object
94 95 96 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 94
# Start hook; adds nothing of its own and defers entirely to the parent class.
def start
  super
end
#write(chunk) ⇒ Object
129 130 131 132 133 |
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 129
# Uploads one buffered chunk.
#
# The destination path is derived from the chunk's key via #path_format, and the
# chunk's content is uploaded to it via #send.
#
# @param chunk an object responding to #key and #read
# @return the path the chunk was uploaded to
def write(chunk)
  path_format(chunk.key).tap do |gcs_path|
    send(gcs_path, chunk.read)
  end
end