Class: S3reamer::DirectoryStreamer
- Inherits: Object
- Defined in: lib/s3reamer/directory_streamer.rb
Constant Summary
- DEFAULT_OPTIONS =
  {
    pool_size: 4,
    log_level: Logger::INFO,
    reader_sleep_interval: 1,
    reader_timeout: 10,
    encryption_key: nil,
    log_file: STDOUT,
    filters: []
  }
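The :filters entries are shell globs matched with File.fnmatch against each file's absolute path; #stream_directory skips files that match none of them, and an empty list disables filtering. A minimal sketch of that check, with hypothetical paths:

  filters = ["*.log"]

  # Without File::FNM_PATHNAME, "*" also spans "/" separators, so a bare
  # "*.log" glob matches absolute paths that end in ".log".
  File.fnmatch("*.log", "/var/log/app/web.log")       # => true
  filters.none? { |x| File.fnmatch(x, "/etc/hosts") } # => true, so the file is skipped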
Instance Attribute Summary
- #options ⇒ Object (readonly): Returns the value of attribute options.
Instance Method Summary
- #initialize(options = {}) ⇒ DirectoryStreamer (constructor): A new instance of DirectoryStreamer.
- #stream_directory(directory:, bucket:) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ DirectoryStreamer
Returns a new instance of DirectoryStreamer.
# File 'lib/s3reamer/directory_streamer.rb', line 25

def initialize(options = {})
  @options = DEFAULT_OPTIONS.merge(options)
  @log = Logger.new(@options[:log_file])
  @log.level = @options[:log_level]
end
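A usage sketch, assuming the gem is required as 's3reamer'; the values are illustrative, and any key omitted falls back to DEFAULT_OPTIONS:

  require 'logger'
  require 's3reamer'

  streamer = S3reamer::DirectoryStreamer.new(
    pool_size: 8,             # number of upload worker threads
    log_level: Logger::DEBUG,
    filters: ['*.log']        # only stream files whose paths match these globs
  )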
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/s3reamer/directory_streamer.rb', line 23

def options
  @options
end
Instance Method Details
#stream_directory(directory:, bucket:) ⇒ Object
# File 'lib/s3reamer/directory_streamer.rb', line 31

def stream_directory(directory:, bucket:)
  file_statuses = ThreadSafe::Hash.new
  dir_watch = INotify::Notifier.new
  pool = Thread.pool(options[:pool_size])

  log.info "Setting up watch on: #{directory}"
  dir_watch.watch(directory, :open, :close, :recursive) do |e|
    filename = e.absolute_name
    log.debug "Events #{e.flags.inspect} received for: #{filename}"

    # Don't process directories
    next unless File.exist?(filename) && !File.directory?(filename)

    # If this is an "open" event, we should only process it if we haven't
    # already started on this file.
    next if e.flags.include?(:open) && file_statuses.include?(filename)

    if options[:filters].any? && options[:filters].none? { |x| File.fnmatch(x, filename) }
      log.debug "Skipping event for file not matching filters: #{filename}"
      next
    end

    # If this is a "close" event, we should update the status to inform the
    # worker thread.
    if e.flags.include?(:close) && file_statuses.include?(filename)
      file_statuses[filename] = :close
      next
    end

    log.info "File opened: #{filename}"
    file_statuses[filename] = :open

    pool.process {
      log.debug "Starting process for: #{filename}"

      begin
        prefix = Pathname.new(filename)
        prefix = prefix.relative_path_from(Pathname.new(directory))
        prefix = "#{options[:prefix]}#{prefix}"

        obj = bucket.object(prefix)
        io = S3reamer::S3WriteStream.new(obj)
      rescue Exception => e
        log.error "Error initializing S3 streamer: #{e}\n#{e.backtrace.join("\n")}"
        raise e
      end
      log.debug "Initialized S3 streamer"

      File.open(filename) do |file|
        last_successful_read = Time.now

        # Start with bytes_read != 0 to force at least one read of the file.
        # This addresses the race condition caused by files being opened and
        # closed quickly.
        bytes_read = -1

        # Go until the file has closed, or until we've not seen any new
        # bytes written to the file past some threshold (specified by
        # options[:reader_timeout]).
        while (file_statuses[filename] == :open || bytes_read != 0) &&
              (last_successful_read + options[:reader_timeout]) > Time.now
          b = file.read
          bytes_read = b.length
          io.write(b)

          # If we read any bytes, reset the time at which we last saw new
          # bytes in the file. This prevents the read timeout condition from
          # triggering.
          if bytes_read > 0
            log.debug "Read #{bytes_read} bytes: #{filename}"
            last_successful_read = Time.now
          end

          sleep options[:reader_sleep_interval] if file_statuses[filename] == :open
        end

        log.info "File closed. Completing S3 upload: #{filename}"
      end

      begin
        io.close
      rescue Exception => e
        log.error "Error completing S3 upload: #{e}:\n#{e.backtrace.join("\n")}"
      end

      file_statuses.delete(filename)
    }
  end

  dir_watch.run
  pool.shutdown
end
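A minimal end-to-end sketch, assuming the aws-sdk-s3 gem supplies the bucket object and using placeholder bucket and directory names; the call blocks inside dir_watch.run until the inotify loop ends:

  require 'aws-sdk-s3'
  require 's3reamer'

  bucket = Aws::S3::Resource.new(region: 'us-east-1').bucket('my-backup-bucket')
  streamer = S3reamer::DirectoryStreamer.new(pool_size: 4)

  # Recursively watches /var/log/app and streams every file opened under it
  # to S3 as bytes are written, keyed by its path relative to the directory.
  streamer.stream_directory(directory: '/var/log/app', bucket: bucket)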