Module: GemeraldBeanstalk::BeanstalkHelper

Included in:
Beanstalk
Defined in:
lib/gemerald_beanstalk/beanstalk_helper.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

BAD_FORMAT =
"BAD_FORMAT\r\n"
BURIED =
"BURIED\r\n"
CRLF =
"\r\n"
DEADLINE_SOON =
"DEADLINE_SOON\r\n"
DELETED =
"DELETED\r\n"
EXPECTED_CRLF =
"EXPECTED_CRLF\r\n"
JOB_TOO_BIG =
"JOB_TOO_BIG\r\n"
KICKED =
"KICKED\r\n"
NOT_FOUND =
"NOT_FOUND\r\n"
NOT_IGNORED =
"NOT_IGNORED\r\n"
PAUSED =
"PAUSED\r\n"
RELEASED =
"RELEASED\r\n"
TIMED_OUT =
"TIMED_OUT\r\n"
TOUCHED =
"TOUCHED\r\n"
UNKNOWN_COMMAND =
"UNKNOWN_COMMAND\r\n"
JOB_INACTIVE_STATES =
GemeraldBeanstalk::Job::INACTIVE_STATES
JOB_RESERVED_STATES =
GemeraldBeanstalk::Job::RESERVED_STATES

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(beanstalk) ⇒ Object



23
24
25
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 23

def self.included(beanstalk)
  beanstalk.extend(ClassMethods)
end

Instance Method Details

#active_tubesObject (private)



79
80
81
82
83
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 79

def active_tubes
  tubes = {}
  @tubes.each_pair { |tube_name, tube| tubes[tube_name] = tube if tube.active? }
  return tubes
end

#adjust_stats_cmd_putObject

ease handling of odd case where put can return BAD_FORMAT but increment stats



38
39
40
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 38

def adjust_stats_cmd_put
  adjust_stats_key(:'cmd-put')
end

#adjust_stats_key(key, adjustment = 1) ⇒ Object (private)



86
87
88
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 86

def adjust_stats_key(key, adjustment = 1)
  @stats[key] += adjustment
end

#cancel_reservations(connection) ⇒ Object (private)



91
92
93
94
95
96
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 91

def cancel_reservations(connection)
  connection.tubes_watched.each do |tube_name|
    tube(tube_name).cancel_reservation(connection)
  end
  return connection
end

#connect(connection = nil) ⇒ Object



43
44
45
46
47
48
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 43

def connect(connection = nil)
  beanstalk_connection = GemeraldBeanstalk::Connection.new(self, connection)
  @connections << beanstalk_connection
  adjust_stats_key(:'total-connections')
  return beanstalk_connection
end

#deadline_pending?(connection) ⇒ Boolean (private)

Returns:

  • (Boolean)


99
100
101
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 99

def deadline_pending?(connection)
  return @reserved[connection].any?(&:deadline_pending?)
end

#disconnect(connection) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 51

def disconnect(connection)
  connection.close_connection
  tube(connection.tube_used).stop_use
  connection.tubes_watched.each do |watched_tube|
    tube(watched_tube).ignore
    connection.ignore(watched_tube, :force)
  end
  @reserved[connection].each do |job|
    job.release(connection, job.priority, 0, false)
  end
  @reserved.delete(connection)
  @connections.delete(connection)
end

#execute(command) ⇒ Object



66
67
68
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 66

def execute(command)
  return send(command.method_name, *command.arguments)
end

#find_job(job_id, options = {}) ⇒ Object (private)



104
105
106
107
108
109
110
111
112
113
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 104

def find_job(job_id, options = {})
  return unless (job_id = job_id.to_i) > 0
  only = Array(options[:only])
  except = Array(options[:except]).unshift(:deleted)

  job = @jobs[job_id - 1]

  return nil if job.nil? || except.include?(job.state)
  return (only.empty? || only.include?(job.state)) ? job : nil
end

#honor_reservations(job_or_tube) ⇒ Object (private)



116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 116

def honor_reservations(job_or_tube)
  if job_or_tube.is_a?(GemeraldBeanstalk::Job)
    job = job_or_tube
    tube = tube(job.tube_name)
  elsif job_or_tube.is_a?(GemeraldBeanstalk::Tube)
    tube = job_or_tube
    job = tube.next_job
  end

  while job && (next_reservation = tube.next_reservation)
    next unless try_dispatch(next_reservation, job)
    job = tube.next_job
  end
end

#next_job(connection, state = :ready) ⇒ Object (private)



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 132

def next_job(connection, state = :ready)
  best_candidate = nil
  connection.tubes_watched.each do |tube_name|
    candidate = tube(tube_name).next_job(state)
    next if candidate.nil?

    best_candidate = candidate if best_candidate.nil? || candidate < best_candidate
  end

  return best_candidate
end

#peek_by_state(connection, state) ⇒ Object (private)



145
146
147
148
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 145

def peek_by_state(connection, state)
  adjust_stats_key(:"cmd-peek-#{state}")
  return peek_message(tube(connection.tube_used).next_job(state, :peek))
end

#peek_message(job) ⇒ Object (private)



151
152
153
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 151

def peek_message(job)
  job.nil? ? NOT_FOUND : "FOUND #{job.id} #{job.bytes}\r\n#{job.body}\r\n"
end

#register_job_timeout(connection, job) ⇒ Object



71
72
73
74
75
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 71

