Class: GemeraldBeanstalk::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/gemerald_beanstalk/job.rb

Constant Summary collapse

MAX_JOB_PRIORITY =
2**32
INACTIVE_STATES =
[:buried, :delayed]
RESERVED_STATES =
[:deadline_pending, :reserved]
UPDATE_STATES =
[:deadline_pending, :delayed, :reserved]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body) ⇒ Job

Returns a new instance of Job.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/gemerald_beanstalk/job.rb', line 82

# Builds a job that starts out either :delayed or :ready.
#
# priority, delay and ttr are coerced with #to_i. The priority is stored
# modulo MAX_JOB_PRIORITY, and a ttr of 0 is stored as 1.
#
# @param beanstalk [Object] kept in @beanstalk
# @param id [Object] stored through the id= writer
# @param tube_name [Object] stored through the tube_name= writer
# @param priority [#to_i] job priority
# @param delay [#to_i] amount added to created_at to compute ready_at; a
#   value above 0 makes the initial state :delayed
# @param ttr [#to_i] stored through the ttr= writer (0 becomes 1)
# @param bytes [Object] stored through the bytes= writer
# @param body [Object] stored through the body= writer
def initialize(beanstalk, id, tube_name, priority, delay, ttr, bytes, body)
  priority = priority.to_i
  delay = delay.to_i
  ttr = ttr.to_i
  ttr = 1 if ttr == 0

  @beanstalk = beanstalk
  @stats_hash = Hash.new(0) # counters default to 0 until first incremented

  self.id = id
  self.tube_name = tube_name
  self.priority = priority % MAX_JOB_PRIORITY
  self.delay = delay
  self.ttr = ttr
  self.bytes = bytes
  self.body = body
  self.created_at = Time.now.to_f
  self.ready_at = self.created_at + delay

  @state = if delay > 0
             :delayed
           else
             :ready
           end
end

Instance Attribute Details

#beanstalk ⇒ Object (readonly)

Returns the value of attribute beanstalk.



9
10
11
# File 'lib/gemerald_beanstalk/job.rb', line 9

# Reader for the object handed to the constructor as +beanstalk+.
#
# @return [Object] the value of @beanstalk
def beanstalk
  return @beanstalk
end

#body ⇒ Object

Returns the value of attribute body.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +body+ attribute.
#
# @return [Object] the value of @body
def body
  return @body
end

#buried_at ⇒ Object

Returns the value of attribute buried_at.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +buried_at+ attribute.
#
# @return [Object] the value of @buried_at
def buried_at
  return @buried_at
end

#bytes ⇒ Object

Returns the value of attribute bytes.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +bytes+ attribute.
#
# @return [Object] the value of @bytes
def bytes
  return @bytes
end

#created_at ⇒ Object

Returns the value of attribute created_at.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +created_at+ attribute.
#
# @return [Object] the value of @created_at
def created_at
  return @created_at
end

#delay ⇒ Object

Returns the value of attribute delay.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +delay+ attribute.
#
# @return [Object] the value of @delay
def delay
  return @delay
end

#id ⇒ Object

Returns the value of attribute id.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +id+ attribute.
#
# @return [Object] the value of @id
def id
  return @id
end

#priority ⇒ Object

Returns the value of attribute priority.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +priority+ attribute.
#
# @return [Object] the value of @priority
def priority
  return @priority
end

#ready_at ⇒ Object

Returns the value of attribute ready_at.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +ready_at+ attribute.
#
# @return [Object] the value of @ready_at
def ready_at
  return @ready_at
end

#reserved_at ⇒ Object (readonly)

Returns the value of attribute reserved_at.



9
10
11
# File 'lib/gemerald_beanstalk/job.rb', line 9

# Reader for the timestamp recorded by #reserve and cleared by
# #reset_reserve_state.
#
# @return [Object] the value of @reserved_at
def reserved_at
  return @reserved_at
end

#reserved_by ⇒ Object (readonly)

Returns the value of attribute reserved_by.



9
10
11
# File 'lib/gemerald_beanstalk/job.rb', line 9

# Reader for the connection recorded by #reserve and cleared by
# #reset_reserve_state.
#
# @return [Object] the value of @reserved_by
def reserved_by
  return @reserved_by
end

#timeout_at ⇒ Object (readonly)

Returns the value of attribute timeout_at.



9
10
11
# File 'lib/gemerald_beanstalk/job.rb', line 9

# Reader for the deadline set by #reserve and #touch and cleared by
# #reset_reserve_state.
#
# @return [Object] the value of @timeout_at
def timeout_at
  return @timeout_at
end

#ttr ⇒ Object

