Class: Fluent::GcloudPubSub::Subscriber
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Subscriber
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
- #acknowledge(messages) ⇒ Object
-
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #pull(immediate, max) ⇒ Object
Constructor Details
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
Returns a new instance of Subscriber.
128 129 130 131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 128 def initialize(project, key, topic_name, subscription_name) pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key if topic_name.nil? @client = pubsub.subscription subscription_name else topic = pubsub.topic topic_name @client = topic.subscription subscription_name end raise Error, "subscription:#{subscription_name} does not exist." if @client.nil? end |
Instance Method Details
#acknowledge(messages) ⇒ Object
145 146 147 148 149 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 145 def acknowledge() @client.acknowledge rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e raise RetryableError, "Google acknowledge api returns error:#{e.class} message:#{e}" end |
#pull(immediate, max) ⇒ Object
139 140 141 142 143 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 139 def pull(immediate, max) @client.pull immediate: immediate, max: max rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e raise RetryableError, "Google pull api returns error:#{e.class} message:#{e}" end |