Class: Fluent::GcloudPubSub::Subscriber
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Subscriber
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
- #acknowledge(messages) ⇒ Object
-
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #pull(immediate, max) ⇒ Object
Constructor Details
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
Returns a new instance of Subscriber.
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 60 def initialize(project, key, topic_name, subscription_name) pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key if topic_name.nil? @client = pubsub.subscription subscription_name else topic = pubsub.topic topic_name @client = topic.subscription subscription_name end raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? end |
Instance Method Details
#acknowledge(messages) ⇒ Object
77 78 79 80 81 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 77 def acknowledge() @client.acknowledge rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |
#pull(immediate, max) ⇒ Object
71 72 73 74 75 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 71 def pull(immediate, max) @client.pull immediate: immediate, max: max rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |