Class: Fluent::GcloudPubSub::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Constant Summary collapse

RETRY_COUNT =
5
RETRYABLE_ERRORS =
[Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError]

Instance Method Summary collapse

Constructor Details

#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber

Returns a new instance of Subscriber.



39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 39

# Looks up an existing Pub/Sub subscription and stores it in @client.
#
# Client construction, the topic lookup and the subscription lookup all run
# inside Retryable.retryable (tries: RETRY_COUNT, on: RETRYABLE_ERRORS).
#
# @param project value passed as +project:+ to Google::Cloud::Pubsub.new
# @param key value passed as +keyfile:+ to Google::Cloud::Pubsub.new
# @param topic_name name handed to +pubsub.topic+
# @param subscription_name name handed to +topic.subscription+
# @raise [Error] when the topic lookup or the subscription lookup returns nil
def initialize(project, key, topic_name, subscription_name)
  topic = nil
  Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do
    pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
    topic = pubsub.topic topic_name
    # A missing topic comes back as nil; skip the subscription lookup rather
    # than calling a method on nil (which would surface as NoMethodError).
    @client = topic.subscription subscription_name unless topic.nil?
  end
  raise Error, "topic:#{topic_name} does not exist." if topic.nil?
  raise Error, "subscription:#{subscription_name} does not exist." if @client.nil?
end

Instance Method Details

#acknowledge(messages) ⇒ Object



54
55
56
57
58
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 54

# Acknowledges +messages+ through the subscription held in @client.
#
# Exceptions whose class is listed in RETRYABLE_ERRORS are re-raised as
# RetryableError with the original class name and message embedded in the
# text; every other exception propagates unchanged.
#
# @param messages passed straight through to +@client.acknowledge+
# @return whatever +@client.acknowledge+ returns
# @raise [RetryableError] when the client raises one of RETRYABLE_ERRORS
def acknowledge(messages)
  @client.acknowledge messages
rescue *RETRYABLE_ERRORS => ex
  # Use the shared constant so this list cannot drift from the one used by
  # the constructor's retry block.
  raise RetryableError, "Google acknowledge api returns error:#{ex.class} message:#{ex}"
end

#pull(immediate, max) ⇒ Object



48
49
50
51
52
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 48

# Pulls messages through the subscription held in @client.
#
# Exceptions whose class is listed in RETRYABLE_ERRORS are re-raised as
# RetryableError with the original class name and message embedded in the
# text; every other exception propagates unchanged.
#
# @param immediate forwarded as the +immediate:+ keyword of +@client.pull+
# @param max forwarded as the +max:+ keyword of +@client.pull+
# @return whatever +@client.pull+ returns
# @raise [RetryableError] when the client raises one of RETRYABLE_ERRORS
def pull(immediate, max)
  @client.pull immediate: immediate, max: max
rescue *RETRYABLE_ERRORS => ex
  # Use the shared constant so this list cannot drift from the one used by
  # the constructor's retry block.
  raise RetryableError, "Google pull api returns error:#{ex.class} message:#{ex}"
end