Class: Fluent::GcloudPubSub::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Constant Summary collapse

RETRY_COUNT =
5
RETRYABLE_ERRORS =
[Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError]

Instance Method Summary collapse

Constructor Details

#initialize(project, key, topic_name, skip_lookup) ⇒ Publisher

Note: autocreate_topic is unused; this constructor does not accept it as a parameter.



16
17
18
19
20
21
22
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 16

# Resolves the Pub/Sub topic this publisher will write to.
#
# The connection setup and topic lookup run inside a Retryable block, so
# they are attempted up to RETRY_COUNT times when one of RETRYABLE_ERRORS
# is raised.
#
# @param project [Object] forwarded as +project:+ to Google::Cloud::Pubsub.new
# @param key [Object] forwarded as +keyfile:+ to Google::Cloud::Pubsub.new
# @param topic_name [Object] name handed to the topic lookup
# @param skip_lookup [Object] forwarded as +skip_lookup:+ to the topic lookup
# @raise [Error] when the topic lookup yields nil
def initialize(project, key, topic_name, skip_lookup)
  Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do
    service = Google::Cloud::Pubsub.new(project: project, keyfile: key)
    @client = service.topic(topic_name, skip_lookup: skip_lookup)
  end

  # A nil result means the lookup found no topic; anything else is usable.
  return unless @client.nil?

  raise Error, "topic:#{topic_name} does not exist."
end

Instance Method Details

#publish(messages) ⇒ Object



24
25
26
27
28
29
30
31
32
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 24

# Publishes every given message through one batched call on the topic client.
#
# @param messages [#each] items handed one by one to the batch publisher
# @return [Object] whatever the topic client's +publish+ call returns
# @raise [RetryableError] when the call fails with one of RETRYABLE_ERRORS;
#   any other exception propagates unchanged
def publish(messages)
  @client.publish do |batch|
    messages.each { |message| batch.publish message }
  end
rescue *RETRYABLE_ERRORS => ex
  # Rescue from the shared constant rather than a hand-copied class list, so
  # the errors treated as retryable here cannot drift from the constructor's.
  raise RetryableError, "Google api returns error:#{ex.class} message:#{ex}"
end