Class: Fluent::GcloudPubSub::Subscriber
- Inherits:
- Object
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Constant Summary
- RETRY_COUNT = 5
- RETRYABLE_ERRORS = [Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError]
Instance Method Summary
- #acknowledge(messages) ⇒ Object
- #initialize(project, key, topic_name, subscription_name) ⇒ Subscriber (constructor): a new instance of Subscriber.
- #pull(immediate, max) ⇒ Object
Constructor Details
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
Returns a new instance of Subscriber.
    # File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 39
    def initialize(project, key, topic_name, subscription_name)
      Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do
        pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
        topic = pubsub.topic topic_name
        @client = topic.subscription subscription_name
      end
      raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil?
    end
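The constructor wraps the subscription lookup in Retryable.retryable so that transient errors trigger another attempt, up to RETRY_COUNT tries. The retry-on-listed-errors idea can be sketched in plain Ruby as below. This is a minimal illustration, not the retryable gem's implementation: `with_retries` and `TransientError` are hypothetical names introduced here, and any pause between attempts is omitted.

```ruby
# Stand-in for the transient Google::Cloud errors; hypothetical, for illustration only.
class TransientError < StandardError; end

RETRY_COUNT = 5
RETRYABLE_ERRORS = [TransientError].freeze

# Hypothetical helper: run the block up to `tries` times, retrying only
# when one of the error classes listed in `on` is raised.
def with_retries(tries:, on:)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue *on
    retry if attempts < tries
    raise # attempts exhausted: re-raise the last error
  end
end

calls = 0
result = with_retries(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do |attempt|
  calls += 1
  # Fail on the first two attempts, then succeed.
  raise TransientError, "unavailable" if attempt < 3
  "subscription-handle"
end
```

Errors that are not listed in `on` propagate immediately, which is why a missing subscription (a nil result rather than an exception) is checked separately after the retry block.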
Instance Method Details
#acknowledge(messages) ⇒ Object
    # File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 54
    def acknowledge(messages)
      @client.acknowledge messages
    rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
      raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}"
    end
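The listing above delegates to the client and converts transient failures into a single RetryableError. A self-contained sketch of that error-translation pattern follows. All classes in it are stand-ins defined only for this example (the real error classes come from google-cloud and the plugin), and `FlakyClient` is a hypothetical client that always fails.

```ruby
# Stand-ins for this sketch only.
class UnavailableError < StandardError; end
class RetryableError < StandardError; end

# Hypothetical client whose acknowledge call always fails transiently.
class FlakyClient
  def acknowledge(_messages)
    raise UnavailableError, "backend unavailable"
  end
end

class SketchSubscriber
  def initialize(client)
    @client = client
  end

  # Delegate to the client, then re-raise transient failures as one
  # error type that callers can rescue without knowing the backend.
  def acknowledge(messages)
    @client.acknowledge messages
  rescue UnavailableError => ex
    raise RetryableError, "Google acknowledge api returns error:#{ex.class} message:#{ex}"
  end
end

caught = nil
begin
  SketchSubscriber.new(FlakyClient.new).acknowledge([:msg])
rescue RetryableError => e
  caught = e.message
end
```

Callers then only need to rescue RetryableError to decide whether an operation is worth repeating.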
#pull(immediate, max) ⇒ Object
    # File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 48
    def pull(immediate, max)
      @client.pull immediate: immediate, max: max
    rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
      raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}"
    end
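A caller would typically pull a batch, process it, and then acknowledge it, treating RetryableError as a signal to try again later. The sketch below shows one way that flow might look. `InMemorySubscriber` and `consume_once` are hypothetical names; the stand-in mirrors only the pull(immediate, max) and acknowledge(messages) signatures documented here and ignores the `immediate` flag.

```ruby
# Stand-in error class for this sketch only.
class RetryableError < StandardError; end

# Hypothetical in-memory subscriber with the same method signatures as the
# documented class, so the flow can run without Google Cloud access.
class InMemorySubscriber
  attr_reader :acked

  def initialize(queue)
    @queue = queue
    @acked = []
  end

  # Return at most `max` pending messages; `immediate` is accepted but unused.
  def pull(immediate, max)
    @queue.shift(max)
  end

  def acknowledge(messages)
    @acked.concat(messages)
  end
end

# Pull one batch and acknowledge it, returning the number of messages handled.
# A RetryableError is treated as "nothing handled this round".
def consume_once(subscriber, max)
  messages = subscriber.pull(true, max)
  return 0 if messages.empty?
  subscriber.acknowledge(messages)
  messages.size
rescue RetryableError
  0
end

subscriber = InMemorySubscriber.new(%w[a b c])
first = consume_once(subscriber, 2)
second = consume_once(subscriber, 2)
third = consume_once(subscriber, 2)
```

Acknowledging only after processing means a failure between the two calls leaves the messages unacknowledged, so they can be delivered again.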