Class: LogStash::Outputs::Rados

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/rados.rb

Overview

INFORMATION:

This plugin sends logstash events to Ceph. To use it you need to have a properly configured librados and a valid ceph cluster. Make sure you have permissions to write files on Ceph. Also be sure to run logstash as super user to establish a connection.

This plugin writes temporary files to “/opt/logstash/rados_temp/”. If you want, you can change this path at the start of the register method. These files have a special name, for example:

ls.rados.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

ls.rados : indicate logstash plugin rados

“ip-10-228-27-95” : indicates the hostname of your machine. “2013-04-18T10.00” : represents the time, whenever you specify time_file. “tag_hello” : indicates the event’s tag. “part0” : the part number; if you specify size_file, more parts are generated whenever file.size > size_file.

When a file is full it will be pushed to the pool and will be deleted from the temporary directory.
If a file is empty, it is not pushed to the pool; it is only removed from the temporary directory.

This plugin has a mechanism to restore the previous temporary files if something crashes.

Note:

If you specify both size_file and time_file, the plugin creates a file for each tag (if specified). When either time_file elapses or a file's size > size_file, the files are pushed to the Rados pool and deleted from the local disk. If you specify time_file but not size_file, it creates only one file for each tag (if specified). When time_file elapses, the files are pushed to Rados and deleted from the local disk.

If you specify size_file but not time_file, the plugin creates files for each tag (if specified). When a file's size > size_file, it is pushed to the Rados pool and deleted from the local disk.

If you specify neither size_file nor time_file, you get a curious mode: the plugin creates only one file for each tag (if specified). That file stays in the temporary directory and is not pushed to the pool until logstash is restarted.

#### Usage: This is an example of logstash config:

source,ruby

output {

rados{
  mypool => "mypool"             (required)
  size_file => 2048                        (optional)
  time_file => 5                           (optional)
}
}

Constant Summary collapse

