Class: EM::Beanstalk

Inherits:
Object
  • Object
show all
Defined in:
lib/em-beanstalk.rb,
lib/em-beanstalk/job.rb,
lib/em-beanstalk/shell.rb,
lib/em-beanstalk/connection.rb

Defined Under Namespace

Modules: VERSION Classes: Body, Connection, Job, Shell

Constant Summary collapse

# Raised by #disconnected when the connection drops, the retry count has
# reached the configured retry_count, and raise_on_disconnect is enabled.
Disconnected =
Class.new(RuntimeError)
# Raised by #list and #stats when they are given a type they do not recognise.
InvalidCommand =
Class.new(RuntimeError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = nil) ⇒ Beanstalk

Returns a new instance of Beanstalk.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/em-beanstalk.rb', line 25

# Builds a client, opens the EventMachine connection and selects the tube.
#
# @param opts [Hash, nil] optional settings; each key falls back to a default:
#   :host ('localhost'), :port (11300), :tube ('default'), :retry_count (5),
#   :default_priority (65536), :default_delay (0), :default_ttr (300),
#   :timeout (5), :default_error_callback (prints the error with puts) and
#   :raise_on_disconnect (true unless the key is present)
def initialize(opts = nil)
  # Normalise once so every lookup below can index a hash directly.
  settings = opts || {}

  @host             = settings[:host] || 'localhost'
  @port             = settings[:port] || 11300
  @tube             = settings[:tube] || 'default'
  @retry_count      = settings[:retry_count] || 5
  @default_priority = settings[:default_priority] || 65536
  @default_delay    = settings[:default_delay] || 0
  @default_ttr      = settings[:default_ttr] || 300
  @default_timeout  = settings[:timeout] || 5
  @default_error_callback =
    settings[:default_error_callback] || Proc.new { |error| puts "ERROR: #{error.inspect}" }
  # key? (rather than ||) so an explicit false is honoured.
  @raise_on_disconnect =
    settings.key?(:raise_on_disconnect) ? settings[:raise_on_disconnect] : true

  # Parser and bookkeeping state.
  @watched_tubes = []
  @data          = ""
  @retries       = 0
  @in_reserve    = false
  @deferrables   = []

  @conn = EM.connect(host, port, EM::Beanstalk::Connection) do |connection|
    connection.client = self
    connection.comm_inactivity_timeout = 0
    connection.pending_connect_timeout = @default_timeout
  end

  return unless @tube

  use(@tube)
  watch(@tube)
end

Instance Attribute Details

#default_delay ⇒ Object (readonly)

Returns the value of attribute default_delay.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_delay. #put falls back to this value when a non-Job
# message is put without a :delay option, or when the resolved delay is
# negative.
def default_delay
  @default_delay
end

#default_error_callback ⇒ Object (readonly)

Returns the value of attribute default_error_callback.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_error_callback, the object #add_deferrable hands to
# every Defer it creates.
def default_error_callback
  @default_error_callback
end

#default_priority ⇒ Object (readonly)

Returns the value of attribute default_priority.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_priority. #put falls back to this value when a non-Job
# message is put without a :priority option, or when the resolved priority is
# negative.
def default_priority
  @default_priority
end

#default_ttr ⇒ Object (readonly)

Returns the value of attribute default_ttr.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_ttr. #put falls back to this value when a non-Job
# message is put without a :ttr option, or when the resolved ttr is negative.
def default_ttr
  @default_ttr
end

#host ⇒ Object

Returns the value of attribute host.



22
23
24
# File 'lib/em-beanstalk.rb', line 22

# Reader for @host, the host passed to EM::connect in #initialize and to
# @conn.reconnect in #reconnect.
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



22
23
24
# File 'lib/em-beanstalk.rb', line 22

# Reader for @port, the port passed to EM::connect in #initialize and to
# @conn.reconnect in #reconnect.
def port
  @port
end

Instance Method Details

#add_deferrable(&block) ⇒ Object



203
204
205
206
207
# File 'lib/em-beanstalk.rb', line 203

