Module: GemeraldBeanstalk::BeanstalkHelper
- Included in:
- Beanstalk
- Defined in:
- lib/gemerald_beanstalk/beanstalk_helper.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- BAD_FORMAT =
"BAD_FORMAT\r\n"
- BURIED =
"BURIED\r\n"
- CRLF =
"\r\n"
- DEADLINE_SOON =
"DEADLINE_SOON\r\n"
- DELETED =
"DELETED\r\n"
- EXPECTED_CRLF =
"EXPECTED_CRLF\r\n"
- JOB_TOO_BIG =
"JOB_TOO_BIG\r\n"
- KICKED =
"KICKED\r\n"
- NOT_FOUND =
"NOT_FOUND\r\n"
- NOT_IGNORED =
"NOT_IGNORED\r\n"
- PAUSED =
"PAUSED\r\n"
- RELEASED =
"RELEASED\r\n"
- TIMED_OUT =
"TIMED_OUT\r\n"
- TOUCHED =
"TOUCHED\r\n"
- UNKNOWN_COMMAND =
"UNKNOWN_COMMAND\r\n"
- JOB_INACTIVE_STATES =
GemeraldBeanstalk::Job::INACTIVE_STATES
- JOB_RESERVED_STATES =
GemeraldBeanstalk::Job::RESERVED_STATES
Class Method Summary collapse
Instance Method Summary collapse
- #active_tubes ⇒ Object private
-
#adjust_stats_cmd_put ⇒ Object
Eases handling of the odd case where put can return BAD_FORMAT but still increments stats.
- #adjust_stats_key(key, adjustment = 1) ⇒ Object private
- #cancel_reservations(connection) ⇒ Object private
- #connect(connection = nil) ⇒ Object
- #deadline_pending?(connection) ⇒ Boolean private
- #disconnect(connection) ⇒ Object
- #execute(command) ⇒ Object
- #find_job(job_id, options = {}) ⇒ Object private
- #honor_reservations(job_or_tube) ⇒ Object private
- #next_job(connection, state = :ready) ⇒ Object private
- #peek_by_state(connection, state) ⇒ Object private
- #peek_message(job) ⇒ Object private
- #register_job_timeout(connection, job) ⇒ Object
- #reserve_job(connection, timeout = 0) ⇒ Object private
- #stats_commands ⇒ Object private
- #stats_connections ⇒ Object private
- #try_dispatch(connection, job) ⇒ Object private
- #tube(tube_name, create_if_missing = false) ⇒ Object private
- #tube_list(tube_list) ⇒ Object private
- #update_state ⇒ Object private
- #update_timeouts ⇒ Object private
- #update_waiting ⇒ Object private
- #uptime ⇒ Object private
- #waiting_connections ⇒ Object private
- #yaml_response(data) ⇒ Object private
Class Method Details
.included(beanstalk) ⇒ Object
23 24 25 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 23
#
# Hook that extends +beanstalk+ with the methods of ClassMethods.
#
# @param beanstalk [Object] the object to extend
# @return [Object] whatever +extend+ returns
def self.included(beanstalk)
  class_level_api = ClassMethods
  beanstalk.extend(class_level_api)
end
Instance Method Details
#active_tubes ⇒ Object (private)
79 80 81 82 83 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 79
#
# Builds a new hash holding only the entries of @tubes whose tube answers
# true to +active?+. @tubes itself is left untouched.
#
# @return [Hash] tube name => tube, for active tubes only
def active_tubes
  return @tubes.select { |_tube_name, tube| tube.active? }
end
#adjust_stats_cmd_put ⇒ Object
Eases handling of the odd case where put can return BAD_FORMAT but still increments stats.
38 39 40 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 38
#
# Bumps the 'cmd-put' counter by one. Exposed publicly to ease handling of
# the odd case where put can return BAD_FORMAT but still increments stats.
#
# @return [Object] the result of adjust_stats_key
def adjust_stats_cmd_put
  put_counter = :'cmd-put'
  adjust_stats_key(put_counter)
