Class: LogStash::Inputs::GCS
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::GCS
- Defined in:
- lib/logstash/inputs/gcs.rb
Overview
Stream events from files from a S3 bucket.
Each line from each file generates an event. Files ending in ‘.gz` are handled as gzip’ed files.
Defined Under Namespace
Modules: SinceDB
Instance Method Summary collapse
- #backup_to_bucket(object, key) ⇒ Object
- #backup_to_dir(filename) ⇒ Object
- #list_new_files ⇒ Object
- #process_files(queue) ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
Instance Method Details
#backup_to_bucket(object, key) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/logstash/inputs/gcs.rb', line 124 def backup_to_bucket(object, key) # TODO (barak) unless @backup_to_bucket.nil? backup_key = "#{@backup_add_prefix}#{key}" if @delete object.move_to(backup_key, :bucket => @backup_bucket) else object.copy_to(backup_key, :bucket => @backup_bucket) end end end |
#backup_to_dir(filename) ⇒ Object
137 138 139 140 141 |
# File 'lib/logstash/inputs/gcs.rb', line 137 def backup_to_dir(filename) unless @backup_to_dir.nil? FileUtils.cp(filename, @backup_to_dir) end end |
#list_new_files ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/logstash/inputs/gcs.rb', line 106 def list_new_files @logger.debug("GCS input: Polling") objects = {} @gcsbucket.files({prefix: @prefix}).each do |file| @logger.debug("GCS input: Found file", :name => file.name) unless ignore_filename?(file.name) if sincedb.newer?(file.updated_at()) objects[file.name] = file.updated_at() @logger.debug("GCS input: Adding to objects[]", :name => file.name) end end end return objects.keys.sort {|a,b| objects[a] <=> objects[b]} end |
#process_files(queue) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/logstash/inputs/gcs.rb', line 144 def process_files(queue) objects = list_new_files objects.each do |file| if stop? break else @logger.debug("GCS input processing", :bucket => @bucket, :file => file) process_log(queue, file) end end end |
#register ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/logstash/inputs/gcs.rb', line 74 def register require "fileutils" require "digest/md5" @logger.info("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile) @gcs = Gcloud.new(project=@project, keyfile=@keyfile).storage @gcsbucket = @gcs.bucket @bucket unless @backup_to_bucket.nil? @backup_bucket = @gcs.bucket @backup_to_bucket unless @backup_bucket @gcs.create_bucket(@backup_to_bucket) end end unless @backup_to_dir.nil? Dir.mkdir(@backup_to_dir, 0700) unless File.exists?(@backup_to_dir) end FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory) end |
#run(queue) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/logstash/inputs/gcs.rb', line 98 def run(queue) @current_thread = Thread.current Stud.interval(@interval, sleep_then_run: false) do process_files(queue) end end |
#stop ⇒ Object
158 159 160 161 162 163 |
# File 'lib/logstash/inputs/gcs.rb', line 158 def stop # @current_thread is initialized in the `#run` method, # this variable is needed because the `#stop` is a called in another thread # than the `#run` method and requiring us to call stop! with a explicit thread. Stud.stop!(@current_thread) end |