# Creates a Defer for a command that has just been sent and appends it to the
# pending queue, so responses can be paired with it in order.
#
# @yield forwarded unchanged to Defer.new
# @return [Defer] the newly queued deferrable
def add_deferrable(&block)
  Defer.new(default_error_callback, &block).tap do |pending|
    @deferrables.push(pending)
  end
end

#close ⇒ Object



56
57
58
59
# File 'lib/em-beanstalk.rb', line 56

# Closes the connection on purpose.
def close
  # Set before closing: #disconnected checks this flag and skips its
  # retry/reconnect logic when it is true.
  @disconnect_manually = true
  @conn.close_connection
end

#connected ⇒ Object



185
186
187
# File 'lib/em-beanstalk.rb', line 185

# Resets the retry counter that #disconnected increments on each reconnect
# attempt.
# NOTE(review): presumably invoked by the connection object once the socket is
# established — confirm against EM::Beanstalk::Connection.
def connected
  @retries = 0
end

#delete(val, &block) ⇒ Object



140
141
142
143
144
# File 'lib/em-beanstalk.rb', line 140

# Sends a delete command for a job.
#
# @param val [Job, Object] a Job or a raw job id (resolved through job_id);
#   nothing is sent when it is nil or false
# @yield forwarded to add_deferrable
# @return [Object, nil] the deferrable from add_deferrable, or nil when val
#   is falsy
def delete(val, &block)
  return unless val

  id = job_id(val)
  @conn.send(:delete, id)
  add_deferrable(&block)
end

#disconnected ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/em-beanstalk.rb', line 189

# Handles a dropped connection: fails every pending deferrable with
# :disconnected, discards state tied to the dead socket, and (unless the
# client was closed via #close) schedules a reconnect one second later.
#
# @raise [EM::Beanstalk::Disconnected] when the retry count has reached
#   @retry_count and @raise_on_disconnect is truthy
# @return [Object, nil] whatever EM.add_timer returns, or nil after a manual
#   close
def disconnected
  # The server will never answer commands sent on the old socket, so the
  # pending queue and any half-received response must not survive into the
  # next connection; otherwise new replies get paired with stale deferrables.
  # Snapshot first so callbacks that enqueue new work do not disturb iteration.
  abandoned = @deferrables.dup
  @deferrables.clear
  @data = ""
  abandoned.each { |deferrable| deferrable.fail(:disconnected) }

  return if @disconnect_manually

  raise EM::Beanstalk::Disconnected if @retries >= @retry_count && @raise_on_disconnect
  @retries += 1
  EM.add_timer(1) { reconnect }
end

#drain!(&block) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/em-beanstalk.rb', line 61

# Reserves and deletes jobs one at a time until the server-wide stats report
# no ready jobs, then schedules the given block on the next reactor tick.
#
# @yield called (via EM.next_tick) once 'current-jobs-ready' reaches zero;
#   optional
# @return [Object] whatever #stats returns
def drain!(&block)
  stats do |server_stats|
    if server_stats['current-jobs-ready'].zero?
      # EM.next_tick raises ArgumentError when given neither a proc nor a
      # block, so only schedule the completion hook if the caller passed one.
      EM.next_tick(&block) if block
    else
      reserve { |job| job.delete { drain!(&block) } }
    end
  end
end

#each_job(timeout = nil, &block) ⇒ Object



100
101
102
103
104
105
106
107
108
109
# File 'lib/em-beanstalk.rb', line 100

# Repeatedly reserves jobs, passing each one to the block and queuing the
# next reserve on the following reactor tick. Only the success callback is
# wired up, so the loop does not continue after a failed reserve.
#
# @param timeout [Object, nil] passed straight through to #reserve
# @yieldparam job the object delivered to the reserve callback
# @return [Object] the result of registering the first callback
def each_job(timeout = nil, &block)
  reserve_next = lambda do
    reserve(timeout).callback do |job|
      block.call(job)
      EM.next_tick { reserve_next.call }
    end
  end

  reserve_next.call
end

#extract_body(bytes, data) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
# File 'lib/em-beanstalk.rb', line 276

