Class: Fluent::GcloudPubSub::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber

Returns a new instance of Subscriber.

Raises:

  • (Error) — if the subscription named by subscription_name does not exist

128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 128

# Builds a subscriber wrapping an existing Pub/Sub subscription.
#
# The subscription is looked up directly on the project when +topic_name+
# is nil; otherwise it is looked up through the named topic.
#
# @param project passed as +project_id+ to Google::Cloud::Pubsub.new
# @param key passed as +credentials+ to Google::Cloud::Pubsub.new
# @param topic_name name of the topic owning the subscription, or nil to
#   look the subscription up on the project itself
# @param subscription_name name of the subscription to attach to
# @raise [Error] if the topic (when given) or the subscription cannot be found
def initialize(project, key, topic_name, subscription_name)
  pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
  if topic_name.nil?
    @client = pubsub.subscription subscription_name
  else
    topic = pubsub.topic topic_name
    # The topic lookup yields nil for an unknown topic (per the
    # google-cloud-pubsub docs); report that as Error instead of letting
    # the next call fail with NoMethodError on nil.
    raise Error, "topic:#{topic_name} does not exist." if topic.nil?

    @client = topic.subscription subscription_name
  end
  raise Error, "subscription:#{subscription_name} does not exist." if @client.nil?
end

Instance Method Details

#acknowledge(messages) ⇒ Object



145
146
147
148
149
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 145

# Forwards +messages+ to the wrapped client's +acknowledge+ call.
#
# @param messages value handed unchanged to the client
# @return whatever the client's +acknowledge+ returns
# @raise [RetryableError] when the client raises UnavailableError,
#   DeadlineExceededError or InternalError from Google::Cloud; other
#   exceptions propagate untouched
def acknowledge(messages)
  @client.acknowledge(messages)
rescue Google::Cloud::UnavailableError,
       Google::Cloud::DeadlineExceededError,
       Google::Cloud::InternalError => error
  detail = "Google acknowledge api returns error:#{error.class} message:#{error}"
  raise RetryableError, detail
end

#pull(immediate, max) ⇒ Object



139
140
141
142
143
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 139

# Forwards a pull request to the wrapped client.
#
# @param immediate passed as the client's +immediate:+ keyword
# @param max passed as the client's +max:+ keyword
# @return whatever the client's +pull+ returns
# @raise [RetryableError] when the client raises UnavailableError,
#   DeadlineExceededError or InternalError from Google::Cloud; other
#   exceptions propagate untouched
def pull(immediate, max)
  @client.pull(immediate: immediate, max: max)
rescue Google::Cloud::UnavailableError,
       Google::Cloud::DeadlineExceededError,
       Google::Cloud::InternalError => error
  detail = "Google pull api returns error:#{error.class} message:#{error}"
  raise RetryableError, detail
end