Class: LogStash::Inputs::GoogleCloudPubSub

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/google_cloud_pubsub.rb

Overview

Stream events from a Google Cloud PubSub subscription.

Instance Method Summary collapse

Instance Method Details

#register ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/inputs/google_cloud_pubsub.rb', line 48

# Sets up the PubSub client and resolves the subscription this input reads from.
#
# Looks up the configured topic, then the configured subscription on that
# topic, and stores the result in @sub. When the subscription is missing (or
# reports that it does not exist) it is created via the topic if
# @autocreate_subscription is set.
#
# @raise [RuntimeError] if the topic lookup returns nothing, or if the
#   subscription is missing and @autocreate_subscription is not set
def register
  @logger.info("Registering Google Cloud PubSub input", :project => @project, :keyfile => @keyfile,
               :topic => @topic, :subscription => @subscription)

  # Project and keyfile are passed positionally, in that order.
  @pubsub = Gcloud.new(@project, @keyfile).pubsub
  topic = @pubsub.topic(@topic, :autocreate => @autocreate_topic, :project => @topic_project)

  raise "Topic #{@topic} not found" unless topic
  @logger.debug("Topic: ", :topic => topic)

  @sub = topic.subscription(@subscription)

  unless @sub && @sub.exists?
    raise "Subscription #{@subscription} not found" unless @autocreate_subscription

    @logger.debug("Creating Subscription: ", :subscription => @subscription)
    @sub = topic.subscribe(@subscription)
  end

  @logger.debug("Subscription: ", :subscription => @sub)
end

#run(queue) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/logstash/inputs/google_cloud_pubsub.rb', line 74

# Pulls batches of messages from @sub and pushes the decoded events onto
# +queue+ until the plugin is asked to stop.
#
# Each message's data is run through @codec; any message attributes are copied
# onto every event produced from that message before it is decorated and
# queued. Only messages that were processed without raising are acknowledged,
# so a failing message is logged and left unacknowledged.
#
# @param queue [#<<] destination for decoded events
def run(queue)
  # `stop?` is the shutdown predicate; `stop` is the hook invoked on shutdown.
  # NOTE(review): relies on the Logstash input base class providing `stop?` - confirm.
  until stop?
    msgs = @sub.wait_for_messages max: @batch_size
    processed = []

    msgs.each do |msg|
      begin
        @codec.decode(msg.data) do |event|
          msg.attributes.each_pair { |k, v| event[k] = v } if msg.attributes
          decorate(event)
          queue << event
        end
        # `<<` instead of Array#append: the latter is missing on Ruby < 2.5,
        # where the NoMethodError would be swallowed by the rescue below.
        processed << msg
      rescue StandardError => e
        @logger.warn("Error processing message", :message => msg, :exception => e)
      end
    end

    @sub.acknowledge processed unless processed.empty?
  end
end