# Splits a buffered response into its payload and the data that follows it.
#
# The buffer is expected to look like "<header>\r\n<payload>\r\n<rest>".
# The frame is only accepted once the payload AND its trailing CRLF have
# arrived; accepting it earlier would leave a stray "\r\n" at the head of the
# buffer for the next parse. Sizes are measured in bytes, because that is
# what the protocol's length field counts.
#
# @param bytes [Integer] payload length announced in the header
# @param data [String] buffer starting at the header line
# @return [Body, nil] Body.new(payload, rest), or nil while the frame is
#   still incomplete (including when no header terminator is present)
def extract_body(bytes, data)
  header_end = data.index("\r\n")
  return nil if header_end.nil?

  rem = data[(header_end + 2)..-1]
  return nil if rem.bytesize < bytes + 2

  # byteslice(0, bytes) also handles bytes == 0, where a 0..-1 range would
  # wrongly return the whole remainder.
  body = rem.byteslice(0, bytes)
  rest = rem.byteslice(bytes + 2, rem.bytesize) || ""
  Body.new(body, rest)
end

#ignore(tube, &block) ⇒ Object



84
85
86
87
88
89
# File 'lib/em-beanstalk.rb', line 84

# Stops watching a tube. Does nothing when the tube is not in the local
# watch list.
#
# @param tube [Object] tube name
# @yield forwarded to add_deferrable
# @return [Object, nil] the deferrable from add_deferrable, or nil when the
#   tube was not being watched (the block is not used in that case)
def ignore(tube, &block)
  return unless @watched_tubes.include?(tube)

  # The local list is updated immediately, before the server replies.
  @watched_tubes.delete(tube)
  @conn.send(:ignore, tube)
  add_deferrable(&block)
end

#job_id(val) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/em-beanstalk.rb', line 121

# Normalises a job reference to the value sent on the wire.
#
# @param val [Job, Object] a Job, or anything else (returned untouched)
# @return [Object] val.id for a Job, otherwise val itself
def job_id(val)
  val.is_a?(Job) ? val.id : val
end

#list(type = nil, &block) ⇒ Object



130
131
132
133
134
135
136
137
138
# File 'lib/em-beanstalk.rb', line 130

# Sends one of the list commands.
#
# @param type [Symbol, nil] :tube / :tubes / nil for 'list-tubes',
#   :use / :used for 'list-tube-used', :watch / :watched for
#   'list-tubes-watched'
# @yield forwarded to add_deferrable
# @return [Object] the deferrable from add_deferrable
# @raise [EM::Beanstalk::InvalidCommand] for any other type; nothing is sent
def list(type = nil, &block)
  command =
    case type
    when :tube, :tubes, nil then :'list-tubes'
    when :use, :used        then :'list-tube-used'
    when :watch, :watched   then :'list-tubes-watched'
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(command)
  add_deferrable(&block)
end

#on_error(&block) ⇒ Object



209
210
211
# File 'lib/em-beanstalk.rb', line 209

# Stores the given block in @error_callback.
#
# NOTE(review): none of the methods shown alongside this one read
# @error_callback (add_deferrable passes default_error_callback instead) —
# confirm where, if anywhere, this callback is consumed.
def on_error(&block)
  @error_callback = block
end

#peek(id, &block) ⇒ Object



146
147
148
149
150
151
152
153
154
# File 'lib/em-beanstalk.rb', line 146

# Sends a peek command.
#
# @param id [Symbol, Object] :ready, :delayed or :buried select the matching
#   'peek-*' command; anything else is sent as the argument of 'peek'
# @yield forwarded to add_deferrable
# @return [Object] the deferrable from add_deferrable
def peek(id, &block)
  request =
    case id
    when :ready   then [:'peek-ready']
    when :delayed then [:'peek-delayed']
    when :buried  then [:'peek-buried']
    else               [:'peek', id]
    end

  @conn.send(*request)
  add_deferrable(&block)
end

#put(msg, opts = nil, &block) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/em-beanstalk.rb', line 156