TEMPFILE_EXTENSION =
"txt"
RADOS_INVALID_CHARACTERS =
/[\^`><]/

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#clusterObject (readonly)

Exposed attributes for testing purpose.



109
110
111
# File 'lib/logstash/outputs/rados.rb', line 109

# Reader for @cluster, the CephRuby::Cluster instance created in #register.
# Exposed for testing purposes.
def cluster
  @cluster
end

#page_counterObject (readonly)

Returns the value of attribute page_counter.



107
108
109
# File 'lib/logstash/outputs/rados.rb', line 107

# Reader for @page_counter, the value #create_temporary_file passes to
# #get_temporary_filename to build the ".partN" segment of the file name.
def page_counter
  @page_counter
end

#tempfileObject

Exposed attributes for testing purpose.



106
107
108
# File 'lib/logstash/outputs/rados.rb', line 106

# Reader for @tempfile, the File currently opened by #create_temporary_file.
# Exposed for testing purposes.
def tempfile
  @tempfile
end

Instance Method Details

#closeObject



261
262
263
264
265
266
267
268
269
# File 'lib/logstash/outputs/rados.rb', line 261

# Shuts the output down: stops the upload workers and the periodic rotation
# thread (when one was started), closes the current temporary file and
# finally closes the Ceph cluster handle.
def close
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @file_rotation_lock.synchronize do
    # Skip the close when there is no tempfile OR it is already closed.
    # The previous `&&` raised NoMethodError for a nil tempfile and tried to
    # close a file that was already closed.
    @tempfile.close unless @tempfile.nil? || @tempfile.closed?
  end
  @cluster.close
end

#create_temporary_fileObject



138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/logstash/outputs/rados.rb', line 138

# Opens the temporary file for the current @page_counter in append mode and
# stores it in @tempfile, closing the previously held file (if any) first.
# The swap happens under @file_rotation_lock.
def create_temporary_file
  target_path = File.join(@temporary_directory, get_temporary_filename(@page_counter))

  @logger.debug("RADOS: Creating a new temporary file", filename: target_path)

  @file_rotation_lock.synchronize do
    @tempfile.close unless @tempfile.nil?
    @tempfile = File.open(target_path, "a")
  end
end

#get_temporary_filename(page_counter = 0) ⇒ Object



216
217
218
219
220
221
222
223
224
225
# File 'lib/logstash/outputs/rados.rb', line 216

# Builds the temporary file name:
#   ls.rados.<hostname>.<YYYY-MM-DDTHH.MM>[.tag_<tags joined by '.'>].part<N>.<ext>
#
# @param page_counter [Integer] number used for the "partN" segment
# @return [String] the file name (no directory component)
def get_temporary_filename(page_counter = 0)
  timestamp = Time.now.strftime("%Y-%m-%dT%H.%M")
  segments = ["ls.rados", Socket.gethostname, timestamp]

  # The tag segment is only present when at least one tag is configured.
  segments << "tag_#{@tags.join('.')}" unless @tags.empty?
  segments << "part#{page_counter}" << TEMPFILE_EXTENSION

  segments.join(".")
end

#move_file_to_pool(file) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/logstash/outputs/rados.rb', line 194

# Uploads +file+ through #write_on_pool unless File.zero? reports it as
# empty, then deletes it from disk. Deletion failures caused by a missing
# file or missing permissions are logged instead of raised.
#
# @param file [String] path of the temporary file
def move_file_to_pool(file)
  unless File.zero?(file)
    write_on_pool(file)
    @logger.debug("RADOS: file was put on the upload thread", filename: File.basename(file), pool: @pool)
  end

  begin
    File.delete(file)
  rescue Errno::ENOENT
    # Another actor already removed the file: worth a warning, not a failure.
    @logger.warn("RADOS: Cannot delete the temporary file since it doesn't exist on disk", filename: File.basename(file))
  rescue Errno::EACCES
    @logger.error("RADOS: Logstash doesnt have the permission to delete the file in the temporary directory.", filename: File.basename(file), temporary_directory: @temporary_directory)
  end
end

#periodic_intervalObject



211
212
213
# File 'lib/logstash/outputs/rados.rb', line 211

# Converts @time_file into the rotation interval by multiplying it by 60.
# NOTE(review): presumably @time_file is in minutes and the result in
# seconds; confirm against the config declaration.
#
# @return [Numeric] @time_file * 60
def periodic_interval
  configured_time = @time_file
  configured_time * 60
end

#receive(event) ⇒ Object



228
229
230
231
# File 'lib/logstash/outputs/rados.rb', line 228

# Hands +event+ to the configured codec. The encoded output comes back
# through the @codec.on_event callback installed in #register, which
# forwards it to handle_event.
#
# @param event the logstash event to encode
def receive(event)

  @codec.encode(event)
end

#registerObject



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/logstash/outputs/rados.rb', line 153

# Prepares the output: loads ceph-ruby, creates the cluster handle, the
# upload queue and the rotation lock, validates @prefix, makes sure the
# temporary directory exists, optionally re-uploads leftover files, opens
# the first temporary file, starts periodic rotation (when time_file is not
# 0) and the upload workers, and finally wires the codec callback to
# handle_event.
#
# @raise [LogStash::ConfigurationError] when @prefix matches
#   RADOS_INVALID_CHARACTERS
def register
  # Loaded lazily so the gem is only needed once the plugin is registered.
  require "ceph-ruby"
  # required if using ruby version < 2.0
  # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
  workers_not_supported

  @cluster = CephRuby::Cluster.new
  @upload_queue = Queue.new
  @file_rotation_lock = Mutex.new

  if @prefix && @prefix =~ RADOS_INVALID_CHARACTERS
    @logger.error("RADOS: prefix contains invalid characters", prefix: @prefix, contains: RADOS_INVALID_CHARACTERS)
    raise LogStash::ConfigurationError, "RADOS: prefix contains invalid characters"
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)

  # Strict comparison kept on purpose: only a literal `true` enables restore.
  restore_from_crashes if @restore == true
  reset_page_counter
  create_temporary_file
  configure_periodic_rotation unless time_file == 0
  configure_upload_workers

  @codec.on_event { |_event, encoded_event| handle_event(encoded_event) }
end

#restore_from_crashesObject



183
184
185
186
187
188
189
190
191
# File 'lib/logstash/outputs/rados.rb', line 183

# Looks for leftover "*.<TEMPFILE_EXTENSION>" files in the temporary
# directory and hands each one to #move_file_to_pool_async, logging a
# warning per file found.
def restore_from_crashes
  @logger.debug("RADOS: is attempting to verify previous crashes...")

  leftover_pattern = File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")
  Dir.glob(leftover_pattern).each do |leftover|
    @logger.warn("RADOS: have found temporary file the upload process crashed, uploading file to Rados.", filename: File.basename(leftover))
    move_file_to_pool_async(leftover)
  end
end

#rotate_events_log?Boolean

Returns:

  • (Boolean)


234
235
236
237
238
# File 'lib/logstash/outputs/rados.rb', line 234

# Tells whether the current temporary file has grown past @size_file.
# The size is read while holding @file_rotation_lock.
#
# @return [Boolean] true when @tempfile.size is strictly greater than @size_file
def rotate_events_log?
  @file_rotation_lock.synchronize do
    current_size = @tempfile.size
    current_size > @size_file
  end
end

#write_events_to_multiple_files?Boolean

Returns:

  • (Boolean)


241
242
243
# File 'lib/logstash/outputs/rados.rb', line 241

# Tells whether a size limit is configured.
# NOTE(review): the name suggests this gates size-based splitting into
# several part files; the caller is not visible here, so confirm.
#
# @return [Boolean] true when @size_file is greater than zero
def write_events_to_multiple_files?
  @size_file > 0
end

#write_on_pool(file) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/logstash/outputs/rados.rb', line 113

# Writes the content of +file+ into the Rados pool named by @pool, as an
# object called "<@prefix><basename of file>". The whole file is read into
# memory and written at offset 0.
#
# The pool handle is closed on every exit path, including when the local
# file cannot be opened. Previously the pool was only closed once
# File.open had succeeded, so a missing or unreadable file leaked the handle.
#
# @param file [String] path of the local file to upload
# @raise [LogStash::Error] when reading or writing raises a SystemCallError
#   inside the upload step
def write_on_pool(file)
  rados_pool = @cluster.pool(@pool)
  rados_pool.open

  begin
    remote_filename = "#{@prefix}#{File.basename(file)}"

    @logger.debug("RADOS: ready to write file in pool", :remote_filename => remote_filename, :pool => @pool)

    File.open(file, 'r') do |io|
      begin
        # prepare for write the file
        object = rados_pool.rados_object(remote_filename)
        object.write(0, io.read)
      rescue SystemCallError => error
        @logger.error("RADOS: CEPH error", :error => error)
        raise LogStash::Error, "CEPH Configuration Error, #{error}"
      end
    end
  ensure
    # Covers failures of File.open itself as well as upload errors.
    rados_pool.close
  end

  @logger.debug("RADOS: has written remote file in pool", :remote_filename => remote_filename, :pool  => @pool)
end

#write_to_tempfile(event) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/logstash/outputs/rados.rb', line 246

# Appends +event+ to the current temporary file with syswrite while holding
# @file_rotation_lock. When the disk is full (Errno::ENOSPC) the error is
# logged and the plugin is shut down via #close.
#
# @param event [String] the encoded event to write
def write_to_tempfile(event)
  @logger.debug("RADOS: put event into tempfile ", tempfile: File.basename(@tempfile))

  @file_rotation_lock.synchronize { @tempfile.syswrite(event) }
rescue Errno::ENOSPC
  @logger.error("RADOS: No space left in temporary directory", temporary_directory: @temporary_directory)
  close
end