def register_job_timeout(connection, job)
  @reserved[connection].delete(job)
  adjust_stats_key(:'job-timeouts')
  honor_reservations(job)
end

#reserve_job(connection, timeout = 0) ⇒ Object (private)



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 156

def reserve_job(connection, timeout = 0)
  connection.worker = true

  if deadline_pending?(connection)
    connection.transmit(DEADLINE_SOON)
    return true
  end

  connection.tubes_watched.each do |tube_name|
    tube(tube_name).reserve(connection)
  end
  connection.wait(timeout <= 0 ? nil : Time.now.to_f + timeout)

  dispatched = false
  while !dispatched
    break if (job = next_job(connection)).nil?
    dispatched = try_dispatch(connection, job)
  end

  return dispatched
end

#stats_commandsObject (private)



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 179

def stats_commands
  return {
    'cmd-put' => @stats[:'cmd-put'],
    'cmd-peek' => @stats[:'cmd-peek'],
    'cmd-peek-ready' => @stats[:'cmd-peek-ready'],
    'cmd-peek-delayed' => @stats[:'cmd-peek-delayed'],
    'cmd-peek-buried' => @stats[:'cmd-peek-buried'],
    'cmd-reserve' => @stats[:'cmd-reserve'],
    'cmd-reserve-with-timeout' => @stats[:'cmd-reserve-with-timeout'],
    'cmd-delete' => @stats[:'cmd-delete'],
    'cmd-release' => @stats[:'cmd-release'],
    'cmd-use' => @stats[:'cmd-use'],
    'cmd-watch' => @stats[:'cmd-watch'],
    'cmd-ignore' => @stats[:'cmd-ignore'],
    'cmd-bury' => @stats[:'cmd-bury'],
    'cmd-kick' => @stats[:'cmd-kick'],
    'cmd-touch' => @stats[:'cmd-touch'],
    'cmd-stats' => @stats[:'cmd-stats'],
    'cmd-stats-job' => @stats[:'cmd-stats-job'],
    'cmd-stats-tube' => @stats[:'cmd-stats-tube'],
    'cmd-list-tubes' => @stats[:'cmd-list-tubes'],
    'cmd-list-tube-used' => @stats[:'cmd-list-tube-used'],
    'cmd-list-tubes-watched' => @stats[:'cmd-list-tubes-watched'],
    'cmd-pause-tube' => @stats[:'cmd-pause-tube'],
  }
end

#stats_connectionsObject (private)



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 207

def stats_connections
  conn_stats = {
    'current-connections' => @connections.length,
    'current-producers' => 0,
    'current-workers' => 0,
    'current-waiting' => 0,
    'total-connections' => @stats[:'total-connections']
  }
  @connections.each do |connection|
    conn_stats['current-producers'] += 1 if connection.producer?
    conn_stats['current-waiting'] += 1 if connection.waiting?
    conn_stats['current-workers'] += 1 if connection.worker?
  end
  return conn_stats
end

#try_dispatch(connection, job) ⇒ Object (private)



224
225
226
227
228
229
230
231
232
233
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 224

def try_dispatch(connection, job)
  connection.mutex.synchronize do
    # Make sure connection still waiting and job not claimed
    return false unless connection.waiting? && job.reserve(connection)
    connection.transmit("RESERVED #{job.id} #{job.bytes}\r\n#{job.body}\r\n")
    cancel_reservations(connection)
  end
  @reserved[connection] << job
  return true
end

#tube(tube_name, create_if_missing = false) ⇒ Object (private)



236
237
238
239
240
241
242
243
244
245
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 236

def tube(tube_name, create_if_missing = false)
  tube = @tubes[tube_name]

  return tube unless tube.nil? || tube.deactivated?

  return @tubes[tube_name] = GemeraldBeanstalk::Tube.new(tube_name) if create_if_missing

  @tubes.delete(tube_name) unless tube.nil?
  return nil
end

#tube_list(tube_list) ⇒ Object (private)



248
249
250
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 248

def tube_list(tube_list)
  return yaml_response(tube_list.map { |key| "- #{key}" })
end

#update_stateObject (private)



253
254
255
256
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 253

def update_state
  update_waiting
  update_timeouts
end

#update_timeoutsObject (private)



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 259

def update_timeouts
  @reserved.values.flatten.each(&:state)
  @delayed.keep_if do |job|
    case job.state
    when :delayed
      true
    when :ready
      honor_reservations(job)
      false
    else
      false
    end
  end
  @paused.keep_if do |tube|
    if tube.paused?
      true
    else
      honor_reservations(tube)
      false
    end
  end
end

#update_waitingObject (private)



283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 283

def update_waiting
  waiting_connections.each do |connection|
    if connection.waiting? && deadline_pending?(connection)
      message_for_connection = DEADLINE_SOON
    elsif connection.timed_out?
      message_for_connection = TIMED_OUT
    else
      next
    end

    cancel_reservations(connection)
    connection.transmit(message_for_connection)
  end
end

#uptimeObject (private)



299
300
301
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 299

def uptime
  (Time.now.to_f - @up_at).to_i
end

#waiting_connectionsObject (private)



304
305
306
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 304

def waiting_connections
  return @connections.select {|connection| connection.waiting? || connection.timed_out? }
end

#yaml_response(data) ⇒ Object (private)



309
310
311
312
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 309

def yaml_response(data)
  response = %w[---].concat(data).join("\n")
  return "OK #{response.bytesize}\r\n#{response}\r\n"
end