# Queues a message on the currently used tube.
#
# @param msg [Job, Object] a Job (its priority, delay, ttr and body are used
#   unless overridden) or any object, which is sent as msg.to_s with the
#   client defaults
# @param opts [Hash, nil] optional :priority, :delay and :ttr overrides
# @yield forwarded to add_deferrable
# @return [Object] the deferrable from add_deferrable
def put(msg, opts = nil, &block)
  case msg
  when Job
    priority = opts && opts[:priority] || msg.priority
    delay = opts && opts[:delay] || msg.delay
    ttr = opts && opts[:ttr] || msg.ttr
    body = msg.body
  else
    priority = opts && opts[:priority] || default_priority
    delay = opts && opts[:delay] || default_delay
    ttr = opts && opts[:ttr] || default_ttr
    body = msg.to_s
  end

  # The protocol requires priority < 2**32, so the largest legal value is
  # 2**32 - 1. Negative values fall back to the client defaults.
  max_priority = 2**32 - 1
  priority = default_priority if priority < 0
  priority = max_priority if priority > max_priority
  delay = default_delay if delay < 0
  ttr = default_ttr if ttr < 0

  # The length field counts bytes, not characters, so multibyte bodies need
  # bytesize.
  @conn.send_with_data(:put, body, priority, delay, ttr, body.bytesize)
  add_deferrable(&block)
end

#received(data) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/em-beanstalk.rb', line 213

# Appends newly received bytes to the buffer and resolves pending
# deferrables, oldest first, for every complete response found in it.
#
# Each iteration looks at the first CRLF-terminated line:
# * single-line replies (DELETED, INSERTED, RELEASED, BURIED <id>, USING,
#   WATCHING and the error words) settle one deferrable and the line is
#   removed by the shared slice! at the bottom of the loop;
# * replies with a payload (OK, RESERVED, FOUND) go through extract_body,
#   which hands back the remaining buffer, so those branches `next` past the
#   shared slice!;
# * an incomplete payload or an unrecognised line stops parsing until more
#   data arrives.
#
# NOTE(review): an unrecognised line is never discarded, so it stays at the
# head of the buffer on every later call — confirm whether that is intended.
#
# @param data [String] bytes just read from the connection
# @return [nil]
def received(data)
  @data << data

  until @data.empty?
    idx = @data.index(/(.*?)\r\n/)
    break if idx.nil?

    first = $1

    case first
    when /^DELETED/
      @deferrables.shift.succeed
    when /^INSERTED\s+(\d+)/
      @deferrables.shift.succeed($1.to_i)
    when /^RELEASED/
      @deferrables.shift.succeed
    when /^BURIED\s+(\d+)/
      @deferrables.shift.fail(:buried, $1.to_i)
    when /^USING\s+(.*)/
      @deferrables.shift.succeed($1)
    when /^WATCHING\s+(\d+)/
      @deferrables.shift.succeed($1.to_i)
    when /^OK\s+(\d+)/
      bytes = $1.to_i
      if body = extract_body(bytes, @data)
        @data = body.data
        df = @deferrables.shift
        # NOTE(review): YAML.load runs on data received from the server; on
        # YAML versions where load is not safe by default this can build
        # arbitrary objects — consider YAML.safe_load.
        df.succeed(YAML.load(body.body))
        next
      else
        break
      end
    when /^(RESERVED|FOUND)\s+(\d+)\s+(\d+)/
      id = $2.to_i
      bytes = $3.to_i
      if body = extract_body(bytes, @data)
        @data = body.data
        df = @deferrables.shift
        job = EM::Beanstalk::Job.new(self, id, body.body)
        df.succeed(job)
        next
      else
        break
      end
    # error state
    when /^(OUT_OF_MEMORY|INTERNAL_ERROR|DRAINING|BAD_FORMAT|UNKNOWN_COMMAND|EXPECTED_CRLF|JOB_TOO_BIG|DEADLINE_SOON|TIMED_OUT|NOT_FOUND)/
      puts "... got error, calling df."
      error = $1.downcase.to_sym
      @deferrables.shift.fail(error)
      # The error line is consumed by the shared slice! below. Trimming the
      # buffer here as well would also swallow the start of the next reply.
    else
      break
    end
    @data.slice!(0, first.size + 2)
  end
