Class: GemeraldBeanstalk::Job
- Inherits: Object
- Inheritance chain:
  - Object
  - GemeraldBeanstalk::Job
- Defined in:
- lib/gemerald_beanstalk/job.rb
Constant Summary collapse
- MAX_JOB_PRIORITY =
2**32
- INACTIVE_STATES =
[:buried, :delayed]
- RESERVED_STATES =
[:deadline_pending, :reserved]
- UPDATE_STATES =
[:deadline_pending, :delayed, :reserved]
Instance Attribute Summary collapse
-
#beanstalk ⇒ Object
readonly
Returns the value of attribute beanstalk.
-
#body ⇒ Object
Returns the value of attribute body.
-
#buried_at ⇒ Object
Returns the value of attribute buried_at.
-
#bytes ⇒ Object
Returns the value of attribute bytes.
-
#created_at ⇒ Object
Returns the value of attribute created_at.
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#id ⇒ Object
Returns the value of attribute id.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#ready_at ⇒ Object
Returns the value of attribute ready_at.
-
#reserved_at ⇒ Object
readonly
Returns the value of attribute reserved_at.
-
#reserved_by ⇒ Object
readonly
Returns the value of attribute reserved_by.
-
#timeout_at ⇒ Object
readonly
Returns the value of attribute timeout_at.
-
#ttr ⇒ Object
Returns the value of attribute ttr.
-
#tube_name ⇒ Object
Returns the value of attribute tube_name.
Instance Method Summary collapse
- #<(other_job) ⇒ Object
- #<=>(other_job) ⇒ Object
- #buried? ⇒ Boolean
- #bury(connection, priority, *args) ⇒ Object
-
#deadline_approaching(*args) ⇒ Object
Reads @state directly instead of calling #state: #state itself calls this method, so going through #state would recurse infinitely.
- #deadline_pending? ⇒ Boolean
- #delayed? ⇒ Boolean
- #delete(connection, *args) ⇒ Object
-
#initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body) ⇒ Job
constructor
A new instance of Job.
- #kick(*args) ⇒ Object
- #ready? ⇒ Boolean
- #release(connection, priority, delay, increment_stats = true, *args) ⇒ Object
- #reserve(connection, *args) ⇒ Object
- #reserved_by_connection?(connection) ⇒ Boolean
- #reset_reserve_state ⇒ Object
- #state ⇒ Object
- #stats ⇒ Object
- #time_left(current_time = Time.now.to_f) ⇒ Object
-
#timed_out(*args) ⇒ Object
Reads @state directly instead of calling #state: #state itself calls this method, so going through #state would recurse infinitely.
- #timed_out? ⇒ Boolean
- #touch(connection) ⇒ Object
Constructor Details
#initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body) ⇒ Job
Returns a new instance of Job.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/gemerald_beanstalk/job.rb', line 82
#
# Builds a job that starts out :delayed when a positive delay is given and
# :ready otherwise.
#
# @param beanstalk the owning server object; stored in @beanstalk
# @param id        job id, stored as given
# @param tube_name name of the tube, stored as given
# @param priority  coerced with #to_i and wrapped modulo MAX_JOB_PRIORITY
# @param delay     coerced with #to_i; seconds added to created_at to get ready_at
# @param ttr       coerced with #to_i; a value of 0 is replaced by 1
# @param bytes     stored as given
# @param body      stored as given
def initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body)
  numeric_priority = priority.to_i
  numeric_delay = delay.to_i
  numeric_ttr = ttr.to_i

  @beanstalk = beanstalk
  # Counters default to 0 so they can be incremented without initialisation.
  @stats_hash = Hash.new(0)

  self.id = id
  self.tube_name = tube_name
  self.priority = numeric_priority % MAX_JOB_PRIORITY
  self.delay = numeric_delay
  self.ttr = numeric_ttr.zero? ? 1 : numeric_ttr
  self.bytes = bytes
  self.body = body
  self.created_at = Time.now.to_f
  self.ready_at = self.created_at + numeric_delay

  @state = if numeric_delay > 0
             :delayed
           else
             :ready
           end
