Class: LogStash::Inputs::GCS

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/gcs.rb

Overview

Stream events from files in a Google Cloud Storage (GCS) bucket.

Each line from each file generates an event. Files ending in `.gz` are handled as gzip'ed files.

Defined Under Namespace

Modules: SinceDB

Instance Method Summary collapse

Instance Method Details

#backup_to_bucket(object, key) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
# File 'lib/logstash/inputs/gcs.rb', line 124

# Backs up +object+ to the configured backup bucket, if any.
#
# Does nothing when @backup_to_bucket is nil. Otherwise the destination key is
# @backup_add_prefix followed by +key+, and the object receives +move_to+
# (when @delete is truthy) or +copy_to+ (otherwise) with that key and
# :bucket => @backup_bucket.
#
# @param object [Object] source object; must respond to +move_to+ and +copy_to+
# @param key [String] key of the source object
# @return [Object, nil] whatever the move/copy call returns, or nil when no
#   backup bucket is configured
def backup_to_bucket(object, key)
  # TODO (barak)
  return if @backup_to_bucket.nil?

  destination = "#{@backup_add_prefix}#{key}"
  transfer = @delete ? :move_to : :copy_to
  object.public_send(transfer, destination, :bucket => @backup_bucket)
end

#backup_to_dir(filename) ⇒ Object



137
138
139
140
141
# File 'lib/logstash/inputs/gcs.rb', line 137

# Copies +filename+ into the local backup directory, if one is configured.
#
# Does nothing when @backup_to_dir is nil.
#
# @param filename [String] path of the file to copy
def backup_to_dir(filename)
  return if @backup_to_dir.nil?

  FileUtils.cp(filename, @backup_to_dir)
end

#list_new_files ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/inputs/gcs.rb', line 106

# Lists the names of bucket files that still need processing, oldest first.
#
# Asks @gcsbucket for the files under @prefix, skips every name rejected by
# ignore_filename?, and keeps those whose updated_at timestamp sincedb
# reports as newer.
#
# NOTE(review): only what a single `files` call yields is examined; if the
# storage client paginates its listing, later pages are not visited here --
# confirm against the client version in use.
#
# @return [Array<String>] file names ordered by ascending updated_at
def list_new_files
  @logger.debug("GCS input: Polling")

  objects = {}
  # Brace-less keyword form: works whether `files` takes an options hash or
  # real keyword arguments (an explicit Hash literal breaks the latter on Ruby 3).
  @gcsbucket.files(prefix: @prefix).each do |file|
    @logger.debug("GCS input: Found file", :name => file.name)
    next if ignore_filename?(file.name)

    # Read the timestamp once so the filter and the sort key cannot disagree.
    updated_at = file.updated_at
    next unless sincedb.newer?(updated_at)

    objects[file.name] = updated_at
    @logger.debug("GCS input: Adding to objects[]", :name => file.name)
  end

  objects.keys.sort_by { |name| objects[name] }
end

#process_files(queue) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/logstash/inputs/gcs.rb', line 144

# Processes each file name returned by list_new_files, in the order given,
# by handing it to process_log together with +queue+.
#
# Iteration ends early as soon as stop? returns true.
#
# @param queue [Object] passed through unchanged to process_log
def process_files(queue)
  list_new_files.each do |file|
    break if stop?

    @logger.debug("GCS input processing", :bucket => @bucket, :file => file)
    process_log(queue, file)
  end
end

#register ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/logstash/inputs/gcs.rb', line 74

# Prepares the input: connects to storage, resolves the source bucket and the
# optional backup bucket, and makes sure the local directories exist.
#
# Reads @project, @keyfile, @bucket, @backup_to_bucket, @backup_to_dir and
# @temporary_directory; sets @gcs, @gcsbucket and (when a backup bucket is
# configured) @backup_bucket.
def register
  require "fileutils"
  require "digest/md5"

  @logger.info("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile)

  # Gcloud.new takes positional arguments; `project=...` would merely assign
  # throwaway locals, so pass the values directly.
  @gcs = Gcloud.new(@project, @keyfile).storage
  @gcsbucket = @gcs.bucket(@bucket)

  unless @backup_to_bucket.nil?
    # Keep the handle of a freshly created bucket; otherwise @backup_bucket
    # would stay nil and later backups would have no destination.
    @backup_bucket = @gcs.bucket(@backup_to_bucket) || @gcs.create_bucket(@backup_to_bucket)
  end

  unless @backup_to_dir.nil?
    # File.exist? -- the `exists?` alias is deprecated and gone in Ruby 3.2.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end

#run(queue) ⇒ Object



98
99
100
101
102
103
# File 'lib/logstash/inputs/gcs.rb', line 98

# Entry point of the input's worker: remembers the calling thread in
# @current_thread (used later by #stop) and hands Stud.interval a block that
# calls process_files(queue), using @interval and sleep_then_run: false.
#
# @param queue [Object] passed through to process_files
# @return [Object] whatever Stud.interval returns
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval, sleep_then_run: false) { process_files(queue) }
end

#stop ⇒ Object



158
159
160
161
162
163
# File 'lib/logstash/inputs/gcs.rb', line 158

# Asks Stud to stop the thread that is executing #run.
#
# #stop is invoked from a different thread than #run, so the target thread
# must be passed to Stud.stop! explicitly; #run stored it in @current_thread.
def stop
  run_thread = @current_thread
  Stud.stop!(run_thread)
end