end
#adjust_stats_key(key, adjustment = 1) ⇒ Object (private)
86 87 88 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 86
#
# Adds +adjustment+ to the counter stored under +key+ in @stats.
#
# @param key [Object] key of the counter inside @stats
# @param adjustment [Numeric] amount to add (defaults to 1)
# @return [Numeric] the updated counter value
def adjust_stats_key(key, adjustment = 1)
  updated = @stats[key] + adjustment
  @stats[key] = updated
end
#cancel_reservations(connection) ⇒ Object (private)
91 92 93 94 95 96 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 91
#
# Asks every tube the connection watches to cancel that connection's
# reservation.
#
# @param connection [Object] must respond to +tubes_watched+
# @return [Object] the same +connection+ that was passed in
def cancel_reservations(connection)
  connection.tubes_watched.each { |watched_name| tube(watched_name).cancel_reservation(connection) }
  connection
end
#connect(connection = nil) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 43
#
# Wraps +connection+ in a new GemeraldBeanstalk::Connection bound to this
# object, appends it to @connections and bumps the 'total-connections'
# counter.
#
# @param connection [Object, nil] passed through to Connection.new
# @return [GemeraldBeanstalk::Connection] the newly created wrapper
def connect(connection = nil)
  GemeraldBeanstalk::Connection.new(self, connection).tap do |wrapped|
    @connections << wrapped
    adjust_stats_key(:'total-connections')
  end
end
#deadline_pending?(connection) ⇒ Boolean (private)
99 100 101 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 99
#
# Reports whether any job held in @reserved for +connection+ answers true
# to +deadline_pending?+.
#
# @param connection [Object] key into @reserved
# @return [Boolean]
def deadline_pending?(connection)
  held_jobs = @reserved[connection]
  held_jobs.any? { |job| job.deadline_pending? }
end
#disconnect(connection) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 51
#
# Tears down +connection+. The steps run in this order:
#   1. close the connection,
#   2. tell the tube it was using to +stop_use+,
#   3. for each watched tube, call +ignore+ on the tube and then
#      +ignore(name, :force)+ on the connection,
#   4. release every job held for the connection in @reserved,
#   5. drop the connection from @reserved and @connections.
#
# @param connection [Object] the connection being torn down
# @return [Object] the result of @connections.delete
def disconnect(connection)
  connection.close_connection
  tube(connection.tube_used).stop_use

  connection.tubes_watched.each do |name|
    tube(name).ignore
    connection.ignore(name, :force)
  end

  # Release with the job's own priority and a 0 third argument.
  # NOTE(review): the trailing +false+ is passed positionally; its meaning
  # is defined by Job#release, which is not visible here.
  @reserved[connection].each { |held| held.release(connection, held.priority, 0, false) }

  @reserved.delete(connection)
  @connections.delete(connection)
end
#execute(command) ⇒ Object
66 67 68 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 66
#
# Runs +command+ by invoking the method it names on this object, passing
# the command's arguments. +send+ is used, so private methods are reachable.
#
# @param command [Object] must respond to +method_name+ and +arguments+
# @return [Object] whatever the invoked method returns
def execute(command)
  target = command.method_name
  args = command.arguments
  send(target, *args)
end
#find_job(job_id, options = {}) ⇒ Object (private)
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 104
#
# Looks up a job by its 1-based id in @jobs, filtered by job state.
#
# @param job_id [#to_i] job id; anything whose +to_i+ is not positive
#   yields nil
# @param options [Hash] state filters
# @option options [Symbol, Array<Symbol>] :only states the job must be in
#   (no restriction when absent or empty)
# @option options [Symbol, Array<Symbol>] :except states that hide the job;
#   :deleted is always excluded
# @return [Object, nil] the matching job, or nil when there is none
def find_job(job_id, options = {})
  return unless (job_id = job_id.to_i) > 0

  only = Array(options[:only])
  # Build a fresh array: Array() returns the caller's own array when given
  # one, so unshift-ing into it would mutate (or fail on a frozen) argument.
  except = [:deleted] + Array(options[:except])

  job = @jobs[job_id - 1] # ids are 1-based, @jobs is 0-based
  return nil if job.nil? || except.include?(job.state)
  return (only.empty? || only.include?(job.state)) ? job : nil
