Class: Fluent::GcloudPubSub::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/gcloud_pubsub/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(project, key, topic_name, subscription_name) ⇒ Subscriber

Returns a new instance of Subscriber.



65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 65

# Builds a subscriber bound to an existing Pub/Sub subscription.
#
# When +topic_name+ is nil the subscription is looked up directly on the
# project; otherwise it is looked up through the named topic.
#
# @param project [Object] passed as +project_id:+ to Google::Cloud::Pubsub.new
# @param key [Object] passed as +credentials:+ to Google::Cloud::Pubsub.new
# @param topic_name [String, nil] topic to resolve the subscription through,
#   or nil to look the subscription up on the project itself
# @param subscription_name [String] name of the subscription to attach to
# @raise [Error] if the topic (when given) or the subscription is not found
def initialize(project, key, topic_name, subscription_name)
  pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key

  if topic_name.nil?
    @client = pubsub.subscription subscription_name
  else
    topic = pubsub.topic topic_name
    # A missing topic comes back as nil; fail with a descriptive Error here
    # instead of letting the next call blow up with NoMethodError on nil.
    raise Error.new("topic:#{topic_name} does not exist.") if topic.nil?

    @client = topic.subscription subscription_name
  end

  raise Error.new("subscription:#{subscription_name} does not exist.") if @client.nil?
end

Instance Method Details

#acknowledge(messages) ⇒ Object



82
83
84
85
86
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 82

# Forwards +messages+ to the wrapped client's +acknowledge+ call and returns
# whatever that call returns.
#
# @param messages [Object] value handed unchanged to the client
# @raise [RetryableError] when the client raises UnavailableError,
#   DeadlineExceededError or InternalError from Google::Cloud; any other
#   exception propagates untouched
def acknowledge(messages)
  @client.acknowledge(messages)
rescue Google::Cloud::UnavailableError,
       Google::Cloud::DeadlineExceededError,
       Google::Cloud::InternalError => e
  # Wrap the listed API failures so callers can tell them apart by class.
  detail = "Google acknowledge api returns error:#{e.class} message:#{e}"
  raise RetryableError.new(detail)
end

#pull(immediate, max) ⇒ Object



76
77
78
79
80
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 76

# Calls +pull+ on the wrapped client and returns its result.
#
# @param immediate [Object] forwarded as the +immediate:+ keyword
# @param max [Object] forwarded as the +max:+ keyword
# @raise [RetryableError] when the client raises UnavailableError,
#   DeadlineExceededError or InternalError from Google::Cloud; any other
#   exception propagates untouched
def pull(immediate, max)
  @client.pull(immediate: immediate, max: max)
rescue Google::Cloud::UnavailableError,
       Google::Cloud::DeadlineExceededError,
       Google::Cloud::InternalError => e
  # Wrap the listed API failures so callers can tell them apart by class.
  detail = "Google pull api returns error:#{e.class} message:#{e}"
  raise RetryableError.new(detail)
end