Class: FilesystemQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/filesystem_queue.rb

Overview

A persistent queue system based on the local filesystem. Handles enqueuing, dequeuing, completing, failing, and retrying jobs.

Instance Method Summary collapse

Constructor Details

#initialize(queue_dir) ⇒ Queue

Returns a new instance of Queue.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/filesystem_queue.rb', line 13

# Sets up the on-disk layout for the queue and loads the in-memory index.
#
# Three subdirectories are derived from +queue_dir+: 'jobs', 'completed'
# and 'failed'. Missing directories (including +queue_dir+ itself) are
# created.
#
# @param queue_dir [String] root directory that holds the queue's subdirectories
def initialize(queue_dir)
  @queue_dir = queue_dir
  @jobs_dir = File.join(@queue_dir, 'jobs')
  @completed_dir = File.join(@queue_dir, 'completed')
  @failed_dir = File.join(@queue_dir, 'failed')

  # mkdir_p is already a no-op for directories that exist, so a separate
  # Dir.exist? check is unnecessary (and would be a check-then-act race).
  FileUtils.mkdir_p([@jobs_dir, @completed_dir, @failed_dir])

  @index = rebuild_index
end

Instance Method Details

#cleanup ⇒ Object

CAUTION: Cleans up the queue directory, removing all files and directories.



78
79
80
81
82
83
# File 'lib/filesystem_queue.rb', line 78

# CAUTION: destructive. Deletes the jobs, completed and failed directories,
# then the queue directory itself, and empties the in-memory index so that
# #size does not keep counting jobs whose files were just removed.
#
# @return [Object] the result of FileUtils.rm_rf for the queue directory
def cleanup
  FileUtils.rm_rf([@jobs_dir, @completed_dir, @failed_dir])
  removed = FileUtils.rm_rf(@queue_dir)

  # Every indexed path pointed into the directories deleted above.
  @index.clear

  removed
end

#complete(job_file) ⇒ Object



43
44
45
# File 'lib/filesystem_queue.rb', line 43

# Files a job under the completed directory by delegating to move_job.
#
# @param job_file [String] path of the job file to move
# @return [Object] whatever move_job returns
def complete(job_file)
  destination = @completed_dir
  move_job(job_file, destination)
end

#dequeue ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/filesystem_queue.rb', line 33

# Removes the next job from the in-memory index and loads its payload.
#
# Index entries whose file no longer exists are discarded and the next entry
# is tried, so a stale entry is not mistaken for an empty queue.
#
# @return [Array, nil] a two-element array of the job file path and its
#   parsed JSON data (hash keys symbolized), or nil when no job is left
# @raise [JSON::ParserError] if the job file does not contain valid JSON
def dequeue
  until @index.empty?
    job_file = @index.shift

    begin
      raw = File.read(job_file)
    rescue Errno::ENOENT
      # The file vanished (or never existed); move on to the next entry.
      next
    end

    return [job_file, JSON.parse(raw, symbolize_names: true)]
  end

  nil
end

#enqueue(job) ⇒ Object



26
27
28
29
30
31
# File 'lib/filesystem_queue.rb', line 26

# Persists a job as a JSON file in the jobs directory and appends the file's
# path to the in-memory index.
#
# The file name is "job_<timestamp>.json", where the timestamp is the current
# time as a float. The file is created exclusively, so a job that happens to
# get the same timestamp as an existing file never overwrites it.
#
# @param job [#to_json] the job payload
# @return [Object] the index, as returned by its << method
def enqueue(job)
  payload = job.to_json

  begin
    job_file = File.join(@jobs_dir, "job_#{Time.now.to_f}.json")
    # CREAT | EXCL raises EEXIST instead of truncating an existing job file.
    File.open(job_file, File::WRONLY | File::CREAT | File::EXCL) { |file| file.write(payload) }
  rescue Errno::EEXIST
    # Name already taken: try again; the next attempt reads a fresh timestamp.
    retry
  end

  @index << job_file
end

#fail(job_file, exception) ⇒ Object



47
48
49
50
# File 'lib/filesystem_queue.rb', line 47

# Records a failure for a job and files it under the failed directory.
#
# First passes the job file and the exception to mark_failed_job, then hands
# the file to move_job with the failed directory as the destination.
#
# NOTE: within this class the method takes the place of Kernel#fail, so a
# bare `fail "message"` here calls this method rather than raising.
#
# @param job_file [String] path of the job file that failed
# @param exception [Exception] the error associated with the failure
# @return [Object] whatever move_job returns
def fail(job_file, exception)
  mark_failed_job(job_file, exception)

  destination = @failed_dir
  move_job(job_file, destination)
end

#failed_size ⇒ Object



56
57
58
# File 'lib/filesystem_queue.rb', line 56

# Counts the regular files directly inside the failed directory.
#
# Subdirectories are not counted, and the '*' glob does not match dotfiles.
#
# @return [Integer] number of files in the failed directory
def failed_size
  entries = Dir.glob(File.join(@failed_dir, '*'))
  entries.select { |path| File.file?(path) }.length
end

#reenqueue_completed_jobs ⇒ Object



73
74
75
# File 'lib/filesystem_queue.rb', line 73

# Re-enqueues the jobs found in the completed directory by delegating to
# reenqueue_jobs.
#
# @return [Object] whatever reenqueue_jobs returns
def reenqueue_completed_jobs
  source_dir = @completed_dir
  reenqueue_jobs(source_dir)
end

#reenqueue_failed_jobs ⇒ Object



69
70
71
# File 'lib/filesystem_queue.rb', line 69

# Re-enqueues the jobs found in the failed directory by delegating to
# reenqueue_jobs.
#
# @return [Object] whatever reenqueue_jobs returns
def reenqueue_failed_jobs
  source_dir = @failed_dir
  reenqueue_jobs(source_dir)
end

#retry_failed_jobs ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/filesystem_queue.rb', line 60

# Moves every '*.json' file from the failed directory back into the jobs
# directory (keeping its file name) and appends each new path to the
# in-memory index.
#
# @return [Array<String>] the paths the files had in the failed directory
def retry_failed_jobs
  pattern = File.join(@failed_dir, '*.json')

  Dir.glob(pattern).each do |failed_path|
    requeued_path = File.join(@jobs_dir, File.basename(failed_path))
    FileUtils.mv(failed_path, requeued_path)
    @index.push(requeued_path)
  end
end

#size ⇒ Object



52
53
54
# File 'lib/filesystem_queue.rb', line 52

# Reports how many entries the in-memory index currently holds.
#
# @return [Integer] number of indexed job files
def size
  @index.length
end