end
Instance Attribute Details
#beanstalk ⇒ Object (readonly)
Returns the value of attribute beanstalk.
9 10 11 |
# File 'lib/gemerald_beanstalk/job.rb', line 9
#
# Read-only accessor for the @beanstalk instance variable.
def beanstalk
  @beanstalk
end
#body ⇒ Object
Returns the value of attribute body.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @body instance variable.
def body
  @body
end
#buried_at ⇒ Object
Returns the value of attribute buried_at.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @buried_at instance variable.
def buried_at
  @buried_at
end
#bytes ⇒ Object
Returns the value of attribute bytes.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @bytes instance variable.
def bytes
  @bytes
end
#created_at ⇒ Object
Returns the value of attribute created_at.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @created_at instance variable.
def created_at
  @created_at
end
#delay ⇒ Object
Returns the value of attribute delay.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @delay instance variable.
def delay
  @delay
end
#id ⇒ Object
Returns the value of attribute id.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @id instance variable.
def id
  @id
end
#priority ⇒ Object
Returns the value of attribute priority.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @priority instance variable.
def priority
  @priority
end
#ready_at ⇒ Object
Returns the value of attribute ready_at.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @ready_at instance variable.
def ready_at
  @ready_at
end
#reserved_at ⇒ Object (readonly)
Returns the value of attribute reserved_at.
9 10 11 |
# File 'lib/gemerald_beanstalk/job.rb', line 9
#
# Read-only accessor for the @reserved_at instance variable.
def reserved_at
  @reserved_at
end
#reserved_by ⇒ Object (readonly)
Returns the value of attribute reserved_by.
9 10 11 |
# File 'lib/gemerald_beanstalk/job.rb', line 9
#
# Read-only accessor for the @reserved_by instance variable.
def reserved_by
  @reserved_by
end
#timeout_at ⇒ Object (readonly)
Returns the value of attribute timeout_at.
9 10 11 |
# File 'lib/gemerald_beanstalk/job.rb', line 9
#
# Read-only accessor for the @timeout_at instance variable.
def timeout_at
  @timeout_at
end
#ttr ⇒ Object
Returns the value of attribute ttr.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @ttr instance variable.
def ttr
  @ttr
end
#tube_name ⇒ Object
Returns the value of attribute tube_name.
10 11 12 |
# File 'lib/gemerald_beanstalk/job.rb', line 10
#
# Reader for the @tube_name instance variable.
def tube_name
  @tube_name
end
Instance Method Details
#<(other_job) ⇒ Object
14 15 16 |
# File 'lib/gemerald_beanstalk/job.rb', line 14
#
# True when this job orders strictly before other_job according to #<=>.
# Any error raised by #<=> propagates to the caller.
#
# @param other_job the job to compare against
# @return [Boolean]
def <(other_job)
  ordering = self <=> other_job
  ordering == -1
end
#<=>(other_job) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/gemerald_beanstalk/job.rb', line 19
#
# Orders two jobs that are in the same state.
#
# * :ready   jobs order by priority, then by created_at
# * :delayed jobs order by ready_at
# * :buried  jobs order by buried_at
#
# Jobs whose ordering keys are equal compare as 0, so `job <=> job` is 0 and
# `a <=> b` is always the mirror image of `b <=> a`.
#
# @param other_job the job to compare against
# @return [Integer] -1, 0 or 1
# @raise [RuntimeError] if other_job is nil, if the two jobs are in different
#   states, or if the shared state is not :ready, :delayed or :buried
def <=>(other_job)
  raise 'Cannot compare job with nil' if other_job.nil?

  # Read our own state once: #state may advance the job as a side effect.
  current_state = state
  raise 'Cannot compare jobs with different states' if current_state != other_job.state

  own_key, other_key =
    case current_state
    when :ready
      [[self.priority, self.created_at], [other_job.priority, other_job.created_at]]
    when :delayed
      [[self.ready_at], [other_job.ready_at]]
    when :buried
      [[self.buried_at], [other_job.buried_at]]
    else
      raise "Cannot compare job with state of #{current_state}"
    end

  # Array#<=> compares element by element, giving priority-then-time ordering
  # for ready jobs and a proper 0 for ties.
  own_key <=> other_key
