Class: HomeQ::SOBS::Foreman
- Inherits:
-
Object
- Object
- HomeQ::SOBS::Foreman
- Includes:
- Base::Logging
- Defined in:
- lib/homeq/sobs/foreman.rb
Overview
Job Manager
Constant Summary collapse
- MAX_JOBS_TO_DOLE_OUT_PER_TICK =
100
Instance Attribute Summary collapse
-
#jobs ⇒ Object
Returns the value of attribute jobs.
-
#server ⇒ Object
Returns the value of attribute server.
Instance Method Summary collapse
- #bury_job(conn, job_id) ⇒ Object
- #create_job(message, connection) ⇒ Object
- #delete_job(conn, job_id) ⇒ Object
- #dole_out_jobs ⇒ Object
-
#initialize(server) ⇒ Foreman
constructor
A new instance of Foreman.
- #kick(conn, max_jobs) ⇒ Object
- #release_job(conn, job_id, priority, delay) ⇒ Object
- #to_s ⇒ Object
- #update(job, state) ⇒ Object
Methods included from Base::Logging
Constructor Details
#initialize(server) ⇒ Foreman
Returns a new instance of Foreman.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/homeq/sobs/foreman.rb', line 44 def initialize(server) @serial_number = 0 @server = server @jobs = {} # Each element is a [job, job_state, time] tuple @states = { :start => HomeQ::OHash.new, :ready => HomeQ::OHash.new, :deleted => HomeQ::OHash.new, :delayed => HomeQ::OHash.new, :reserved => HomeQ::OHash.new, :buried => HomeQ::OHash.new, } @jobs_created = 0 @jobs_deleted = 0 @jobs_doled_out = 0 @histo = HomeQ::Histogram.new(0,10,0.5) dole_out_jobs end |
Instance Attribute Details
#jobs ⇒ Object
Returns the value of attribute jobs.
42 43 44 |
# File 'lib/homeq/sobs/foreman.rb', line 42 def jobs @jobs end |
#server ⇒ Object
Returns the value of attribute server.
41 42 43 |
# File 'lib/homeq/sobs/foreman.rb', line 41 def server @server end |
Instance Method Details
#bury_job(conn, job_id) ⇒ Object
121 122 123 124 125 126 127 128 |
# File 'lib/homeq/sobs/foreman.rb', line 121 def bury_job(conn, job_id) j = job_id_in_state?(job_id, :reserved) if j j.bury(conn) return end conn.not_found(job_id) end |
#create_job(message, connection) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/homeq/sobs/foreman.rb', line 63 def create_job(, connection) @jobs_created += 1 j = ServerJob.new(, connection) j.add_observer(self) j.run end |
#delete_job(conn, job_id) ⇒ Object
110 111 112 113 114 115 116 117 118 119 |
# File 'lib/homeq/sobs/foreman.rb', line 110 def delete_job(conn, job_id) [:buried, :reserved].each { |state| j = job_id_in_state?(job_id, state) if j j.delete(conn) return end } conn.not_found(job_id) end |
#dole_out_jobs ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/homeq/sobs/foreman.rb', line 141 def dole_out_jobs # To give the reactor (EM) loop a chance to actually run, we # only send out a few jobs at a time. 1.upto(MAX_JOBS_TO_DOLE_OUT_PER_TICK) do return unless ready_jobs? c = find_waiting_connection return unless c j = get_next_ready_job @jobs_doled_out += 1 j.reserve(c) end EventMachine::next_tick { dole_out_jobs } end |
#kick(conn, max_jobs) ⇒ Object
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/homeq/sobs/foreman.rb', line 130 def kick(conn, max_jobs) kicked = [] 0.upto(max_jobs.to_i - 1) { |i| kicked << @states[:buried].shift } conn.kicked(kicked.length) kicked.each { |jid,j| @jobs[jid][0].kick } end |
#release_job(conn, job_id, priority, delay) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/homeq/sobs/foreman.rb', line 96 def release_job(conn, job_id, priority, delay) j = job_id_in_state?(job_id, :reserved) if !j conn.not_found(job_id) return end if delay.to_i > 0 j.release_with_delay(priority.to_i, delay.to_i) else j.release end conn.released(job_id) end |
#to_s ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/homeq/sobs/foreman.rb', line 157 def to_s str = "#{self.class} " str << "Pool size: #{ServerJob.pool.size}\n" str << "created: #{@jobs_created} deleted: #{@jobs_deleted} " str << "doled out: #{@jobs_doled_out}\n" @states.each { |state, oh| str << "#{state.to_s.capitalize} jobs: #{oh.length}\n" if oh.length>0 } str << @histo.report str end |
#update(job, state) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/homeq/sobs/foreman.rb', line 70 def update(job, state) track_job_by_state(job, state) if job.job_id case state when :start job.job_id = unique_job_id @jobs[job.job_id] = [job, state, Time.now] # call now b/c we now have a job_id track_job_by_state(job, state) job.queue.inserted(job.job_id, job..args[3]) when :deleted @histo << (Time.now - @jobs[job.job_id][2]) @jobs.delete(job.job_id) remove_job_from_all_states(job) job.delete_observer(self) job.recycle @jobs_deleted += 1 when :ready dole_out_jobs when :delayed when :reserved when :buried else Base::System.instance.die("Unknown job state #{state}") end end |