Class: FilesystemQueue::Queue
- Inherits:
-
Object
- Object
- FilesystemQueue::Queue
- Defined in:
- lib/filesystem_queue.rb
Overview
A persistent queue system based on the local filesystem. Handles enqueuing, dequeuing, completing, failing, and retrying jobs.
Instance Method Summary collapse
-
#cleanup ⇒ Object
CAUTION: Cleans up the queue directory, removing all files and directories.
- #complete(job_file) ⇒ Object
- #dequeue ⇒ Object
- #enqueue(job) ⇒ Object
- #fail(job_file, exception) ⇒ Object
- #failed_size ⇒ Object
-
#initialize(queue_dir) ⇒ Queue
constructor
A new instance of Queue.
- #reenqueue_completed_jobs ⇒ Object
- #reenqueue_failed_jobs ⇒ Object
- #retry_failed_jobs ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(queue_dir) ⇒ Queue
Returns a new instance of Queue.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/filesystem_queue.rb', line 13
#
# Prepares the on-disk layout of the queue and loads the in-memory index.
#
# Three subdirectories of +queue_dir+ are used: "jobs", "completed" and
# "failed". Any that are missing are created.
#
# @param queue_dir [String] root directory of the queue
def initialize(queue_dir)
  @queue_dir = queue_dir
  @jobs_dir = File.join(@queue_dir, 'jobs')
  @completed_dir = File.join(@queue_dir, 'completed')
  @failed_dir = File.join(@queue_dir, 'failed')

  # mkdir_p is a no-op for directories that already exist, so no
  # Dir.exist? pre-check is needed (check-then-create is also racy).
  FileUtils.mkdir_p([@jobs_dir, @completed_dir, @failed_dir])

  # rebuild_index is defined elsewhere in the class; presumably it scans
  # the jobs directory for pending job files -- confirm against its source.
  @index = rebuild_index
end
Instance Method Details
#cleanup ⇒ Object
CAUTION: Cleans up the queue directory, removing all files and directories.
78 79 80 81 82 83 |
# File 'lib/filesystem_queue.rb', line 78
#
# CAUTION: deletes the whole queue directory, including every pending,
# completed and failed job file, and empties the in-memory index.
#
# @return [Object] whatever FileUtils.rm_rf returns for the queue directory
def cleanup
  # The index only holds paths below @queue_dir; drop them so #size and
  # #dequeue do not keep reporting jobs whose files are about to vanish.
  @index.clear

  # jobs/, completed/ and failed/ all live under @queue_dir, so a single
  # recursive removal of the root covers them.
  FileUtils.rm_rf(@queue_dir)
end
#complete(job_file) ⇒ Object
43 44 45 |
# File 'lib/filesystem_queue.rb', line 43
#
# Hands the job file to +move_job+ with the "completed" directory as the
# destination.
#
# @param job_file [String] path of the job file (presumably the path
#   returned by #dequeue -- confirm against callers)
# @return [Object] whatever +move_job+ returns (defined elsewhere)
def complete(job_file)
  destination = @completed_dir
  move_job(job_file, destination)
end
#dequeue ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/filesystem_queue.rb', line 33
#
# Removes the next job from the in-memory index and returns it together
# with its parsed JSON payload.
#
# Index entries whose file no longer exists on disk are skipped, so a stale
# entry does not make a non-empty queue look empty to the caller.
#
# @return [Array, nil] +[job_file, job_data]+ where +job_data+ is the parsed
#   JSON with symbolized keys, or +nil+ when no job is left
# @raise [JSON::ParserError] if the job file does not contain valid JSON
def dequeue
  while (job_file = @index.shift)
    # Stale entry: the file was removed behind our back; try the next one.
    next unless File.exist?(job_file)

    job_data = JSON.parse(File.read(job_file), symbolize_names: true)
    return [job_file, job_data]
  end

  nil
end
#enqueue(job) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/filesystem_queue.rb', line 26
#
# Serializes +job+ to JSON, writes it to a new file in the jobs directory
# and appends that file's path to the in-memory index.
#
# The file is named "job_<timestamp>.json", where the timestamp is the
# current wall-clock time as a float.
#
# @param job [#to_json] payload to store
# @return [Array<String>] the index, with the new job file appended
def enqueue(job)
  timestamp = Time.now.to_f.to_s
  job_file = File.join(@jobs_dir, "job_#{timestamp}.json")
  File.write(job_file, job.to_json)
  @index << job_file
end
#fail(job_file, exception) ⇒ Object
47 48 49 50 |
# File 'lib/filesystem_queue.rb', line 47
#
# Records a failure for the job via +mark_failed_job+, then hands the job
# file to +move_job+ with the "failed" directory as the destination.
#
# @param job_file [String] path of the job file
# @param exception [Exception] the error passed through to +mark_failed_job+
#   (how it is recorded is decided there -- helper defined elsewhere)
# @return [Object] whatever +move_job+ returns
def fail(job_file, exception)
  mark_failed_job(job_file, exception)

  destination = @failed_dir
  move_job(job_file, destination)
end
#failed_size ⇒ Object
56 57 58 |
# File 'lib/filesystem_queue.rb', line 56
#
# Counts the regular files directly inside the "failed" directory.
# Subdirectories are not counted.
#
# @return [Integer] number of files matched by "<failed_dir>/*"
def failed_size
  pattern = File.join(@failed_dir, '*')
  Dir.glob(pattern).select { |entry| File.file?(entry) }.size
end
#reenqueue_completed_jobs ⇒ Object
73 74 75 |
# File 'lib/filesystem_queue.rb', line 73
#
# Passes the "completed" directory to +reenqueue_jobs+ (defined elsewhere);
# presumably this puts completed jobs back on the queue -- confirm against
# that helper.
#
# @return [Object] whatever +reenqueue_jobs+ returns
def reenqueue_completed_jobs
  source_dir = @completed_dir
  reenqueue_jobs(source_dir)
end
#reenqueue_failed_jobs ⇒ Object
69 70 71 |
# File 'lib/filesystem_queue.rb', line 69
#
# Passes the "failed" directory to +reenqueue_jobs+ (defined elsewhere);
# presumably this puts failed jobs back on the queue -- confirm against
# that helper.
#
# @return [Object] whatever +reenqueue_jobs+ returns
def reenqueue_failed_jobs
  source_dir = @failed_dir
  reenqueue_jobs(source_dir)
end
#retry_failed_jobs ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/filesystem_queue.rb', line 60
#
# Moves every "*.json" file from the "failed" directory back into the
# "jobs" directory, keeping its file name, and appends each new path to
# the in-memory index.
#
# @return [Array<String>] the original paths of the failed job files, sorted
def retry_failed_jobs
  # Dir.glob only guarantees sorted results from Ruby 3.0 on; sort
  # explicitly so jobs are re-queued in a deterministic, name-based order.
  failed_jobs = Dir.glob(File.join(@failed_dir, '*.json')).sort
  failed_jobs.each do |job_file|
    new_job_file = File.join(@jobs_dir, File.basename(job_file))
    FileUtils.mv(job_file, new_job_file)
    @index << new_job_file
  end
end
#size ⇒ Object
52 53 54 |
# File 'lib/filesystem_queue.rb', line 52
#
# Reports how many job files are currently held in the in-memory index.
# The filesystem is not consulted.
#
# @return [Integer] number of entries in the index
def size
  @index.length
end