Class: Fluent::GcloudPubSub::Subscriber
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Subscriber
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary collapse
- #acknowledge(messages) ⇒ Object
-
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #pull(immediate, max) ⇒ Object
Constructor Details
#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber
Returns a new instance of Subscriber.
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 65 def initialize(project, key, topic_name, subscription_name) pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key if topic_name.nil? @client = pubsub.subscription subscription_name else topic = pubsub.topic topic_name @client = topic.subscription subscription_name end raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? end |
Instance Method Details
#acknowledge(messages) ⇒ Object
82 83 84 85 86 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 82 def acknowledge() @client.acknowledge rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google acknowledge api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |
#pull(immediate, max) ⇒ Object
76 77 78 79 80 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 76 def pull(immediate, max) @client.pull immediate: immediate, max: max rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google pull api returns error:#{ex.class.to_s} message:#{ex.to_s}" end |