Class: GemeraldBeanstalk::Beanstalk

Inherits:
Object
  • Object
show all
Includes:
BeanstalkHelper
Defined in:
lib/gemerald_beanstalk/beanstalk.rb

Constant Summary

Constants included from BeanstalkHelper

GemeraldBeanstalk::BeanstalkHelper::BAD_FORMAT, GemeraldBeanstalk::BeanstalkHelper::BURIED, GemeraldBeanstalk::BeanstalkHelper::CRLF, GemeraldBeanstalk::BeanstalkHelper::DEADLINE_SOON, GemeraldBeanstalk::BeanstalkHelper::DELETED, GemeraldBeanstalk::BeanstalkHelper::EXPECTED_CRLF, GemeraldBeanstalk::BeanstalkHelper::JOB_INACTIVE_STATES, GemeraldBeanstalk::BeanstalkHelper::JOB_RESERVED_STATES, GemeraldBeanstalk::BeanstalkHelper::JOB_TOO_BIG, GemeraldBeanstalk::BeanstalkHelper::KICKED, GemeraldBeanstalk::BeanstalkHelper::NOT_FOUND, GemeraldBeanstalk::BeanstalkHelper::NOT_IGNORED, GemeraldBeanstalk::BeanstalkHelper::PAUSED, GemeraldBeanstalk::BeanstalkHelper::RELEASED, GemeraldBeanstalk::BeanstalkHelper::TIMED_OUT, GemeraldBeanstalk::BeanstalkHelper::TOUCHED, GemeraldBeanstalk::BeanstalkHelper::UNKNOWN_COMMAND

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BeanstalkHelper

#active_tubes, #adjust_stats_cmd_put, #adjust_stats_key, #cancel_reservations, #connect, #deadline_pending?, #disconnect, #execute, #find_job, #honor_reservations, included, #next_job, #peek_by_state, #peek_message, #register_job_timeout, #reserve_job, #stats_commands, #stats_connections, #try_dispatch, #tube, #tube_list, #update_state, #update_timeouts, #update_waiting, #uptime, #waiting_connections, #yaml_response

Constructor Details

#initialize(address, maximum_job_size = 2**16) ⇒ Beanstalk

Returns a new instance of Beanstalk.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 12

# Sets up the state of a new Beanstalk instance and ensures the 'default'
# tube exists.
#
# @param address [Object] stored as-is and exposed through #address
# @param maximum_job_size [Integer] stored as @max_job_size; #put answers
#   JOB_TOO_BIG for jobs whose declared byte count exceeds it
def initialize(address, maximum_job_size = 2**16)
  # Identity and configuration
  @address = address
  @max_job_size = maximum_job_size
  @id = SecureRandom.base64(16)
  @up_at = Time.now.to_f

  # Connection, tube and statistics bookkeeping
  @connections = ThreadSafe::Array.new
  @tubes = ThreadSafe::Cache.new
  @paused = ThreadSafe::Array.new
  @stats = ThreadSafe::Hash.new(0)

  # Job bookkeeping; @reserved lazily creates an empty list per key
  # (the command methods key it by connection)
  @jobs = GemeraldBeanstalk::Jobs.new
  @delayed = ThreadSafe::Array.new
  @reserved = ThreadSafe::Cache.new { |cache, key| cache[key] = [] }
  @mutex = Mutex.new

  # Must run last: relies on the state initialised above
  tube('default', :create_if_missing)
end

Instance Attribute Details

#address ⇒ Object (readonly)

Returns the value of attribute address.



9
10
11
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 9

# Reader for @address, the value passed as +address+ to the constructor.
def address
  @address
end

#max_job_size ⇒ Object (readonly)

Returns the value of attribute max_job_size.



9
10
11
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 9

# Reader for @max_job_size, the +maximum_job_size+ given to the constructor
# (2**16 when not supplied).
def max_job_size
  @max_job_size
end

Instance Method Details

#bury(connection, job_id, priority, *args) ⇒ Object (protected)



32
33
34
35
36
37
38
39
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 32

# Buries a job currently in one of JOB_RESERVED_STATES on behalf of
# +connection+.
#
# @return NOT_FOUND when no such job is found or the job refuses the bury,
#   BURIED otherwise (the job is then dropped from the connection's
#   reserved list)
def bury(connection, job_id, priority, *args)
  adjust_stats_key(:'cmd-bury')
  reserved_job = find_job(job_id, :only => JOB_RESERVED_STATES)
  buried = !reserved_job.nil? && reserved_job.bury(connection, priority)
  return NOT_FOUND unless buried

  @reserved[connection].delete(reserved_job)
  BURIED
end

#delete(connection, job_id = nil, *args) ⇒ Object (protected)



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 42

