Class: LogStash::Inputs::GoogleCloudPubSub
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::GoogleCloudPubSub
- Defined in:
- lib/logstash/inputs/google_cloud_pubsub.rb
Overview
Stream events from a Google Cloud PubSub subscription.
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/logstash/inputs/google_cloud_pubsub.rb', line 48
#
# Prepares the input: builds the Pub/Sub client, looks up the configured
# topic and resolves the subscription that will be read from.
#
# Reads @project, @keyfile, @topic, @topic_project, @autocreate_topic,
# @subscription and @autocreate_subscription; stores the client in @pubsub
# and the resolved subscription in @sub.
#
# @raise [RuntimeError] when the topic lookup returns nothing, or when the
#   subscription is missing (or reports that it does not exist) and
#   @autocreate_subscription is not set
def register
  @logger.info("Registering Google Cloud PubSub input",
               :project => @project, :keyfile => @keyfile,
               :topic => @topic, :subscription => @subscription)

  # Plain positional arguments: `project=@project` is an assignment in Ruby,
  # not a keyword argument, and only leaked two unused local variables.
  @pubsub = Gcloud.new(@project, @keyfile).pubsub

  # Options are passed without braces so the call works whether the callee
  # declares keyword parameters or a trailing options hash.
  topic = @pubsub.topic(@topic, :autocreate => @autocreate_topic, :project => @topic_project)
  raise "Topic #{@topic} not found" unless topic
  @logger.debug("Topic: ", :topic => topic)

  @sub = topic.subscription(@subscription)
  unless @sub && @sub.exists?
    raise "Subscription #{@subscription} not found" unless @autocreate_subscription

    @logger.debug("Creating Subscription: ", :subscription => @subscription)
    @sub = topic.subscribe(@subscription)
  end
  @logger.debug("Subscription: ", :subscription => @sub)
end
#run(queue) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/logstash/inputs/google_cloud_pubsub.rb', line 74
#
# Main loop of the input: repeatedly fetches a batch of messages from @sub,
# decodes each one with @codec, copies the message attributes onto every
# decoded event, decorates the event and pushes it onto +queue+. Messages
# that were processed without raising are acknowledged together at the end
# of each batch; a message whose processing raises is logged and left
# unacknowledged.
#
# NOTE(review): the published listing had lost several identifiers. The
# fetch call is reconstructed as `pull` (the `max:` option survived) and the
# acknowledgement list name is new -- confirm against the real source file.
#
# @param queue [#<<] destination that receives the decoded events
def run(queue)
  # Use the `stop?` predicate as the loop condition so the loop can end once
  # shutdown is requested (presumably what the plugin base class provides --
  # confirm); the listing showed `!stop`.
  until stop?
    messages = @sub.pull max: @batch_size
    processed = []

    messages.each do |msg|
      begin
        @codec.decode(msg.data) do |event|
          msg.attributes.each_pair { |key, value| event[key] = value } if msg.attributes
          decorate(event)
          queue << event
        end
        # Only reached when decoding and queueing did not raise.
        processed << msg
      rescue StandardError => e
        @logger.warn("Error processing message", :message => msg, :exception => e)
      end
    end

    @sub.acknowledge processed unless processed.empty?
  end
end