Class: EM::Beanstalk
- Inherits:
-
Object
show all
- Defined in:
- lib/em-beanstalk.rb,
lib/em-beanstalk/job.rb,
lib/em-beanstalk/shell.rb,
lib/em-beanstalk/connection.rb
Defined Under Namespace
Modules: VERSION
Classes: Body, Connection, Job, Shell
Constant Summary
collapse
- Disconnected =
Class.new(RuntimeError)
- InvalidCommand =
Class.new(RuntimeError)
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_deferrable(&block) ⇒ Object
-
#close ⇒ Object
-
#connected ⇒ Object
-
#delete(val, &block) ⇒ Object
-
#disconnected ⇒ Object
-
#drain!(&block) ⇒ Object
-
#each_job(timeout = nil, &block) ⇒ Object
-
#extract_body(bytes, data) ⇒ Object
-
#ignore(tube, &block) ⇒ Object
-
#initialize(opts = nil) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
-
#job_id(val) ⇒ Object
-
#list(type = nil, &block) ⇒ Object
-
#on_error(&block) ⇒ Object
-
#peek(id, &block) ⇒ Object
-
#put(msg, opts = nil, &block) ⇒ Object
-
#received(data) ⇒ Object
-
#reconnect ⇒ Object
-
#release(val, opts = nil, &block) ⇒ Object
-
#reserve(timeout = nil, &block) ⇒ Object
-
#stats(type = nil, val = nil, &block) ⇒ Object
-
#use(tube, &block) ⇒ Object
-
#watch(tube, &block) ⇒ Object
Constructor Details
#initialize(opts = nil) ⇒ Beanstalk
Returns a new instance of Beanstalk.
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/em-beanstalk.rb', line 25
def initialize(opts = nil)
@host = opts && opts[:host] || 'localhost'
@port = opts && opts[:port] || 11300
@tube = opts && opts[:tube] || 'default'
@retry_count = opts && opts[:retry_count] || 5
@default_priority = opts && opts[:default_priority] || 65536
@default_delay = opts && opts[:default_delay] || 0
@default_ttr = opts && opts[:default_ttr] || 300
@default_timeout = opts && opts[:timeout] || 5
@default_error_callback = opts && opts[:default_error_callback] || Proc.new{ |error| puts "ERROR: #{error.inspect}" }
@raise_on_disconnect = opts && opts.key?(:raise_on_disconnect) ? opts[:raise_on_disconnect] : true
@watched_tubes = []
@data = ""
@retries = 0
@in_reserve = false
@deferrables = []
@conn = EM::connect(host, port, EM::Beanstalk::Connection) do |conn|
conn.client = self
conn.comm_inactivity_timeout = 0
conn.pending_connect_timeout = @default_timeout
end
if @tube
use(@tube)
watch(@tube)
end
end
|
Instance Attribute Details
#default_delay ⇒ Object
Returns the value of attribute default_delay.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
def default_delay
@default_delay
end
|
#default_error_callback ⇒ Object
Returns the value of attribute default_error_callback.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
def default_error_callback
@default_error_callback
end
|
#default_priority ⇒ Object
Returns the value of attribute default_priority.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
def default_priority
@default_priority
end
|
#default_ttr ⇒ Object
Returns the value of attribute default_ttr.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
def default_ttr
@default_ttr
end
|
#host ⇒ Object
Returns the value of attribute host.
22
23
24
|
# File 'lib/em-beanstalk.rb', line 22
def host
@host
end
|
#port ⇒ Object
Returns the value of attribute port.
22
23
24
|
# File 'lib/em-beanstalk.rb', line 22
def port
@port
end
|
Instance Method Details
#add_deferrable(&block) ⇒ Object
203
204
205
206
207
|
# File 'lib/em-beanstalk.rb', line 203
def add_deferrable(&block)
df = Defer.new(default_error_callback, &block)
@deferrables.push(df)
df
end
|
#close ⇒ Object
56
57
58
59
|
# File 'lib/em-beanstalk.rb', line 56
def close
@disconnect_manually = true
@conn.close_connection
end
|
#connected ⇒ Object
185
186
187
|
# File 'lib/em-beanstalk.rb', line 185
def connected
@retries = 0
end
|
#delete(val, &block) ⇒ Object
140
141
142
143
144
|
# File 'lib/em-beanstalk.rb', line 140
def delete(val, &block)
return unless val
@conn.send(:delete, job_id(val))
add_deferrable(&block)
end
|
#disconnected ⇒ Object
189
190
191
192
193
194
195
196
|
# File 'lib/em-beanstalk.rb', line 189
def disconnected
@deferrables.each {|d| d.fail(:disconnected) }
unless @disconnect_manually
raise EM::Beanstalk::Disconnected if @retries >= @retry_count && @raise_on_disconnect
@retries += 1
EM.add_timer(1) { reconnect }
end
end
|
#drain!(&block) ⇒ Object
61
62
63
64
65
66
67
|
# File 'lib/em-beanstalk.rb', line 61
def drain!(&block)
stats do |stats|
stats['current-jobs-ready'].zero? ?
EM.next_tick(&block) :
reserve{|job| job.delete{ drain!(&block) }}
end
end
|
#each_job(timeout = nil, &block) ⇒ Object
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/em-beanstalk.rb', line 100
def each_job(timeout = nil, &block)
work = Proc.new do
r = reserve(timeout)
r.callback do |job|
block.call(job)
EM.next_tick { work.call }
end
end
work.call
end
|
#extract_body(bytes, data) ⇒ Object
276
277
278
279
280
281
282
283
284
285
286
|
# File 'lib/em-beanstalk.rb', line 276
def extract_body(bytes, data)
rem = data[(data.index(/\r\n/) + 2)..-1]
if rem.length < bytes
nil
else
body = rem[0..(bytes - 1)]
data = rem[(bytes + 2)..-1]
data = "" if data.nil?
Body.new(body, data)
end
end
|
#ignore(tube, &block) ⇒ Object
84
85
86
87
88
89
|
# File 'lib/em-beanstalk.rb', line 84
def ignore(tube, &block)
return if not @watched_tubes.include?(tube)
@watched_tubes.delete(tube)
@conn.send(:ignore, tube)
add_deferrable(&block)
end
|
#job_id(val) ⇒ Object
121
122
123
124
125
126
127
128
|
# File 'lib/em-beanstalk.rb', line 121
def job_id(val)
case val
when Job
val.id
else
val
end
end
|
#list(type = nil, &block) ⇒ Object
130
131
132
133
134
135
136
137
138
|
# File 'lib/em-beanstalk.rb', line 130
def list(type = nil, &block)
case(type)
when :tube, :tubes, nil then @conn.send(:'list-tubes')
when :use, :used then @conn.send(:'list-tube-used')
when :watch, :watched then @conn.send(:'list-tubes-watched')
else raise EM::Beanstalk::InvalidCommand.new
end
add_deferrable(&block)
end
|
#on_error(&block) ⇒ Object
209
210
211
|
# File 'lib/em-beanstalk.rb', line 209
def on_error(&block)
@error_callback = block
end
|
#peek(id, &block) ⇒ Object
146
147
148
149
150
151
152
153
154
|
# File 'lib/em-beanstalk.rb', line 146
def peek(id, &block)
case id
when :ready then @conn.send(:'peek-ready')
when :delayed then @conn.send(:'peek-delayed')
when :buried then @conn.send(:'peek-buried')
else @conn.send(:'peek', id)
end
add_deferrable(&block)
end
|
#put(msg, opts = nil, &block) ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
# File 'lib/em-beanstalk.rb', line 156
def put(msg, opts = nil, &block)
case msg
when Job
priority = opts && opts[:priority] || msg.priority
delay = opts && opts[:delay] || msg.delay
ttr = opts && opts[:ttr] || msg.ttr
body = msg.body
else
priority = opts && opts[:priority] || default_priority
delay = opts && opts[:delay] || default_delay
ttr = opts && opts[:ttr] || default_ttr
body = msg.to_s
end
priority = default_priority if priority < 0
priority = 2 ** 32 if priority > (2 ** 32)
delay = default_delay if delay < 0
ttr = default_ttr if ttr < 0
@conn.send_with_data(:put, body, priority, delay, ttr, body.size)
add_deferrable(&block)
end
|
#received(data) ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
# File 'lib/em-beanstalk.rb', line 213
def received(data)
@data << data
until @data.empty?
idx = @data.index(/(.*?)\r\n/)
break if idx.nil?
first = $1
case (first)
when /^DELETED/
df = @deferrables.shift
df.succeed
when /^INSERTED\s+(\d+)/
df = @deferrables.shift
df.succeed($1.to_i)
when /^RELEASED/
df = @deferrables.shift
df.succeed
when /^BURIED\s+(\d+)/
df = @deferrables.shift
df.fail(:buried, $1.to_i)
when /^USING\s+(.*)/
df = @deferrables.shift
df.succeed($1)
when /^WATCHING\s+(\d+)/
df = @deferrables.shift
df.succeed($1.to_i)
when /^OK\s+(\d+)/
bytes = $1.to_i
if body = extract_body(bytes, @data)
@data = body.data
df = @deferrables.shift
df.succeed(YAML.load(body.body))
next
else
break
end
when /^(RESERVED|FOUND)\s+(\d+)\s+(\d+)/
id = $2.to_i
bytes = $3.to_i
if body = extract_body(bytes, @data)
@data = body.data
df = @deferrables.shift
job = EM::Beanstalk::Job.new(self, id, body.body)
df.succeed(job)
next
else
break
end
when /^(OUT_OF_MEMORY|INTERNAL_ERROR|DRAINING|BAD_FORMAT|UNKNOWN_COMMAND|EXPECTED_CRLF|JOB_TOO_BIG|DEADLINE_SOON|TIMED_OUT|NOT_FOUND)/
puts "... got error, calling df."
df = @deferrables.shift
df.fail($1.downcase.to_sym)
@data = @data[($1.length + 2)..-1]
else
break
end
@data.slice!(0, first.size + 2)
end
end
|
#reconnect ⇒ Object
198
199
200
201
|
# File 'lib/em-beanstalk.rb', line 198
def reconnect
@disconnect_manually = false
@conn.reconnect(host, port)
end
|
#release(val, opts = nil, &block) ⇒ Object
179
180
181
182
183
|
# File 'lib/em-beanstalk.rb', line 179
def release(val, opts = nil, &block)
return if val.nil?
@conn.send(:release, job_id(val), priority.to_i, delay.to_i)
add_deferrable(&block)
end
|
#reserve(timeout = nil, &block) ⇒ Object
91
92
93
94
95
96
97
98
|
# File 'lib/em-beanstalk.rb', line 91
def reserve(timeout = nil, &block)
if timeout
@conn.send(:'reserve-with-timeout', timeout)
else
@conn.send(:reserve)
end
add_deferrable(&block)
end
|
#stats(type = nil, val = nil, &block) ⇒ Object
111
112
113
114
115
116
117
118
119
|
# File 'lib/em-beanstalk.rb', line 111
def stats(type = nil, val = nil, &block)
case(type)
when nil then @conn.send(:stats)
when :tube then @conn.send(:'stats-tube', val)
when :job then @conn.send(:'stats-job', job_id(val))
else raise EM::Beanstalk::InvalidCommand.new
end
add_deferrable(&block)
end
|
#use(tube, &block) ⇒ Object
69
70
71
72
73
74
75
|
# File 'lib/em-beanstalk.rb', line 69
def use(tube, &block)
return if @used_tube == tube
@used_tube = tube
@conn.send(:use, tube)
add_deferrable(&block)
end
|
#watch(tube, &block) ⇒ Object
77
78
79
80
81
82
|
# File 'lib/em-beanstalk.rb', line 77
def watch(tube, &block)
return if @watched_tubes.include?(tube)
@watched_tubes.push(tube)
@conn.send(:watch, tube)
add_deferrable(&block)
end
|