Class: Fluent::GcloudPubSub::Publisher
- Inherits: Object
  - Object
  - Fluent::GcloudPubSub::Publisher
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
- #initialize(project, key, autocreate_topic) ⇒ Publisher (constructor): A new instance of Publisher.
- #publish(topic_name, messages) ⇒ Object
- #topic(topic_name) ⇒ Object
Constructor Details
#initialize(project, key, autocreate_topic) ⇒ Publisher
Returns a new instance of Publisher.
27 28 29 30 31 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 27
#
# Sets up a publisher: builds the Pub/Sub client, remembers whether missing
# topics may be created, and starts with an empty topic cache.
#
# @param project [Object] forwarded to Google::Cloud::Pubsub.new as +project_id:+
# @param key [Object] forwarded to Google::Cloud::Pubsub.new as +credentials:+
#   (NOTE(review): accepted forms are defined by the Pub/Sub client, not here)
# @param autocreate_topic [Object] stored as-is; #topic tests it for truthiness
def initialize(project, key, autocreate_topic)
  @pubsub = Google::Cloud::Pubsub.new(project_id: project, credentials: key)
  # Cache of topic clients keyed by topic name, filled lazily by #topic.
  @topics = {}
  @autocreate_topic = autocreate_topic
end
Instance Method Details
#publish(topic_name, messages) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 48
#
# Publishes every entry of +messages+ to the topic named +topic_name+.
#
# The topic client is obtained through #topic, and all entries are handed to
# the batch object yielded by one +publish+ block call on that client.
#
# @param topic_name [Object] passed straight to #topic
# @param messages [#each] entries responding to +message+ and +attributes+
# @return [Object] whatever the topic client's +publish+ call returns
# @raise [RetryableError] when the call raises Google::Cloud::UnavailableError,
#   Google::Cloud::DeadlineExceededError or Google::Cloud::InternalError
def publish(topic_name, messages)
  topic(topic_name).publish do |batch|
    messages.each do |m|
      # NOTE(review): the extracted listing showed a bare `m.` here; `message`
      # is the restored accessor name — confirm against client.rb.
      batch.publish(m.message, m.attributes)
    end
  end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
  # Re-raise these three error classes under a single type, keeping the
  # original class name and message in the text.
  raise RetryableError, "Google api returns error:#{ex.class} message:#{ex}"
end
#topic(topic_name) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 33
#
# Returns the topic client for +topic_name+, caching it in @topics.
#
# A name already present in the cache is returned without touching @pubsub.
# Otherwise the topic is looked up with @pubsub.topic; if that returns nil and
# @autocreate_topic is truthy, it is created with @pubsub.create_topic.
# Only a non-nil client is stored in the cache.
#
# @param topic_name [Object] cache key, also passed to @pubsub.topic and
#   @pubsub.create_topic
# @return [Object] the client returned by @pubsub.topic or @pubsub.create_topic
# @raise [Error] when the client is still nil after lookup and optional creation
def topic(topic_name)
  return @topics[topic_name] if @topics.key?(topic_name)

  client = @pubsub.topic(topic_name)
  client = @pubsub.create_topic(topic_name) if client.nil? && @autocreate_topic
  raise Error, "topic:#{topic_name} does not exist." if client.nil?

  # The assignment is the last expression, so it doubles as the return value.
  @topics[topic_name] = client
end