end

#reconnect ⇒ Object



198
199
200
201
# File 'lib/em-beanstalk.rb', line 198

# Re-opens the connection to the configured host and port. #disconnected
# schedules this through EM.add_timer after an unexpected drop.
def reconnect
  # Clear the flag so a later drop goes through the retry logic in
  # #disconnected again.
  @disconnect_manually = false
  @conn.reconnect(host, port)
end

#release(val, opts = nil, &block) ⇒ Object



179
180
181
182
183
# File 'lib/em-beanstalk.rb', line 179

# Releases a reserved job back to the ready queue.
#
# @param val [Job, Object, nil] a Job or raw job id (resolved through
#   job_id); nothing is sent when it is nil
# @param opts [Hash, nil] optional :priority and :delay for the released job;
#   each falls back to default_priority / default_delay
# @yield forwarded to add_deferrable
# @return [Object, nil] the deferrable from add_deferrable, or nil when val
#   is nil
def release(val, opts = nil, &block)
  return if val.nil?

  # These were previously read from undefined names, which raised NameError
  # on every call; take them from opts with the same fallbacks #put uses.
  priority = (opts && opts[:priority] || default_priority).to_i
  delay = (opts && opts[:delay] || default_delay).to_i

  @conn.send(:release, job_id(val), priority, delay)
  add_deferrable(&block)
end

#reserve(timeout = nil, &block) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/em-beanstalk.rb', line 91

# Asks the server for the next job.
#
# @param timeout [Object, nil] when truthy, 'reserve-with-timeout' is sent
#   with this value; otherwise a plain 'reserve' is sent
# @yield forwarded to add_deferrable
# @return [Object] the deferrable from add_deferrable
def reserve(timeout = nil, &block)
  request = timeout ? [:'reserve-with-timeout', timeout] : [:reserve]
  @conn.send(*request)
  add_deferrable(&block)
end

#stats(type = nil, val = nil, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
# File 'lib/em-beanstalk.rb', line 111

# Sends one of the stats commands.
#
# @param type [Symbol, nil] nil for 'stats', :tube for 'stats-tube',
#   :job for 'stats-job'
# @param val [Object, nil] tube name for :tube; Job or job id (resolved
#   through job_id) for :job
# @yield forwarded to add_deferrable
# @return [Object] the deferrable from add_deferrable
# @raise [EM::Beanstalk::InvalidCommand] for any other type; nothing is sent
def stats(type = nil, val = nil, &block)
  request =
    case type
    when nil   then [:stats]
    when :tube then [:'stats-tube', val]
    when :job  then [:'stats-job', job_id(val)]
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(*request)
  add_deferrable(&block)
end

#use(tube, &block) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/em-beanstalk.rb', line 69

# Selects the tube that subsequent puts go to. Does nothing when that tube
# is already recorded as the one in use.
#
# @param tube [Object] tube name
# @yield forwarded to add_deferrable
# @return [Object, nil] the deferrable from add_deferrable, or nil when the
#   tube was already selected (the block is not used in that case)
def use(tube, &block)
  already_selected = (@used_tube == tube)
  return if already_selected

  # Recorded immediately, before the server replies.
  @used_tube = tube
  @conn.send(:use, tube)
  add_deferrable(&block)
end

#watch(tube, &block) ⇒ Object



77
78
79
80
81
82
# File 'lib/em-beanstalk.rb', line 77

# Starts watching a tube. Does nothing when the tube is already in the local
# watch list.
#
# @param tube [Object] tube name
# @yield forwarded to add_deferrable
# @return [Object, nil] the deferrable from add_deferrable, or nil when the
#   tube was already watched (the block is not used in that case)
def watch(tube, &block)
  already_watched = @watched_tubes.include?(tube)
  return if already_watched

  # Recorded immediately, before the server replies.
  @watched_tubes << tube
  @conn.send(:watch, tube)
  add_deferrable(&block)
end