end
#buried? ⇒ Boolean
39 40 41 |
# File 'lib/gemerald_beanstalk/job.rb', line 39
#
# Whether the job's current state (as reported by #state) is :buried.
#
# @return [Boolean]
def buried?
  current = state
  current == :buried
end
#bury(connection, priority, *args) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/gemerald_beanstalk/job.rb', line 44
#
# Buries the job on behalf of the connection that holds its reservation.
#
# On success the reservation fields are cleared, the state becomes :buried,
# the 'buries' counter is incremented, the priority is replaced by
# priority.to_i, buried_at is set to the current time and ready_at is cleared.
#
# @param connection the connection requesting the bury
# @param priority   new priority; coerced with #to_i
# @param args       extra arguments are accepted and ignored
# @return [Boolean] false when the job is not reserved by connection
def bury(connection, priority, *args)
  if reserved_by_connection?(connection)
    reset_reserve_state
    @state = :buried
    @stats_hash[:'buries'] += 1
    self.priority = priority.to_i
    self.buried_at = Time.now.to_f
    self.ready_at = nil
    true
  else
    false
  end
end
#deadline_approaching(*args) ⇒ Object
Reads @state directly instead of calling #state: #state itself calls this method, so going through #state would recurse infinitely.
58 59 60 61 62 |
# File 'lib/gemerald_beanstalk/job.rb', line 58
#
# Moves a :reserved job into the :deadline_pending state.
#
# Inspects @state directly rather than calling #state, because #state calls
# this method and would otherwise recurse.
#
# @param args extra arguments are accepted and ignored
# @return [Boolean] true if the transition happened, false if the job was
#   not in the :reserved state
def deadline_approaching(*args)
  if @state == :reserved
    @state = :deadline_pending
    true
  else
    false
  end
end
#deadline_pending? ⇒ Boolean
65 66 67 |
# File 'lib/gemerald_beanstalk/job.rb', line 65
#
# Whether the job's current state (as reported by #state) is
# :deadline_pending.
#
# @return [Boolean]
def deadline_pending?
  current = state
  current == :deadline_pending
end
#delayed? ⇒ Boolean
70 71 72 |
# File 'lib/gemerald_beanstalk/job.rb', line 70
#
# Whether the job's current state (as reported by #state) is :delayed.
#
# @return [Boolean]
def delayed?
  current = state
  current == :delayed
end
#delete(connection, *args) ⇒ Object
75 76 77 78 79 |
# File 'lib/gemerald_beanstalk/job.rb', line 75
#
# Marks the job as :deleted.
#
# A job in one of the RESERVED_STATES may only be deleted by the connection
# that holds the reservation; jobs in any other state may be deleted by
# anyone.
#
# @param connection the connection requesting the delete
# @param args       extra arguments are accepted and ignored
# @return [Boolean] false when the job is reserved by a different connection
def delete(connection, *args)
  held_by_someone_else =
    RESERVED_STATES.include?(state) && !reserved_by_connection?(connection)
  return false if held_by_someone_else

  @state = :deleted
  true
end
#kick(*args) ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/gemerald_beanstalk/job.rb', line 100
#
# Makes a job in one of the INACTIVE_STATES ready immediately.
#
# On success the state becomes :ready, the 'kicks' counter is incremented,
# ready_at is set to the current time and buried_at is cleared.
#
# @param args extra arguments are accepted and ignored
# @return [Boolean] false when the job is not in one of the INACTIVE_STATES
def kick(*args)
  if INACTIVE_STATES.include?(state)
    @state = :ready
    @stats_hash[:'kicks'] += 1
    self.ready_at = Time.now.to_f
    self.buried_at = nil
    true
  else
    false
  end