end
#honor_reservations(job_or_tube) ⇒ Object (private)
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 116
#
# Hands out jobs from a tube to its queued reservations.
#
# Given a Job, starts with that job and the tube it names. Given a Tube,
# starts with that tube's next job. Any other argument is a no-op.
#
# @param job_or_tube [GemeraldBeanstalk::Job, GemeraldBeanstalk::Tube]
# @return [nil]
def honor_reservations(job_or_tube)
  case job_or_tube
  when GemeraldBeanstalk::Job
    pending_job = job_or_tube
    source_tube = tube(pending_job.tube_name)
  when GemeraldBeanstalk::Tube
    source_tube = job_or_tube
    pending_job = source_tube.next_job
  end

  # Each pass takes one reservation from the tube. The job only advances
  # after a successful dispatch, so a failed dispatch retries the same job
  # against the next reservation.
  while pending_job && (reservation = source_tube.next_reservation)
    pending_job = source_tube.next_job if try_dispatch(reservation, pending_job)
  end
end
#next_job(connection, state = :ready) ⇒ Object (private)
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 132
#
# Picks one job across all tubes watched by +connection+: each tube offers
# its +next_job(state)+, and the candidate that compares lowest with +<+
# wins. On a tie, the candidate from the earlier tube is kept.
#
# @param connection [Object] must respond to +tubes_watched+
# @param state [Symbol] forwarded to each tube's +next_job+ (default :ready)
# @return [Object, nil] the winning job, or nil when no tube offers one
def next_job(connection, state = :ready)
  winner = nil
  connection.tubes_watched.each do |watched_name|
    contender = tube(watched_name).next_job(state)
    if contender.nil?
      next
    elsif winner.nil? || contender < winner
      winner = contender
    end
  end
  winner
end
#peek_by_state(connection, state) ⇒ Object (private)
145 146 147 148 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 145
#
# Bumps the "cmd-peek-<state>" counter, then asks the tube the connection
# is using for its next job in +state+, passing :peek as the second argument.
#
# @param connection [Object] must respond to +tube_used+
# @param state [Symbol, String] job state; interpolated into the counter name
# @return [Object, nil] whatever the tube's +next_job+ returns
def peek_by_state(connection, state)
  adjust_stats_key(:"cmd-peek-#{state}")
  used_tube = tube(connection.tube_used)
  used_tube.next_job(state, :peek)
end
#peek_message(job) ⇒ Object (private)
151 152 153 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 151
#
# Formats the reply to a peek: NOT_FOUND when there is no job, otherwise a
# "FOUND <id> <bytes>" line followed by the job body, each CRLF-terminated.
#
# @param job [Object, nil] must respond to +id+, +bytes+ and +body+ when
#   not nil
# @return [String] the wire-format response
def peek_message(job)
  job.nil? ? NOT_FOUND : "FOUND #{job.id} #{job.bytes}\r\n#{job.body}\r\n"
end
#register_job_timeout(connection, job) ⇒ Object
71 72 73 74 75 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 71
#
# Records that +job+ timed out while held by +connection+: removes it from
# the connection's entry in @reserved, bumps the 'job-timeouts' counter and
# then runs honor_reservations for the job.
#
# @param connection [Object] key into @reserved
# @param job [Object] the job that timed out
# @return [Object] the result of honor_reservations
def register_job_timeout(connection, job)
  held_jobs = @reserved[connection]
  held_jobs.delete(job)
  adjust_stats_key(:'job-timeouts')
  honor_reservations(job)
