Class: Fluent::GcloudPubSub::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(project, key, autocreate_topic) ⇒ Publisher

Returns a new instance of Publisher.



27
28
29
30
31
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 27

def initialize(project, key, autocreate_topic)
  @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
  @autocreate_topic = autocreate_topic
  @topics = {}
end

Instance Method Details

#publish(topic_name, messages) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 48

def publish(topic_name, messages)
  topic(topic_name).publish do |batch|
    messages.each do |m|
      batch.publish m.message, m.attributes
    end
  end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
  raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}"
end

#topic(topic_name) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 33

def topic(topic_name)
  return @topics[topic_name] if @topics.has_key? topic_name

  client = @pubsub.topic topic_name
  if client.nil? && @autocreate_topic
    client = @pubsub.create_topic topic_name
  end
  if client.nil?
    raise Error.new "topic:#{topic_name} does not exist."
  end

  @topics[topic_name] = client
  client
end