Returns the value of attribute ttr.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +ttr+ attribute.
#
# @return [Object] the value of @ttr
def ttr
  return @ttr
end

#tube_name ⇒ Object

Returns the value of attribute tube_name.



10
11
12
# File 'lib/gemerald_beanstalk/job.rb', line 10

# Reader for the +tube_name+ attribute.
#
# @return [Object] the value of @tube_name
def tube_name
  return @tube_name
end

Instance Method Details

#<(other_job) ⇒ Object



14
15
16
# File 'lib/gemerald_beanstalk/job.rb', line 14

# Reports whether this job sorts strictly before other_job.
#
# @param other_job [Object] handed to #<=> unchanged
# @return [Boolean] true only when +self <=> other_job+ equals -1
def <(other_job)
  ordering = self <=> other_job
  return ordering == -1
end

#<=>(other_job) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gemerald_beanstalk/job.rb', line 19

# Orders two jobs that are in the same state.
#
# Ready jobs are ordered by priority and then by created_at, delayed jobs by
# ready_at, and buried jobs by buried_at. Jobs whose sort keys are equal
# compare as 0, so a job compares equal to itself and ties are reported
# consistently in both directions (previously every non-"less" result,
# including a tie, was reported as 1).
#
# @param other_job [Job] job to compare against; must not be nil
# @return [Integer] -1, 0 or 1
# @raise [RuntimeError] if other_job is nil, if the two states differ, or if
#   the shared state is not :ready, :delayed or :buried
def <=>(other_job)
  raise 'Cannot compare job with nil' if other_job.nil?
  current_state = state
  raise 'Cannot compare jobs with different states' if current_state != other_job.state

  case current_state
  when :ready
    # Arrays compare element by element: priority first, then creation time.
    own_key = [self.priority, self.created_at]
    other_key = [other_job.priority, other_job.created_at]
  when :delayed
    own_key = self.ready_at
    other_key = other_job.ready_at
  when :buried
    own_key = self.buried_at
    other_key = other_job.buried_at
  else
    raise "Cannot compare job with state of #{current_state}"
  end

  return own_key <=> other_key
end

#buried? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/gemerald_beanstalk/job.rb', line 39

# Reports whether #state currently evaluates to :buried.
#
# @return [Boolean]
def buried?
  current_state = state
  return current_state == :buried
end

#bury(connection, priority, *args) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/gemerald_beanstalk/job.rb', line 44

# Moves a job reserved by +connection+ into the :buried state.
#
# Clears the reservation bookkeeping, increments the 'buries' counter, stores
# the new priority, stamps buried_at with the current time and clears
# ready_at.
#
# @param connection [Object] must satisfy #reserved_by_connection?
# @param priority [#to_i] priority to store on the job
# @param args [Array] accepted and ignored
# @return [Boolean] false when the job is not reserved by +connection+,
#   true once the job has been buried
def bury(connection, priority, *args)
  owned = reserved_by_connection?(connection)
  return false unless owned

  reset_reserve_state
  @state = :buried
  @stats_hash[:buries] += 1

  self.priority = priority.to_i
  self.buried_at = Time.now.to_f
  self.ready_at = nil
  return true
end

#deadline_approaching(*args) ⇒ Object

Must look at @state to avoid infinite recursion



58
59
60
61
62
# File 'lib/gemerald_beanstalk/job.rb', line 58

# Flags a :reserved job as :deadline_pending.
#
# Reads @state directly instead of calling #state, because #state itself
# invokes this method and going through it would recurse.
#
# @param args [Array] accepted and ignored
# @return [Boolean] true when the job was :reserved and has been switched,
#   false otherwise
def deadline_approaching(*args)
  return false if @state != :reserved

  @state = :deadline_pending
  return true
end

#deadline_pending? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/gemerald_beanstalk/job.rb', line 65

# Reports whether #state currently evaluates to :deadline_pending.
#
# @return [Boolean]
def deadline_pending?
  current_state = state
  return current_state == :deadline_pending
end

#delayed? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/gemerald_beanstalk/job.rb', line 70

# Reports whether #state currently evaluates to :delayed.
#
# @return [Boolean]
def delayed?
  current_state = state
  return current_state == :delayed
end

#delete(connection, *args) ⇒ Object



75
76
77
78
79
# File 'lib/gemerald_beanstalk/job.rb', line 75