# Deletes a job on behalf of +connection+.
#
# @return NOT_FOUND when the job cannot be found or refuses deletion,
#   DELETED once it has been removed from its tube, from @jobs and (if it
#   was reserved) from the connection's reserved list
def delete(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-delete')
  target = find_job(job_id)
  return NOT_FOUND if target.nil?

  # Read the state before deleting: job.delete may change it, and the
  # pre-delete state decides whether the reserved list needs cleaning up.
  state_before = target.state
  return NOT_FOUND unless target.delete(connection)

  tube(target.tube_name).delete(target)
  # Job IDs are 1-based while @jobs is indexed from 0
  @jobs[target.id - 1] = nil
  if JOB_RESERVED_STATES.include?(state_before)
    @reserved[connection].delete(target)
  end

  DELETED
end

#ignore(connection, tube_name) ⇒ Object (protected)



58
59
60
61
62
63
64
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 58

# Stops +connection+ from watching +tube_name+.
#
# @return NOT_IGNORED when connection.ignore yields nil, otherwise a
#   "WATCHING <count>" line built from the value it returned
def ignore(connection, tube_name)
  adjust_stats_key(:'cmd-ignore')
  watched_count = connection.ignore(tube_name)
  return NOT_IGNORED if watched_count.nil?

  # The tube lookup may come back nil; only notify a tube that exists
  ignored_tube = tube(tube_name)
  ignored_tube.ignore unless ignored_tube.nil?
  "WATCHING #{watched_count}\r\n"
end

#kick(connection, limit, *args) ⇒ Object (protected)



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 67

# Kicks up to +limit+ jobs in the tube currently used by +connection+.
#
# States are tried in JOB_INACTIVE_STATES order; as soon as any job of one
# state has been kicked, the remaining states are skipped.
#
# @param limit [#to_i] upper bound on kicked jobs; zero or negative kicks nothing
# @return [String] "KICKED <count>" line
def kick(connection, limit, *args)
  adjust_stats_key(:'cmd-kick')
  bound = limit.to_i
  kicked_count = 0

  JOB_INACTIVE_STATES.each do |state|
    # >= rather than == so that negative bounds stop immediately
    break if kicked_count >= bound

    loop do
      candidate = tube(connection.tube_used).next_job(state, :peek)
      break if candidate.nil?

      kicked_count += 1 if candidate.kick
      break if kicked_count == bound
    end

    break if kicked_count > 0
  end

  "KICKED #{kicked_count}\r\n"
end

#kick_job(connection, job_id = nil, *args) ⇒ Object (protected)



85
86
87
88
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 85

# Kicks a single job, looked up among jobs in JOB_INACTIVE_STATES.
#
# @return KICKED when the job exists and its kick succeeds, NOT_FOUND otherwise
def kick_job(connection, job_id = nil, *args)
  inactive_job = find_job(job_id, :only => JOB_INACTIVE_STATES)
  return NOT_FOUND if inactive_job.nil?

  inactive_job.kick ? KICKED : NOT_FOUND
end

#list_tube_used(connection) ⇒ Object (protected)



97
98
99
100
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 97

# Reports the tube +connection+ is currently using.
#
# @return [String] "USING <tube>" line
def list_tube_used(connection)
  adjust_stats_key(:'cmd-list-tube-used')
  used_tube_name = connection.tube_used
  "USING #{used_tube_name}\r\n"
end

#list_tubes(connection) ⇒ Object (protected)



91
92
93
94
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 91

# Lists the names (keys) of the active tubes, formatted by tube_list.
def list_tubes(connection)
  adjust_stats_key(:'cmd-list-tubes')
  tube_names = active_tubes.keys
  tube_list(tube_names)
end

#list_tubes_watched(connection) ⇒ Object (protected)



103
104
105
106
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 103

# Lists the tubes watched by +connection+, formatted by tube_list.
def list_tubes_watched(connection)
  adjust_stats_key(:'cmd-list-tubes-watched')
  watched = connection.tubes_watched
  tube_list(watched)
end

#pause_tube(connection, tube_name, delay) ⇒ Object (protected)



109
110
111
112
113
114
115
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 109

# Pauses +tube_name+ for +delay+ seconds.
#
# @param tube_name [String] name of the tube to pause
# @param delay [#to_i] pause length; wrapped modulo 2**32 before being
#   handed to the tube
# @return NOT_FOUND when the tube lookup yields nil, PAUSED otherwise (the
#   tube is then appended to @paused)
def pause_tube(connection, tube_name, delay)
  # Key was previously misspelled 'cmd-paue-tube', so the command was counted
  # under a name that does not match the beanstalkd stat `cmd-pause-tube`.
  adjust_stats_key(:'cmd-pause-tube')
  target = tube(tube_name)
  return NOT_FOUND if target.nil?

  target.pause(delay.to_i % 2**32)
  @paused << target
  return PAUSED
end

#peek(connection, job_id = nil, *args) ⇒ Object (protected)



118
119
120
121
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 118

# Looks up a job by ID and returns whatever peek_message produces for it
# (the lookup result is passed through even when it is nil).
def peek(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-peek')
  peeked_job = find_job(job_id)
  peek_message(peeked_job)
end

#peek_buried(connection) ⇒ Object (protected)



124
125
126
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 124

# Delegates to peek_by_state with the :buried state for +connection+.
def peek_buried(connection)
  peek_by_state(connection, :buried)
end

#peek_delayed(connection) ⇒ Object (protected)



129
130
131
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 129

# Delegates to peek_by_state with the :delayed state for +connection+.
def peek_delayed(connection)
  peek_by_state(connection, :delayed)
end

#peek_ready(connection) ⇒ Object (protected)



134
135
136
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 134

# Delegates to peek_by_state with the :ready state for +connection+.
def peek_ready(connection)
  peek_by_state(connection, :ready)
end

#put(connection, priority, delay, ttr, bytes, body) ⇒ Object (protected)



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 139

# Inserts a new job into the tube currently used by +connection+.
#
# @param bytes [#to_i] declared body length, excluding the trailing CRLF
# @param body [String] job body followed by CRLF; the trailing two
#   characters are stripped from this string in place
# @return JOB_TOO_BIG when +bytes+ exceeds @max_job_size, EXPECTED_CRLF when
#   the body is not CRLF-terminated or its length differs from +bytes+, and
#   nil once the job is accepted (the "INSERTED <id>" reply is sent through
#   connection.transmit instead of the return value)
def put(connection, priority, delay, ttr, bytes, body)
  adjust_stats_key(:'cmd-put')
  bytes = bytes.to_i
  return JOB_TOO_BIG if bytes > @max_job_size

  terminator = body.slice!(-2, 2)
  return EXPECTED_CRLF unless terminator == CRLF && body.length == bytes

  # ID allocation and insertion happen under the mutex so that job IDs
  # follow insertion order.
  job = @mutex.synchronize do
    created = GemeraldBeanstalk::Job.new(self, @jobs.next_id, connection.tube_used, priority, delay, ttr, bytes, body)
    @jobs.enqueue(created)
    tube(connection.tube_used).put(created)
    created
  end

  # Reply before attempting dispatch so the client is not kept waiting
  # while we check whether the job can be handed out immediately.
  connection.transmit("INSERTED #{job.id}\r\n")
  connection.producer = true

  state = job.state
  if state == :ready
    honor_reservations(job)
  elsif state == :delayed
    @delayed << job
  end

  nil
end

#quit(connection) ⇒ Object (protected)



168
169
170
171
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 168

# Disconnects +connection+; always returns nil.
def quit(connection)
  disconnect(connection)
  nil
end

#release(connection, job_id, priority, delay) ⇒ Object (protected)



174
175
176
177
178
179
180
181
182
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 174

# Releases a job currently in one of JOB_RESERVED_STATES on behalf of
# +connection+.
#
# @return NOT_FOUND when no such job is found or the job refuses the
#   release; RELEASED otherwise. On success the job leaves the connection's
#   reserved list and, if it reports delayed?, is appended to @delayed.
def release(connection, job_id, priority, delay)
  adjust_stats_key(:'cmd-release')
  reserved_job = find_job(job_id, :only => JOB_RESERVED_STATES)
  released = !reserved_job.nil? && reserved_job.release(connection, priority, delay)
  return NOT_FOUND unless released

  @reserved[connection].delete(reserved_job)
  @delayed << reserved_job if reserved_job.delayed?
  RELEASED
end

#reserve(connection, *args) ⇒ Object (protected)



185
186
187
188
189
190
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 185

# Requests a job reservation for +connection+.
#
# @return BAD_FORMAT when any extra argument is supplied; nil otherwise
#   (the result of reserve_job is not used as the return value)
def reserve(connection, *args)
  adjust_stats_key(:'cmd-reserve')
  if args.empty?
    reserve_job(connection)
    nil
  else
    BAD_FORMAT
  end
end

#reserve_with_timeout(connection, timeout = 0, *args) ⇒ Object (protected)



193
194
195
196
197
198
199
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 193

# Requests a job reservation for +connection+ with a timeout.
#
# @param timeout [#to_i] coerced with to_i before being passed to reserve_job
# @return nil when reserve_job returns a truthy value or the timeout is
#   non-zero; TIMED_OUT (after connection.wait_timed_out) when reserve_job
#   returned a falsy value and the timeout is zero
def reserve_with_timeout(connection, timeout = 0, *args)
  adjust_stats_key(:'cmd-reserve-with-timeout')
  seconds = timeout.to_i
  return nil if reserve_job(connection, seconds)
  return nil unless seconds.zero?

  connection.wait_timed_out
  TIMED_OUT
end

#stats(connection) ⇒ Object (protected)



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 202

# Builds the server-wide statistics report.
#
# The report is assembled by merging, in this order: job counts by state,
# command counters, server totals, connection counters and process-level
# values. Later sources win on duplicate keys. rusage and binlog entries are
# fixed placeholder values.
#
# @return the result of yaml_response for the "name: value" lines
def stats(connection)
  adjust_stats_key(:'cmd-stats')

  report = @jobs.counts_by_state.merge(stats_commands)
  report = report.merge(
    'job-timeouts' => @stats[:'job-timeouts'],
    'total-jobs' => @jobs.total_jobs,
    'max-job-size' => @max_job_size,
    'current-tubes' => active_tubes.length
  )
  report = report.merge(stats_connections)
  report = report.merge(
    'pid' => Process.pid,
    'version' => GemeraldBeanstalk::VERSION,
    'rusage-utime' => 0,
    'rusage-stime' => 0,
    'uptime' => uptime,
    'binlog-oldest-index' => 0,
    'binlog-current-index' => 0,
    'binlog-records-migrated' => 0,
    'binlog-records-written' => 0,
    'binlog-max-size' => 10485760,
    'id' => @id,
    'hostname' => Socket.gethostname
  )

  lines = report.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end

#stats_job(connection, job_id = nil, *args) ⇒ Object (protected)



227
228
229
230
231
232
233
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 227

# Reports the statistics of a single job.
#
# @return NOT_FOUND when the job lookup yields nil, otherwise the result of
#   yaml_response for the job's "name: value" stat lines
def stats_job(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-stats-job')
  job = find_job(job_id)
  return NOT_FOUND if job.nil?

  lines = job.stats.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end

#stats_tube(connection, tube_name) ⇒ Object (protected)



236
237
238
239
240
241
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 236

# Reports the statistics of a single tube.
#
# @return NOT_FOUND when the tube lookup yields nil, otherwise the result
#   of yaml_response for the tube's "name: value" stat lines
def stats_tube(connection, tube_name)
  adjust_stats_key(:'cmd-stats-tube')
  requested = tube(tube_name)
  return NOT_FOUND if requested.nil?

  lines = requested.stats.map { |name, value| "#{name}: #{value}" }
  yaml_response(lines)
end

#touch(connection, job_id = nil, *args) ⇒ Object (protected)



244
245
246
247
248
249
250
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 244

# Touches a job currently in one of JOB_RESERVED_STATES on behalf of
# +connection+.
#
# @return TOUCHED when the job exists and accepts the touch, NOT_FOUND otherwise
def touch(connection, job_id = nil, *args)
  adjust_stats_key(:'cmd-touch')
  reserved_job = find_job(job_id, :only => JOB_RESERVED_STATES)
  touched = !reserved_job.nil? && reserved_job.touch(connection)
  touched ? TOUCHED : NOT_FOUND
end

#use(connection, tube_name) ⇒ Object (protected)



253
254
255
256
257
258
259
260
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 253

# Switches +connection+ to +tube_name+, creating the tube if needed.
#
# The previously used tube is told to stop_use before the new tube is
# marked as used and the connection is updated.
#
# @return [String] "USING <tube_name>" line
def use(connection, tube_name)
  adjust_stats_key(:'cmd-use')

  previous_tube = tube(connection.tube_used)
  previous_tube.stop_use

  next_tube = tube(tube_name, :create_if_missing)
  next_tube.use
  connection.use(tube_name)

  "USING #{tube_name}\r\n"
end

#watch(connection, tube_name) ⇒ Object (protected)



263
264
265
266
267
268
269
270
271
272
273
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 263

# Adds +tube_name+ to the tubes watched by +connection+, creating the tube
# if needed. A tube that is already watched is left untouched.
#
# @return [String] "WATCHING <count>" line; the count is the current watch
#   list length when already watching, otherwise the value returned by
#   connection.watch
def watch(connection, tube_name)
  adjust_stats_key(:'cmd-watch')

  # Already watching: report the current count without touching the tube
  if connection.tubes_watched.include?(tube_name)
    return "WATCHING #{connection.tubes_watched.length}\r\n"
  end

  tube(tube_name, :create_if_missing).watch
  "WATCHING #{connection.watch(tube_name)}\r\n"
end