Class: ThreadJob::Memory::Store
- Defined in:
- lib/thread_job/backends/memory/store.rb
Instance Method Summary collapse
- #complete_job(queue_name, job_id) ⇒ Object
- #fail_job(queue_name, job_id) ⇒ Object
- #get_job(queue_name, job_id) ⇒ Object
- #initialize(max_retries = 10, logger = Logger.new(STDOUT)) ⇒ Store (constructor): a new instance of Store.
- #poll_for_job(queue_name) ⇒ Object
- #save_job(queue_name, job_name, job) ⇒ Object
Constructor Details
#initialize(max_retries = 10, logger = Logger.new(STDOUT)) ⇒ Store
15 16 17 18 19 20 21 |
# File 'lib/thread_job/backends/memory/store.rb', line 15
#
# Sets up an empty in-memory store.
#
# @param max_retries [Integer] kept in @max_retries (defaults to 10)
# @param logger [Logger] kept in @logger; defaults to a Logger writing to STDOUT
def initialize(max_retries=10, logger=Logger.new(STDOUT))
  @max_retries = max_retries
  @logger      = logger
  # One lock for the whole store; the other methods wrap their work in it.
  @mutex       = Mutex.new
  # Both containers start out as empty hashes.
  @failed_jobs = {}
  @jobs        = {}
end
Instance Method Details
#complete_job(queue_name, job_id) ⇒ Object
75 76 77 78 79 80 81 82 83 |
# File 'lib/thread_job/backends/memory/store.rb', line 75
#
# Removes a job from its queue once it has been completed.
#
# The lookup and the removal both happen while holding @mutex. When no job
# with the given id is found, nothing is removed and nil is returned.
#
# @param queue_name [Object] key of the queue inside @jobs
# @param job_id [Object] id of the job to remove
# @return [Object, nil] the result of the logger call, or nil when the job was not found
def complete_job(queue_name, job_id)
  @mutex.synchronize do
    finished = get_job(queue_name, job_id)
    next unless finished

    @jobs[queue_name].delete(finished)
    @logger.info("[MemoryStore] job: '#{finished.job_name}' has been completed and removed from the queue")
  end
end
#fail_job(queue_name, job_id) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/thread_job/backends/memory/store.rb', line 85
#
# Records a failed attempt for a job.
#
# The job is marked FAILED and its attempt counter is incremented. Once the
# counter reaches (or exceeds) @max_retries the job is moved from the live
# queue into @failed_jobs; otherwise it stays in the queue so it can be
# picked up again. Everything runs while holding @mutex.
#
# @param queue_name [Object] key of the queue inside @jobs / @failed_jobs
# @param job_id [Object] id of the job that failed
# @return [Object, nil] the result of the logger call, or nil when the job was not found
def fail_job(queue_name, job_id)
  @mutex.synchronize do
    job = get_job(queue_name, job_id)
    next unless job

    job.status = FAILED
    job.attempts += 1

    # `>=` rather than `==`: with a limit of 0, or a counter that is already
    # past the limit, an equality check never fires and the job would sit in
    # the queue forever as FAILED.
    if job.attempts >= @max_retries
      # Create the bucket on demand so a queue without one cannot raise here.
      (@failed_jobs[queue_name] ||= []).push(job)
      @jobs[queue_name].delete(job)
      @logger.warn("[MemoryStore] job: '#{job.job_name}' has failed and reached the maximum amount of retries (#{@max_retries}) and is being removed from the queue.")
    else
      @logger.info("[MemoryStore] failed job: '#{job.job_name}' has been requeued and attempted #{job.attempts} times")
    end
  end
end
#get_job(queue_name, job_id) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/thread_job/backends/memory/store.rb', line 59
#
# Looks up a job by id in the given queue.
#
# This method does not take @mutex itself; complete_job and fail_job call it
# from inside their own synchronize blocks.
#
# @param queue_name [Object] key of the queue inside @jobs
# @param job_id [Object] id to compare against each record's #id
# @return [Object, nil] the first matching record, or nil (after logging a
#   warning) when the queue is unknown or holds no such job
def get_job(queue_name, job_id)
  job = (@jobs[queue_name] || []).find { |candidate| candidate.id == job_id }
  return job if job

  @logger.warn("[MemoryStore] unable to get job: #{job_id} from queue: #{queue_name}")
  nil
end
#poll_for_job(queue_name) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/thread_job/backends/memory/store.rb', line 42
#
# Claims the next runnable job from a queue.
#
# A record is runnable when its status is AVAILABLE, or when it is FAILED
# and has been attempted fewer than @max_retries times. The first runnable
# record (in queue order) is marked WORKING and described to the caller.
#
# @param queue_name [Object] key of the queue inside @jobs; the queue is
#   created empty if it does not exist yet
# @return [Hash, nil] `{id:, job:, job_name:}` for the claimed record, or
#   nil when nothing is runnable
def poll_for_job(queue_name)
  @mutex.synchronize do
    # Create and read the queue under the lock: doing `||=` outside it can
    # race with save_job and replace a queue that already holds a job.
    queue = (@jobs[queue_name] ||= [])
    @logger.debug("[MemoryStore] Polling for jobs, #{queue.length} in the queue")

    record = queue.find do |candidate|
      candidate.status == AVAILABLE ||
        (candidate.status == FAILED && candidate.attempts < @max_retries)
    end
    next nil unless record

    record.status = WORKING
    @logger.debug("[MemoryStore] Sending job '#{record.job_name}' to the thread pool for work")
    { id: record.id, job: record.job, job_name: record.job_name }
  end
end
#save_job(queue_name, job_name, job) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/thread_job/backends/memory/store.rb', line 23
#
# Appends a new job record to a queue.
#
# The queue (and its failed-jobs bucket) is created on first use. The record
# starts with zero attempts and status AVAILABLE. Everything except the final
# log line runs while holding @mutex.
#
# @param queue_name [Object] key of the queue inside @jobs / @failed_jobs
# @param job_name [Object] name stored on the record and used in the log line
# @param job [Object] payload stored on the record
# @return [Object] the result of the logger call
def save_job(queue_name, job_name, job)
  @mutex.synchronize do
    queued_jobs = (@jobs[queue_name] ||= [])
    @failed_jobs[queue_name] ||= []

    # Ids come from a per-queue counter that only ever grows. Deriving the id
    # from the current queue length hands out an id that a still-queued job
    # already owns as soon as an earlier job has been removed. The counter is
    # created lazily so this method does not depend on extra constructor state.
    @last_job_ids ||= Hash.new(0)

    rec = Memory::Record.new
    rec.attempts = 0
    rec.id = (@last_job_ids[queue_name] += 1)
    rec.job_name = job_name
    rec.job = job
    rec.status = AVAILABLE
    rec.queue_name = queue_name

    queued_jobs.push(rec)
  end

  @logger.info("[MemoryStore] Saved #{job_name}")
end