# Marks the job as :deleted unless it is currently reserved by a different
# connection.
#
# @param connection [Object] connection requesting the delete
# @param args [Array] accepted and ignored
# @return [Boolean] false when the job is in one of RESERVED_STATES and is
#   not reserved by +connection+; true once the state is :deleted
def delete(connection, *args)
  # A job outside RESERVED_STATES is always deletable; the ownership check
  # only runs for reserved jobs.
  deletable = !RESERVED_STATES.include?(state) || reserved_by_connection?(connection)
  return false unless deletable

  @state = :deleted
  return true
end

#kick(*args) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/gemerald_beanstalk/job.rb', line 100

# Returns a job in one of INACTIVE_STATES to the :ready state.
#
# Increments the 'kicks' counter, sets ready_at to the current time and
# clears buried_at.
#
# @param args [Array] accepted and ignored
# @return [Boolean] false when #state is not one of INACTIVE_STATES, true
#   once the job has been made ready
def kick(*args)
  kickable = INACTIVE_STATES.include?(state)
  return false unless kickable

  @state = :ready
  @stats_hash[:kicks] += 1

  self.ready_at = Time.now.to_f
  self.buried_at = nil
  return true
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/gemerald_beanstalk/job.rb', line 111

# Reports whether #state currently evaluates to :ready.
#
# @return [Boolean]
def ready?
  current_state = state
  return current_state == :ready
end

#release(connection, priority, delay, increment_stats = true, *args) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/gemerald_beanstalk/job.rb', line 116

# Gives up a reservation held by +connection+ and puts the job back in the
# :ready state, or the :delayed state when +delay+ is above 0.
#
# Clears the reservation bookkeeping, stores the new priority and delay, and
# sets ready_at to the current time plus the delay.
#
# @param connection [Object] must satisfy #reserved_by_connection?
# @param priority [#to_i] priority to store on the job
# @param delay [#to_i] delay to store on the job
# @param increment_stats [Boolean] when truthy the 'releases' counter is
#   incremented
# @param args [Array] accepted and ignored
# @return [Boolean] false when the job is not reserved by +connection+,
#   true once the job has been released
def release(connection, priority, delay, increment_stats = true, *args)
  owned = reserved_by_connection?(connection)
  return false unless owned

  delay_amount = delay.to_i
  reset_reserve_state

  @state = if delay_amount > 0
             :delayed
           else
             :ready
           end
  @stats_hash[:releases] += 1 if increment_stats

  self.priority = priority.to_i
  self.delay = delay_amount
  self.ready_at = Time.now.to_f + delay_amount
  return true
end

#reserve(connection, *args) ⇒ Object



130
131
132
133
134
135
136
137
138
139
# File 'lib/gemerald_beanstalk/job.rb', line 130

# Reserves a ready job for +connection+.
#
# Switches the state to :reserved, increments the 'reserves' counter, records
# the reserving connection and the reservation time, and sets timeout_at to
# the reservation time plus ttr.
#
# @param connection [Object] stored in @reserved_by
# @param args [Array] accepted and ignored
# @return [Boolean] false when the job is not #ready?, true once reserved
def reserve(connection, *args)
  return false unless ready?

  @state = :reserved
  @stats_hash[:reserves] += 1
  @reserved_by = connection

  reservation_time = Time.now.to_f
  @reserved_at = reservation_time
  @timeout_at = reservation_time + self.ttr
  return true
end

#reserved_by_connection?(connection) ⇒ Boolean

Returns:

  • (Boolean)


142
143
144
# File 'lib/gemerald_beanstalk/job.rb', line 142

# Reports whether the job is in one of RESERVED_STATES and its reserved_by
# equals +connection+.
#
# @param connection [Object] compared against reserved_by with ==
# @return [Boolean] always a strict true or false
def reserved_by_connection?(connection)
  return false unless RESERVED_STATES.include?(state)

  # Coerce the == result so callers always receive true or false.
  return self.reserved_by == connection ? true : false
end

#reset_reserve_state ⇒ Object



147
148
149
150
151
# File 'lib/gemerald_beanstalk/job.rb', line 147

# Clears the reservation bookkeeping: @timeout_at, @reserved_at and
# @reserved_by are set to nil, in that order.
#
# @return [nil]
def reset_reserve_state
  @timeout_at, @reserved_at, @reserved_by = nil, nil, nil
  return nil
end

#state ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/gemerald_beanstalk/job.rb', line 154