end
#reserve_job(connection, timeout = 0) ⇒ Object (private)
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 156
#
# Handles a reserve request from +connection+.
#
# Marks the connection as a worker. If one of its held jobs has a deadline
# pending, DEADLINE_SOON is sent and nothing else happens. Otherwise the
# connection is registered with every tube it watches, put into the waiting
# state, and offered jobs until one is dispatched or none remain.
#
# @param connection [Object] the requesting connection
# @param timeout [Numeric] seconds to wait; zero or less passes nil to
#   +connection.wait+
# @return [Boolean] true when DEADLINE_SOON was sent or a job was
#   dispatched, false otherwise
def reserve_job(connection, timeout = 0)
  connection.worker = true

  if deadline_pending?(connection)
    connection.transmit(DEADLINE_SOON)
    return true
  end

  connection.tubes_watched.each { |watched_name| tube(watched_name).reserve(connection) }

  wait_until = timeout <= 0 ? nil : Time.now.to_f + timeout
  connection.wait(wait_until)

  handed_off = false
  until handed_off
    candidate = next_job(connection)
    break if candidate.nil?
    handed_off = try_dispatch(connection, candidate)
  end
  handed_off
end
#stats_commands ⇒ Object (private)
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 179
#
# Snapshot of the per-command counters. Each string key maps to the value
# stored in @stats under the symbol of the same name.
#
# @return [Hash{String => Object}] counter name => current value, in the
#   order listed below
def stats_commands
  counter_names = %w[
    cmd-put
    cmd-peek
    cmd-peek-ready
    cmd-peek-delayed
    cmd-peek-buried
    cmd-reserve
    cmd-reserve-with-timeout
    cmd-delete
    cmd-release
    cmd-use
    cmd-watch
    cmd-ignore
    cmd-bury
    cmd-kick
    cmd-touch
    cmd-stats
    cmd-stats-job
    cmd-stats-tube
    cmd-list-tubes
    cmd-list-tube-used
    cmd-list-tubes-watched
    cmd-pause-tube
  ]
  counter_names.each_with_object({}) do |name, snapshot|
    snapshot[name] = @stats[name.to_sym]
  end
end
#stats_connections ⇒ Object (private)
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 207
#
# Summarises the connections in @connections: how many there are, how many
# answer true to +producer?+, +worker?+ and +waiting?+, plus the lifetime
# 'total-connections' counter from @stats.
#
# @return [Hash{String => Object}]
def stats_connections
  producers = 0
  waiting = 0
  workers = 0

  @connections.each do |conn|
    producers += 1 if conn.producer?
    waiting += 1 if conn.waiting?
    workers += 1 if conn.worker?
  end

  {
    'current-connections' => @connections.length,
    'current-producers' => producers,
    'current-workers' => workers,
    'current-waiting' => waiting,
    'total-connections' => @stats[:'total-connections']
  }
end
#try_dispatch(connection, job) ⇒ Object (private)
224 225 226 227 228 229 230 231 232 233 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 224
#
# Tries to hand +job+ to +connection+. While holding the connection's
# mutex, checks that the connection is still waiting and that the job can be
# reserved for it. If so, sends the RESERVED reply and cancels the
# connection's other reservations. On success the job is appended to the
# connection's entry in @reserved.
#
# @param connection [Object] the candidate consumer
# @param job [Object] the job to dispatch
# @return [Boolean] true when the job was dispatched, false otherwise
def try_dispatch(connection, job)
  claimed = connection.mutex.synchronize do
    # Make sure connection still waiting and job not claimed
    if connection.waiting? && job.reserve(connection)
      connection.transmit("RESERVED #{job.id} #{job.bytes}\r\n#{job.body}\r\n")
      cancel_reservations(connection)
      true
    else
      false
    end
  end
  return false unless claimed

  @reserved[connection] << job
  true
end
#tube(tube_name, create_if_missing = false) ⇒ Object (private)
236 237 238 239 240 241 242 243 244 245 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 236
#
# Fetches a tube by name from @tubes.
#
# A tube that exists and is not deactivated is returned as is. Otherwise,
# with +create_if_missing+ a fresh GemeraldBeanstalk::Tube is stored under
# the name and returned. Without it, a deactivated entry is removed from
# @tubes and nil is returned.
#
# @param tube_name [Object] key into @tubes
# @param create_if_missing [Boolean] create a new tube when none is usable
# @return [GemeraldBeanstalk::Tube, nil]
def tube(tube_name, create_if_missing = false)
  existing = @tubes[tube_name]
  return existing if !existing.nil? && !existing.deactivated?

  if create_if_missing
    @tubes[tube_name] = GemeraldBeanstalk::Tube.new(tube_name)
  else
    # Only a stale (deactivated) entry needs removing; a missing one does not.
    @tubes.delete(tube_name) unless existing.nil?
    nil
  end
