Class: Fluent::GoogleCloudStorageOut

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_google_cloud_storage_out.rb

Constant Summary collapse

Storage =
Google::Apis::StorageV1
ServiceAccountCredentials =
Google::Auth::ServiceAccountCredentials
SUPPORTED_COMPRESS =
{
  'gz' => :gz,
  'gzip' => :gzip,
}

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GoogleCloudStorageOut

Returns a new instance of GoogleCloudStorageOut.



52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 52

# Builds the plugin instance, then loads the libraries it depends on.
def initialize
  super
  # Loaded after the parent constructor has run, in this fixed order.
  %w[zlib net/http time mime-types].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 65

# Applies the plugin configuration.
#
# When conf['path'] is present and contains a %S, %M or %H directive,
# conf['time_slice_format'] is overwritten with the matching format before the
# parent class sees the configuration. Afterwards the formatter is created
# from @format and the storage client is prepared.
#
# @param conf [#[], #[]=] configuration object read and updated by string key
def configure(conf)
  template = conf['path']
  if template
    # Ordered finest-first: the first directive found in the path wins.
    _directive, slice_format = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |directive, _format| template.index(directive) }
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  prepare_client
end

#format(tag, time, record) ⇒ Object



102
103
104
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 102

# Serialises a single event by handing it to the formatter created in
# #configure and returning whatever that formatter returns.
#
# @param tag [Object] event tag, passed through unchanged
# @param time [Object] event time, passed through unchanged
# @param record [Object] event record, passed through unchanged
def format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end

#path_format(chunk_key) ⇒ Object



106
107
108
109
110
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 106

# Turns a chunk key into the object path for that chunk.
#
# The key is parsed with @time_slice_format and the resulting time is
# rendered through the strftime template held in @path. The resolved path is
# logged at debug level and returned.
#
# @param chunk_key [String] time-slice key in @time_slice_format
# @return [String] the resolved path
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path).tap do |resolved|
    log.debug "GCS Path: #{resolved}"
  end
end

#prepare_client ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 83

# Creates the storage service client, stores it in @storage, and assigns it
# service-account credentials requested for the cloud-platform and
# devstorage full-control scopes.
#
# NOTE(review): File.open() below is called without a path argument, which
# Ruby rejects with ArgumentError. Presumably the service-account JSON key
# path was meant to be passed here; confirm against the plugin's
# configuration parameters. The opened handle is also not closed anywhere in
# this method.
def prepare_client
  @storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]
  @storage.authorization = ServiceAccountCredentials.make_creds(
    {
      :json_key_io => File.open(),
      :scope => scopes
    }
  )
end

#send(path, data) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 112

# Uploads +data+ to the bucket named by @bucket_id as an object called +path+.
#
# The payload is gzip-compressed first when @compress is one of the
# SUPPORTED_COMPRESS keys; otherwise it is uploaded as given. The object's
# content type is looked up from the extension of +path+ and is nil when the
# extension is not recognised.
#
# NOTE(review): this method shadows Object#send for instances of the class,
# so dynamic dispatch inside the class must use __send__ or public_send.
#
# @param path [String] destination object name
# @param data [String] payload to upload
# @return [Object] whatever @storage.insert_object returns
def send(path, data)
  mimetype = MIME::Types.type_for(path).first
  # type_for yields no entry for unknown extensions, so guard against nil
  # rather than calling content_type on it.
  content_type = mimetype && mimetype.content_type

  io = nil
  if SUPPORTED_COMPRESS.include?(@compress)
    io = StringIO.new("")
    writer = Zlib::GzipWriter.new(io)
    writer.write(data)
    # finish (not close) flushes the gzip trailer while leaving io open.
    writer.finish
    io.rewind
  else
    io = StringIO.new(data)
  end

  @storage.insert_object(@bucket_id, upload_source: io, name: path, content_type: content_type)
end

#shutdown ⇒ Object



98
99
100
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 98

# Shuts the plugin down. Adds no behaviour of its own and defers entirely to
# the parent class implementation.
def shutdown
  super
end

#start ⇒ Object



94
95
96
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 94

# Starts the plugin. Adds no behaviour of its own and defers entirely to the
# parent class implementation.
def start
  super
end

#write(chunk) ⇒ Object



129
130
131
132
133
# File 'lib/fluent/plugin/out_google_cloud_storage_out.rb', line 129

# Uploads one buffered chunk.
#
# The destination path is derived from the chunk's key via #path_format, the
# chunk contents are handed to #send, and the destination path is returned.
#
# @param chunk [#key, #read] buffered chunk to upload
# @return [String] the path the chunk was sent to
def write(chunk)
  path_format(chunk.key).tap do |destination|
    send(destination, chunk.read)
  end
end