Method: Exekutor::Internal::Reserver#reserve
- Defined in:
- lib/exekutor/internal/reserver.rb
#reserve(limit) ⇒ Array<Hash>?
Reserves pending jobs
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/exekutor/internal/reserver.rb', line 23

# Reserves pending jobs for this worker.
#
# Runs a single UPDATE that sets +worker_id+ to this reserver's worker id and
# +status+ to 'e' on up to +limit+ rows of +exekutor_jobs+ whose status is 'p'
# and whose +scheduled_at+ is not in the future. Candidate rows are chosen by
# a sub-select ordered by priority (descending when
# +Exekutor.config.inverse_priority?+ is truthy), then +scheduled_at+, then
# +enqueued_at+.
#
# NOTE(review): 'p' and 'e' presumably stand for "pending" and "executing" --
# confirm against the job status definitions.
#
# @param limit [Integer] the maximum number of jobs to reserve; nothing is
#   queried unless this is positive
# @return [Array<Hash>, nil] the result of +parse_jobs+ for the updated rows,
#   or nil when +limit+ is not positive or no rows were updated
def reserve(limit)
  return nil unless limit.positive?

  connection = Exekutor::Job.connection
  # FOR UPDATE SKIP LOCKED makes the sub-select pass over rows that another
  # transaction has already locked instead of waiting for them.
  updated_rows = connection.exec_query(<<~SQL, ACTION_NAME, [@worker_id, limit], prepare: true)
    UPDATE exekutor_jobs SET worker_id = $1, status = 'e' WHERE id IN (
       SELECT id FROM exekutor_jobs
          WHERE scheduled_at <= now() AND "status"='p'#{" AND #{@reserve_filter_sql}" if @reserve_filter_sql}
          ORDER BY priority#{" DESC" if Exekutor.config.inverse_priority?}, scheduled_at, enqueued_at
          FOR UPDATE SKIP LOCKED
          LIMIT $2
    ) RETURNING "id", "payload", "options", "scheduled_at"
  SQL

  return nil if updated_rows.nil?
  return nil unless updated_rows.length&.positive?

  parse_jobs(updated_rows)
end