Class: Fluent::GcloudPubSub::Subscriber
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Subscriber
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
- #acknowledge(messages) ⇒ Object
-
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #pull(immediate, max) ⇒ Object
Constructor Details
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
Returns a new instance of Subscriber.
60 61 62 63 64 65 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 60 def initialize(project, key, topic_name, subscription_name) pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key topic = pubsub.topic topic_name @client = topic.subscription subscription_name raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? end |
Instance Method Details
#acknowledge(messages) ⇒ Object
73 74 75 76 77 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 73 def acknowledge() @client.acknowledge rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |
#pull(immediate, max) ⇒ Object
67 68 69 70 71 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 67 def pull(immediate, max) @client.pull immediate: immediate, max: max rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |