Method: Exekutor::Internal::Reserver#reserve

Defined in:
lib/exekutor/internal/reserver.rb

#reserve(limit) ⇒ Array<Hash>?

Reserves pending jobs

Parameters:

  • limit (Integer)

    the number of jobs to reserve

Returns:

  • (Array<Hash>, nil)

    the reserved jobs, or nil if no jobs were reserved



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/exekutor/internal/reserver.rb', line 23

# Reserves pending jobs
#
# Nothing is queried when +limit+ is not positive.
#
# @param limit [Integer] the number of jobs to reserve
# @return [Array<Hash>, nil] the reserved jobs, or nil if no jobs were reserved
def reserve(limit)
  return unless limit.positive?

  # Optional SQL fragments; each collapses to an empty string when it does not apply.
  extra_filter = @reserve_filter_sql ? " AND #{@reserve_filter_sql}" : ""
  priority_direction = Exekutor.config.inverse_priority? ? " DESC" : ""

  # The inner SELECT picks due jobs with status 'p' in priority order, skipping rows
  # that are locked by another transaction (SKIP LOCKED). The outer UPDATE assigns
  # those rows to this worker and sets their status to 'e'.
  statement = <<~SQL
    UPDATE exekutor_jobs SET worker_id = $1, status = 'e' WHERE id IN (
       SELECT id FROM exekutor_jobs
          WHERE scheduled_at <= now() AND "status"='p'#{extra_filter}
          ORDER BY priority#{priority_direction}, scheduled_at, enqueued_at
          FOR UPDATE SKIP LOCKED
          LIMIT $2
    ) RETURNING "id", "payload", "options", "scheduled_at"
  SQL
  binds = [@worker_id, limit]
  rows = Exekutor::Job.connection.exec_query(statement, ACTION_NAME, binds, prepare: true)

  # An empty (or missing) result set means nothing was reserved; return nil in that case.
  parse_jobs(rows) if rows&.length&.positive?
end