end
#ready? ⇒ Boolean
111 112 113 |
# File 'lib/gemerald_beanstalk/job.rb', line 111
#
# Whether the job's current state (as reported by #state) is :ready.
#
# @return [Boolean]
def ready?
  current = state
  current == :ready
end
#release(connection, priority, delay, increment_stats = true, *args) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/gemerald_beanstalk/job.rb', line 116
#
# Gives a reserved job back, optionally delaying it.
#
# On success the reservation fields are cleared, the state becomes :delayed
# (positive delay) or :ready, the priority and delay are replaced, and
# ready_at is set to the current time plus the delay.
#
# @param connection      the connection requesting the release
# @param priority        new priority; coerced with #to_i
# @param delay           new delay in seconds; coerced with #to_i
# @param increment_stats when truthy, the 'releases' counter is incremented
# @param args            extra arguments are accepted and ignored
# @return [Boolean] false when the job is not reserved by connection
def release(connection, priority, delay, increment_stats = true, *args)
  if reserved_by_connection?(connection)
    delay_seconds = delay.to_i
    reset_reserve_state
    @state = delay_seconds > 0 ? :delayed : :ready
    @stats_hash[:'releases'] += 1 if increment_stats
    self.priority = priority.to_i
    self.delay = delay_seconds
    self.ready_at = Time.now.to_f + delay_seconds
    true
  else
    false
  end
end
#reserve(connection, *args) ⇒ Object
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/gemerald_beanstalk/job.rb', line 130
#
# Reserves a ready job for the given connection.
#
# On success the state becomes :reserved, the 'reserves' counter is
# incremented, the reserving connection and reservation time are recorded,
# and the timeout is set to the reservation time plus ttr.
#
# @param connection the connection taking the reservation
# @param args       extra arguments are accepted and ignored
# @return [Boolean] false when the job is not ready
def reserve(connection, *args)
  if ready?
    @state = :reserved
    @stats_hash[:'reserves'] += 1
    @reserved_by = connection
    @reserved_at = Time.now.to_f
    @timeout_at = @reserved_at + self.ttr
    true
  else
    false
  end
end
#reserved_by_connection?(connection) ⇒ Boolean
142 143 144 |
# File 'lib/gemerald_beanstalk/job.rb', line 142
#
# Whether the job is currently in one of the RESERVED_STATES and the
# reservation belongs to the given connection.
#
# @param connection the connection to check against reserved_by
# @return [Boolean] always strictly true or false
def reserved_by_connection?(connection)
  return false unless RESERVED_STATES.include?(state)

  if self.reserved_by == connection
    true
  else
    false
  end
end
#reset_reserve_state ⇒ Object
147 148 149 150 151 |
# File 'lib/gemerald_beanstalk/job.rb', line 147
#
# Clears the three reservation fields: @timeout_at, @reserved_at and
# @reserved_by. Does not touch @state.
#
# @return [nil]
def reset_reserve_state
  @timeout_at, @reserved_at, @reserved_by = nil, nil, nil
  # Parallel assignment evaluates to an array; keep returning nil.
  nil
end
#state ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/gemerald_beanstalk/job.rb', line 154
#
# Returns the job's state, first applying any time-based transition that has
# become due:
#
# * :delayed  -> :ready once ready_at has passed
# * reserved states -> #timed_out once timeout_at has passed
# * :reserved -> #deadline_approaching when less than one second remains
#
# States outside UPDATE_STATES are returned untouched.
#
# @return [Symbol] the (possibly just updated) state
def state
  return @state unless UPDATE_STATES.include?(@state)

  now = Time.now.to_f
  if @state == :delayed
    @state = :ready if self.ready_at <= now
  elsif RESERVED_STATES.include?(@state)
    # Snapshot the deadline once: another thread may clear timeout_at between
    # reads, and a nil deadline simply means there is nothing to expire.
    deadline = self.timeout_at
    if deadline
      if now > deadline
        timed_out
      elsif @state == :reserved && now + 1 > deadline
        deadline_approaching
      end
    end
  end

  @state