end
#tube_list(tube_list) ⇒ Object (private)
248 249 250 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 248
#
# Renders a collection of tube names as a list response: every entry
# becomes a "- <name>" line, and the lines are passed to yaml_response.
#
# @param tube_list [Enumerable] tube names
# @return [Object] the result of yaml_response
def tube_list(tube_list)
  entries = tube_list.map { |name| "- #{name}" }
  yaml_response(entries)
end
#update_state ⇒ Object (private)
253 254 255 256 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 253
#
# Refreshes time-dependent state in two steps: update_waiting first, then
# update_timeouts.
#
# @return [Object] the result of update_timeouts
def update_state
  update_waiting
  timeouts_result = update_timeouts
  timeouts_result
end
#update_timeouts ⇒ Object (private)
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 259
#
# Re-evaluates everything that can change with time.
#
# * Reads +state+ on every job held in @reserved.
#   NOTE(review): the value is discarded, so the read is presumably made
#   for a side effect inside Job#state — confirm.
# * Prunes @delayed down to jobs still in the :delayed state. Jobs that have
#   become :ready go through honor_reservations before being dropped.
# * Prunes @paused down to tubes still paused. Tubes no longer paused go
#   through honor_reservations before being dropped.
#
# @return [Object] the result of @paused.keep_if
def update_timeouts
  @reserved.values.flatten.each { |held_job| held_job.state }

  @delayed.keep_if do |delayed_job|
    current = delayed_job.state # read once per job
    honor_reservations(delayed_job) if current == :ready
    current == :delayed
  end

  @paused.keep_if do |paused_tube|
    still_paused = paused_tube.paused?
    honor_reservations(paused_tube) unless still_paused
    still_paused
  end
end
#update_waiting ⇒ Object (private)
283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 283
#
# Notifies waiting connections whose wait must end.
#
# A connection that is still waiting while one of its held jobs has a
# deadline pending receives DEADLINE_SOON. Otherwise, a connection that has
# timed out receives TIMED_OUT. In both cases its reservations are cancelled
# before the message is sent. All other connections are left alone.
#
# @return [Object] the collection returned by waiting_connections
def update_waiting
  waiting_connections.each do |connection|
    if connection.waiting? && deadline_pending?(connection)
      message = DEADLINE_SOON
    elsif connection.timed_out?
      message = TIMED_OUT
    else
      next
    end

    cancel_reservations(connection)
    connection.transmit(message)
  end
end
#uptime ⇒ Object (private)
299 300 301 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 299
#
# Whole seconds elapsed between @up_at and now.
#
# @return [Integer] the difference, truncated by +to_i+
def uptime
  elapsed = Time.now.to_f - @up_at
  elapsed.to_i
end
#waiting_connections ⇒ Object (private)
304 305 306 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 304
#
# Selects the connections in @connections that are waiting or have timed out.
#
# @return [Object] the result of +select+ on @connections
def waiting_connections
  @connections.select do |candidate|
    candidate.waiting? || candidate.timed_out?
  end
end
#yaml_response(data) ⇒ Object (private)
309 310 311 312 |
# File 'lib/gemerald_beanstalk/beanstalk_helper.rb', line 309
#
# Wraps pre-formatted lines in an OK response: a "---" line is prepended,
# the lines are joined with "\n", and the result is sent after an
# "OK <bytesize>" header, with CRLF after both header and payload.
#
# @param data [Array<String>] already-formatted body lines
# @return [String] the wire-format response
def yaml_response(data)
  payload = (['---'] + data).join("\n")
  "OK #{payload.bytesize}\r\n#{payload}\r\n"
end