Class: Fluent::GcloudPubSub::MessageUnpacker

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Class Method Summary collapse

Class Method Details

.unpack(message) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 153

def self.unpack(message)
  attributes = message.attributes
  algorithm = attributes["compression_algorithm"]

  case algorithm
  when nil
    # For an uncompressed message return the single line and attributes
    [[message.message.data.chomp, message.attributes]]
  when COMPRESSION_ALGORITHM_ZLIB
    # Return all of the lines in the message, with empty attributes
    Zlib::Inflate
      .inflate(message.message.data)
      .split(BATCHED_RECORD_SEPARATOR)
      .map { |line| [line, {}] }
  else
    raise ArgumentError, "unknown compression algorithm: '#{algorithm}'"
  end
end