Class: GemeraldBeanstalk::Beanstalk
- Inherits:
-
Object
- Object
- GemeraldBeanstalk::Beanstalk
show all
- Includes:
- BeanstalkHelper
- Defined in:
- lib/gemerald_beanstalk/beanstalk.rb
Constant Summary
GemeraldBeanstalk::BeanstalkHelper::BAD_FORMAT, GemeraldBeanstalk::BeanstalkHelper::BURIED, GemeraldBeanstalk::BeanstalkHelper::CRLF, GemeraldBeanstalk::BeanstalkHelper::DEADLINE_SOON, GemeraldBeanstalk::BeanstalkHelper::DELETED, GemeraldBeanstalk::BeanstalkHelper::EXPECTED_CRLF, GemeraldBeanstalk::BeanstalkHelper::JOB_INACTIVE_STATES, GemeraldBeanstalk::BeanstalkHelper::JOB_RESERVED_STATES, GemeraldBeanstalk::BeanstalkHelper::JOB_TOO_BIG, GemeraldBeanstalk::BeanstalkHelper::KICKED, GemeraldBeanstalk::BeanstalkHelper::NOT_FOUND, GemeraldBeanstalk::BeanstalkHelper::NOT_IGNORED, GemeraldBeanstalk::BeanstalkHelper::PAUSED, GemeraldBeanstalk::BeanstalkHelper::RELEASED, GemeraldBeanstalk::BeanstalkHelper::TIMED_OUT, GemeraldBeanstalk::BeanstalkHelper::TOUCHED, GemeraldBeanstalk::BeanstalkHelper::UNKNOWN_COMMAND
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#bury(connection, job_id, priority, *args) ⇒ Object
protected
-
#delete(connection, job_id = nil, *args) ⇒ Object
protected
-
#ignore(connection, tube_name) ⇒ Object
protected
-
#initialize(address, maximum_job_size = 2**16) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
-
#kick(connection, limit, *args) ⇒ Object
protected
-
#kick_job(connection, job_id = nil, *args) ⇒ Object
protected
-
#list_tube_used(connection) ⇒ Object
protected
-
#list_tubes(connection) ⇒ Object
protected
-
#list_tubes_watched(connection) ⇒ Object
protected
-
#pause_tube(connection, tube_name, delay) ⇒ Object
protected
-
#peek(connection, job_id = nil, *args) ⇒ Object
protected
-
#peek_buried(connection) ⇒ Object
protected
-
#peek_delayed(connection) ⇒ Object
protected
-
#peek_ready(connection) ⇒ Object
protected
-
#put(connection, priority, delay, ttr, bytes, body) ⇒ Object
protected
-
#quit(connection) ⇒ Object
protected
-
#release(connection, job_id, priority, delay) ⇒ Object
protected
-
#reserve(connection, *args) ⇒ Object
protected
-
#reserve_with_timeout(connection, timeout = 0, *args) ⇒ Object
protected
-
#stats(connection) ⇒ Object
protected
-
#stats_job(connection, job_id = nil, *args) ⇒ Object
protected
-
#stats_tube(connection, tube_name) ⇒ Object
protected
-
#touch(connection, job_id = nil, *args) ⇒ Object
protected
-
#use(connection, tube_name) ⇒ Object
protected
-
#watch(connection, tube_name) ⇒ Object
protected
#active_tubes, #adjust_stats_cmd_put, #adjust_stats_key, #cancel_reservations, #connect, #deadline_pending?, #disconnect, #execute, #find_job, #honor_reservations, included, #next_job, #peek_by_state, #peek_message, #register_job_timeout, #reserve_job, #stats_commands, #stats_connections, #try_dispatch, #tube, #tube_list, #update_state, #update_timeouts, #update_waiting, #uptime, #waiting_connections, #yaml_response
Constructor Details
#initialize(address, maximum_job_size = 2**16) ⇒ Beanstalk
Returns a new instance of Beanstalk.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 12
def initialize(address, maximum_job_size = 2**16)
@max_job_size = maximum_job_size
@address = address
@connections = ThreadSafe::Array.new
@delayed = ThreadSafe::Array.new
@id = SecureRandom.base64(16)
@jobs = GemeraldBeanstalk::Jobs.new
@mutex = Mutex.new
@paused = ThreadSafe::Array.new
@reserved = ThreadSafe::Cache.new {|reserved, key| reserved[key] = [] }
@stats = ThreadSafe::Hash.new(0)
@tubes = ThreadSafe::Cache.new
@up_at = Time.now.to_f
tube('default', :create_if_missing)
end
|
Instance Attribute Details
#address ⇒ Object
Returns the value of attribute address.
9
10
11
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 9
def address
@address
end
|
#max_job_size ⇒ Object
Returns the value of attribute max_job_size.
9
10
11
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 9
def max_job_size
@max_job_size
end
|
Instance Method Details
#bury(connection, job_id, priority, *args) ⇒ Object
32
33
34
35
36
37
38
39
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 32
def bury(connection, job_id, priority, *args)
adjust_stats_key(:'cmd-bury')
job = find_job(job_id, :only => JOB_RESERVED_STATES)
return NOT_FOUND if job.nil? || !job.bury(connection, priority)
@reserved[connection].delete(job)
return BURIED
end
|
#delete(connection, job_id = nil, *args) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 42
def delete(connection, job_id = nil, *args)
adjust_stats_key(:'cmd-delete')
job = find_job(job_id)
return NOT_FOUND if job.nil?
original_state = job.state
return NOT_FOUND unless job.delete(connection)
tube(job.tube_name).delete(job)
@jobs[job.id - 1] = nil
@reserved[connection].delete(job) if JOB_RESERVED_STATES.include?(original_state)
return DELETED
end
|
#ignore(connection, tube_name) ⇒ Object
58
59
60
61
62
63
64
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 58
def ignore(connection, tube_name)
adjust_stats_key(:'cmd-ignore')
return NOT_IGNORED if (watched_count = connection.ignore(tube_name)).nil?
tube = tube(tube_name)
tube.ignore unless tube.nil?
return "WATCHING #{watched_count}\r\n"
end
|
#kick(connection, limit, *args) ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 67
def kick(connection, limit, *args)
adjust_stats_key(:'cmd-kick')
limit = limit.to_i
kicked = 0
JOB_INACTIVE_STATES.each do |job_state|
break if kicked >= limit
until (job = tube(connection.tube_used).next_job(job_state, :peek)).nil?
kicked += 1 if job.kick
break if kicked == limit
end
break if kicked > 0
end
return "KICKED #{kicked}\r\n"
end
|
#kick_job(connection, job_id = nil, *args) ⇒ Object
85
86
87
88
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 85
def kick_job(connection, job_id = nil, *args)
job = find_job(job_id, :only => JOB_INACTIVE_STATES)
return (!job.nil? && job.kick) ? KICKED : NOT_FOUND
end
|
#list_tube_used(connection) ⇒ Object
97
98
99
100
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 97
def list_tube_used(connection)
adjust_stats_key(:'cmd-list-tube-used')
return "USING #{connection.tube_used}\r\n"
end
|
#list_tubes(connection) ⇒ Object
91
92
93
94
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 91
def list_tubes(connection)
adjust_stats_key(:'cmd-list-tubes')
return tube_list(active_tubes.keys)
end
|
#list_tubes_watched(connection) ⇒ Object
103
104
105
106
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 103
def list_tubes_watched(connection)
adjust_stats_key(:'cmd-list-tubes-watched')
return tube_list(connection.tubes_watched)
end
|
#pause_tube(connection, tube_name, delay) ⇒ Object
109
110
111
112
113
114
115
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 109
def pause_tube(connection, tube_name, delay)
adjust_stats_key(:'cmd-paue-tube')
return NOT_FOUND if (tube = tube(tube_name)).nil?
tube.pause(delay.to_i % 2**32)
@paused << tube
return PAUSED
end
|
#peek(connection, job_id = nil, *args) ⇒ Object
118
119
120
121
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 118
def peek(connection, job_id = nil, *args)
adjust_stats_key(:'cmd-peek')
return peek_message(find_job(job_id))
end
|
#peek_buried(connection) ⇒ Object
124
125
126
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 124
def peek_buried(connection)
return peek_by_state(connection, :buried)
end
|
#peek_delayed(connection) ⇒ Object
129
130
131
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 129
def peek_delayed(connection)
return peek_by_state(connection, :delayed)
end
|
#peek_ready(connection) ⇒ Object
134
135
136
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 134
def peek_ready(connection)
return peek_by_state(connection, :ready)
end
|
#put(connection, priority, delay, ttr, bytes, body) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 139
def put(connection, priority, delay, ttr, bytes, body)
adjust_stats_key(:'cmd-put')
bytes = bytes.to_i
return JOB_TOO_BIG if bytes > @max_job_size
return EXPECTED_CRLF if body.slice!(-2, 2) != CRLF || body.length != bytes
job = nil
@mutex.synchronize do
job = GemeraldBeanstalk::Job.new(self, @jobs.next_id, connection.tube_used, priority, delay, ttr, bytes, body)
@jobs.enqueue(job)
tube(connection.tube_used).put(job)
end
connection.transmit("INSERTED #{job.id}\r\n")
connection.producer = true
case job.state
when :ready
honor_reservations(job)
when :delayed
@delayed << job
end
return nil
end
|
#quit(connection) ⇒ Object
168
169
170
171
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 168
def quit(connection)
disconnect(connection)
return nil
end
|
#release(connection, job_id, priority, delay) ⇒ Object
174
175
176
177
178
179
180
181
182
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 174
def release(connection, job_id, priority, delay)
adjust_stats_key(:'cmd-release')
job = find_job(job_id, :only => JOB_RESERVED_STATES)
return NOT_FOUND if job.nil? || !job.release(connection, priority, delay)
@reserved[connection].delete(job)
@delayed << job if job.delayed?
return RELEASED
end
|
#reserve(connection, *args) ⇒ Object
185
186
187
188
189
190
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 185
def reserve(connection, *args)
adjust_stats_key(:'cmd-reserve')
return BAD_FORMAT unless args.empty?
reserve_job(connection)
return nil
end
|
#reserve_with_timeout(connection, timeout = 0, *args) ⇒ Object
193
194
195
196
197
198
199
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 193
def reserve_with_timeout(connection, timeout = 0, *args)
adjust_stats_key(:'cmd-reserve-with-timeout')
timeout = timeout.to_i
return nil if reserve_job(connection, timeout) || timeout != 0
connection.wait_timed_out
return TIMED_OUT
end
|
#stats(connection) ⇒ Object
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 202
def stats(connection)
adjust_stats_key(:'cmd-stats')
stats = @jobs.counts_by_state.merge(stats_commands).merge({
'job-timeouts' => @stats[:'job-timeouts'],
'total-jobs' => @jobs.total_jobs,
'max-job-size' => @max_job_size,
'current-tubes' => active_tubes.length,
}).merge(stats_connections).merge({
'pid' => Process.pid,
'version' => GemeraldBeanstalk::VERSION,
'rusage-utime' => 0,
'rusage-stime' => 0,
'uptime' => uptime,
'binlog-oldest-index' => 0,
'binlog-current-index' => 0,
'binlog-records-migrated' => 0,
'binlog-records-written' => 0,
'binlog-max-size' => 10485760,
'id' => @id,
'hostname' => Socket.gethostname,
})
return yaml_response(stats.map{|stat, value| "#{stat}: #{value}" })
end
|
#stats_job(connection, job_id = nil, *args) ⇒ Object
227
228
229
230
231
232
233
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 227
def stats_job(connection, job_id = nil, *args)
adjust_stats_key(:'cmd-stats-job')
job = find_job(job_id)
return NOT_FOUND if job.nil?
return yaml_response(job.stats.map{ |stat, value| "#{stat}: #{value}" })
end
|
#stats_tube(connection, tube_name) ⇒ Object
236
237
238
239
240
241
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 236
def stats_tube(connection, tube_name)
adjust_stats_key(:'cmd-stats-tube')
return NOT_FOUND if (tube = tube(tube_name)).nil?
return yaml_response(tube.stats.map{ |stat, value| "#{stat}: #{value}" })
end
|
#touch(connection, job_id = nil, *args) ⇒ Object
244
245
246
247
248
249
250
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 244
def touch(connection, job_id = nil, *args)
adjust_stats_key(:'cmd-touch')
job = find_job(job_id, :only => JOB_RESERVED_STATES)
return NOT_FOUND if job.nil? || !job.touch(connection)
return TOUCHED
end
|
#use(connection, tube_name) ⇒ Object
253
254
255
256
257
258
259
260
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 253
def use(connection, tube_name)
adjust_stats_key(:'cmd-use')
tube(connection.tube_used).stop_use
tube(tube_name, :create_if_missing).use
connection.use(tube_name)
return "USING #{tube_name}\r\n"
end
|
#watch(connection, tube_name) ⇒ Object
263
264
265
266
267
268
269
270
271
272
273
|
# File 'lib/gemerald_beanstalk/beanstalk.rb', line 263
def watch(connection, tube_name)
adjust_stats_key(:'cmd-watch')
if connection.tubes_watched.include?(tube_name)
watched_count = connection.tubes_watched.length
else
tube(tube_name, :create_if_missing).watch
watched_count = connection.watch(tube_name)
end
return "WATCHING #{watched_count}\r\n"
end
|