end
#stats ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/gemerald_beanstalk/job.rb', line 173
#
# Builds a snapshot of the job's statistics as a Hash with String keys.
#
# A job in the :deadline_pending state is reported with a 'state' of
# 'reserved'. 'age' is whole seconds since created_at, 'file' is always 0,
# and the five counters come from the internal stats hash.
#
# @return [Hash{String => Object}]
def stats
  now = Time.now.to_f
  current_state = state
  state_label = current_state == :deadline_pending ? 'reserved' : current_state.to_s

  report = {
    'id' => self.id,
    'tube' => self.tube_name,
    'state' => state_label,
    'pri' => self.priority,
    'age' => (now - self.created_at).to_i,
    'delay' => self.delay.to_i,
    'ttr' => self.ttr,
    'time-left' => time_left(now),
    'file' => 0,
  }

  # Counters are appended last so the key order matches the original layout.
  %w[reserves timeouts releases buries kicks].each do |counter|
    report[counter] = @stats_hash[counter.to_sym]
  end

  report
end
#time_left(current_time = Time.now.to_f) ⇒ Object
195 196 197 198 199 200 201 202 |
# File 'lib/gemerald_beanstalk/job.rb', line 195
#
# Whole seconds between current_time and the job's next deadline.
#
# The deadline is timeout_at when set, otherwise ready_at. When neither is
# set the result is 0. The result is truncated with #to_i and is negative
# once the deadline has passed.
#
# @param current_time [Float] reference time in epoch seconds
# @return [Integer]
def time_left(current_time = Time.now.to_f)
  deadline = self.timeout_at || self.ready_at
  return 0 unless deadline

  (deadline - current_time).to_i
end
#timed_out(*args) ⇒ Object
Must reference @state to avoid infinite recursion
206 207 208 209 210 211 212 213 214 |
# File 'lib/gemerald_beanstalk/job.rb', line 206
#
# Expires a reservation: the job goes back to :ready, the 'timeouts' counter
# is incremented, the reservation fields are cleared, and the owning
# beanstalk is notified via register_job_timeout with the connection that
# held the reservation.
#
# Inspects @state directly rather than calling #state, because #state calls
# this method and would otherwise recurse.
#
# @param args extra arguments are accepted and ignored
# @return [Boolean] false when the job is not in one of the RESERVED_STATES
def timed_out(*args)
  return false unless RESERVED_STATES.include?(@state)

  # Remember the holder before the reservation fields are wiped.
  previous_holder = self.reserved_by
  @state = :ready
  @stats_hash[:'timeouts'] += 1
  reset_reserve_state
  self.beanstalk.register_job_timeout(previous_holder, self)
  true
end
#timed_out? ⇒ Boolean
217 218 219 |
# File 'lib/gemerald_beanstalk/job.rb', line 217
#
# Whether the job's current state (as reported by #state) is :timed_out.
#
# NOTE(review): none of the methods shown for this class ever assigns
# :timed_out to @state (#timed_out moves the job to :ready), so this looks
# like it always returns false -- confirm against callers.
#
# @return [Boolean]
def timed_out?
  current = state
  current == :timed_out
end
#touch(connection) ⇒ Object
222 223 224 225 226 227 |
# File 'lib/gemerald_beanstalk/job.rb', line 222
#
# Restarts the reservation timer for the connection holding the job.
#
# On success the state is set back to :reserved (leaving :deadline_pending
# if it had been entered) and timeout_at becomes the current time plus ttr.
#
# @param connection the connection requesting the touch
# @return [Boolean] false when the job is not reserved by connection
def touch(connection)
  if reserved_by_connection?(connection)
    @state = :reserved
    @timeout_at = Time.now.to_f + self.ttr
    true
  else
    false
  end
end