Class: Fluent::GcloudPubSub::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(project, key, autocreate_topic, dest_project, endpoint, timeout) ⇒ Publisher

Returns a new instance of Publisher.



27
28
29
30
31
32
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 27

def initialize(project, key, autocreate_topic, dest_project, endpoint, timeout)
  @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key, endpoint: endpoint, timeout: timeout
  @autocreate_topic = autocreate_topic
  @dest_project = dest_project
  @topics = {}
end

Instance Method Details

#publish(topic_name, messages) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 53

def publish(topic_name, messages)
  topic(topic_name).publish do |batch|
    messages.each do |m|
      batch.publish m.message, m.attributes
    end
  end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
  raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}"
end

#topic(topic_name) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 34

def topic(topic_name)
  return @topics[topic_name] if @topics.has_key? topic_name

  if @dest_project.nil?
    client = @pubsub.topic topic_name
    if client.nil? && @autocreate_topic
      client = @pubsub.create_topic topic_name
    end
  else
    client = @pubsub.topic topic_name, project: @dest_project
  end
  if client.nil?
    raise Error.new "topic:#{topic_name} does not exist."
  end

  @topics[topic_name] = client
  client
end