Class: Fluent::GcloudPubSubOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcloud_pubsub.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


28
29
30
31
32
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 28

# Validates the plugin configuration after the parent class has processed it.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   implementation via +super+
# @return [nil] when a topic is set
# @raise [Fluent::ConfigError] when @topic is nil or false
def configure(conf)
  super

  # @topic is presumably populated by the parent's configure from a declared
  # config parameter; that declaration is not visible here.
  return if @topic

  raise Fluent::ConfigError, "'topic' must be specified."
end

#format(tag, time, record) ⇒ Object



41
42
43
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 41

# Serializes one event for buffering.
#
# The tag, time and record are packed together as a single three-element
# array, in that order, which is the shape #write later unpacks.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of Array#to_msgpack on +[tag, time, record]+
#   (presumably a MessagePack-encoded String; the extension that provides
#   #to_msgpack is loaded elsewhere)
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#start ⇒ Object



34
35
36
37
38
39
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 34

# Starts the plugin and obtains the Pub/Sub topic handle used by #write.
#
# After the parent's +start+ runs, a Gcloud object is built from @project and
# @key, and the topic named by @topic is looked up through its pubsub
# interface. @autocreate_topic is passed as the +autocreate+ option
# (presumably it makes the library create a missing topic; confirm against
# the gcloud gem documentation).
#
# @return the topic object, which is also stored in @client
def start
  super

  gcloud = Gcloud.new(@project, @key)
  @client = gcloud.pubsub.topic(@topic, autocreate: @autocreate_topic)
end

#write(chunk) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 45

# Publishes every record of a buffer chunk to the Pub/Sub topic in one batch.
#
# Events are read from the chunk with #msgpack_each. Only the record of each
# event is sent, encoded as JSON; the tag and time are discarded. If the chunk
# yields no events, nothing is published.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return [nil] when the chunk is empty, otherwise whatever @client.publish
#   returns
# @raise [StandardError] any error raised while decoding or publishing is
#   logged and then re-raised unchanged (presumably so the caller can retry
#   the chunk; verify against the buffering framework)
def write(chunk)
  messages = []

  chunk.msgpack_each do |_tag, _time, record|
    messages << record.to_json
  end

  return if messages.empty?

  @client.publish do |batch|
    messages.each { |message| batch.publish message }
  end
rescue => e
  # Use the bound exception rather than the $! global.
  log.error "unexpected error", error: e.to_s
  log.error_backtrace
  raise
end