# Returns the job's state, first applying any time-based transition that has
# become due.
#
# States outside UPDATE_STATES are returned as stored. A :delayed job whose
# ready_at is not later than the current time becomes :ready. A job in one of
# RESERVED_STATES is handed to #timed_out once the current time is past
# timeout_at; otherwise a :reserved job with less than one second left before
# timeout_at is handed to #deadline_approaching.
#
# @return [Symbol] the value of @state after any transition
def state
  return @state unless UPDATE_STATES.include?(@state)

  now = Time.now.to_f
  if @state == :delayed && self.ready_at <= now
    @state = :ready
  elsif RESERVED_STATES.include?(@state)
    # Rescue from timeout being reset by other thread
    # (presumably timeout_at can be nil at this point; the comparison then
    # raises and the rescue modifier turns that into false, so neither
    # transition fires)
    if (now > self.timeout_at rescue false)
      timed_out
    elsif (@state == :reserved && now + 1 > self.timeout_at rescue false)
      deadline_approaching
    end
  end

  return @state
end

#stats ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/gemerald_beanstalk/job.rb', line 173

# Builds the statistics hash for this job.
#
# A :deadline_pending job is reported with the state 'reserved'; every other
# state is reported as its string form. 'age' is the whole number of seconds
# between created_at and now, 'file' is always 0, and the five trailing
# counters are read from @stats_hash.
#
# @return [Hash{String => Object}] keys, in order: id, tube, state, pri, age,
#   delay, ttr, time-left, file, reserves, timeouts, releases, buries, kicks
def stats
  now = Time.now.to_f
  current_state = state
  state_label = current_state == :deadline_pending ? 'reserved' : current_state.to_s

  job_stats = {
    'id' => self.id,
    'tube' => self.tube_name,
    'state' => state_label,
    'pri' => self.priority,
    'age' => (now - self.created_at).to_i,
    'delay' => self.delay.to_i,
    'ttr' => self.ttr,
    'time-left' => time_left(now),
    'file' => 0,
  }

  # Append the event counters in their fixed reporting order.
  [:reserves, :timeouts, :releases, :buries, :kicks].each do |counter|
    job_stats[counter.to_s] = @stats_hash[counter]
  end

  return job_stats
end

#time_left(current_time = Time.now.to_f) ⇒ Object



195
196
197
198
199
200
201
202
# File 'lib/gemerald_beanstalk/job.rb', line 195

# Whole number of seconds between +current_time+ and the job's next deadline.
#
# timeout_at is used when it is set, otherwise ready_at; when neither is set
# the result is 0. The difference is truncated with #to_i and may be
# negative when the chosen timestamp is already in the past.
#
# @param current_time [Float] reference time; defaults to Time.now.to_f
# @return [Integer]
def time_left(current_time = Time.now.to_f)
  deadline = self.timeout_at || self.ready_at
  return 0 unless deadline

  return (deadline - current_time).to_i
end

#timed_out(*args) ⇒ Object

Must reference @state to avoid infinite recursion



206
207
208
209
210
211
212
213
214
# File 'lib/gemerald_beanstalk/job.rb', line 206

# Expires a reservation: the job returns to :ready, the 'timeouts' counter is
# incremented, the reservation bookkeeping is cleared and the beanstalk
# object is notified through register_job_timeout with the connection that
# held the reservation.
#
# Checks @state directly instead of calling #state, because #state itself
# invokes this method and going through it would recurse.
#
# @param args [Array] accepted and ignored
# @return [Boolean] false when @state is not one of RESERVED_STATES, true
#   once the timeout has been processed
def timed_out(*args)
  return false unless RESERVED_STATES.include?(@state)

  # Remember the holder before the reservation fields are wiped.
  previous_owner = self.reserved_by

  @state = :ready
  @stats_hash[:timeouts] += 1
  reset_reserve_state

  self.beanstalk.register_job_timeout(previous_owner, self)
  return true
end

#timed_out? ⇒ Boolean

Returns:

  • (Boolean)


217
218
219
# File 'lib/gemerald_beanstalk/job.rb', line 217

# Reports whether #state currently evaluates to :timed_out.
#
# NOTE(review): none of the methods shown on this page assigns :timed_out
# (#timed_out moves the job to :ready) -- confirm whether this can ever be
# true.
#
# @return [Boolean]
def timed_out?
  current_state = state
  return current_state == :timed_out
end

#touch(connection) ⇒ Object



222
223
224
225
226
227
# File 'lib/gemerald_beanstalk/job.rb', line 222

# Restarts the reservation clock for a job reserved by +connection+.
#
# The state is set back to :reserved and timeout_at becomes the current time
# plus ttr.
#
# @param connection [Object] must satisfy #reserved_by_connection?
# @return [Boolean] false when the job is not reserved by +connection+,
#   true once the deadline has been pushed out
def touch(connection)
  owned = reserved_by_connection?(connection)
  return false unless owned

  @state = :reserved
  touched_at = Time.now.to_f
  @timeout_at = touched_at + self.ttr
  return true
end