Class: Fluent::GcloudPubSub::Publisher
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Publisher
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
-
#initialize(project, key, autocreate_topic, dest_project, endpoint, timeout) ⇒ Publisher
constructor
A new instance of Publisher.
- #publish(topic_name, messages) ⇒ Object
- #topic(topic_name) ⇒ Object
Constructor Details
#initialize(project, key, autocreate_topic, dest_project, endpoint, timeout) ⇒ Publisher
Returns a new instance of Publisher.
27 28 29 30 31 32 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 27 def initialize(project, key, autocreate_topic, dest_project, endpoint, timeout) @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key, endpoint: endpoint, timeout: timeout @autocreate_topic = autocreate_topic @dest_project = dest_project @topics = {} end |
Instance Method Details
#publish(topic_name, messages) ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 53 def publish(topic_name, ) topic(topic_name).publish do |batch| .each do |m| batch.publish m., m.attributes end end rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |
#topic(topic_name) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 34 def topic(topic_name) return @topics[topic_name] if @topics.has_key? topic_name if @dest_project.nil? client = @pubsub.topic topic_name if client.nil? && @autocreate_topic client = @pubsub.create_topic topic_name end else client = @pubsub.topic topic_name, project: @dest_project end if client.nil? raise Error.new "topic:#{topic_name} does not exist." end @topics[topic_name] = client client end |