Class: HomeQ::SOBS::Foreman

Inherits:
Object
  • Object
show all
Includes:
Base::Logging
Defined in:
lib/homeq/sobs/foreman.rb

Overview

Job Manager

Constant Summary collapse

MAX_JOBS_TO_DOLE_OUT_PER_TICK =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Base::Logging

#logger

Constructor Details

#initialize(server) ⇒ Foreman

Returns a new instance of Foreman.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/homeq/sobs/foreman.rb', line 44

def initialize(server)
  @serial_number = 0
  @server = server
  @jobs = {}  # Each element is a [job, job_state, time] tuple
  @states = {
    :start => HomeQ::OHash.new,
    :ready => HomeQ::OHash.new,
    :deleted => HomeQ::OHash.new,
    :delayed => HomeQ::OHash.new,
    :reserved => HomeQ::OHash.new,
    :buried => HomeQ::OHash.new,
  }
  @jobs_created 	= 0
  @jobs_deleted 	= 0
  @jobs_doled_out = 0
  @histo          = HomeQ::Histogram.new(0,10,0.5)
  dole_out_jobs
end

Instance Attribute Details

#jobsObject

Returns the value of attribute jobs.



42
43
44
# File 'lib/homeq/sobs/foreman.rb', line 42

def jobs
  @jobs
end

#serverObject

Returns the value of attribute server.



41
42
43
# File 'lib/homeq/sobs/foreman.rb', line 41

def server
  @server
end

Instance Method Details

#bury_job(conn, job_id) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/homeq/sobs/foreman.rb', line 121

def bury_job(conn, job_id)
  j = job_id_in_state?(job_id, :reserved)
  if j
    j.bury(conn)
    return
  end
  conn.not_found(job_id)
end

#create_job(message, connection) ⇒ Object



63
64
65
66
67
68
# File 'lib/homeq/sobs/foreman.rb', line 63

def create_job(message, connection)
  @jobs_created += 1
  j = ServerJob.new(message, connection)
  j.add_observer(self)
  j.run
end

#delete_job(conn, job_id) ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/homeq/sobs/foreman.rb', line 110

def delete_job(conn, job_id)
  [:buried, :reserved].each { |state|
    j = job_id_in_state?(job_id, state)
    if j
      j.delete(conn)
      return
    end
  }
  conn.not_found(job_id)
end

#dole_out_jobsObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/homeq/sobs/foreman.rb', line 141

def dole_out_jobs
  # To give the reactor (EM) loop a chance to actually run, we
  # only send out a few jobs at a time.      
  1.upto(MAX_JOBS_TO_DOLE_OUT_PER_TICK) do
    return unless ready_jobs?
    c = find_waiting_connection
    return unless c
    j = get_next_ready_job
    @jobs_doled_out += 1
    j.reserve(c)
  end
  EventMachine::next_tick {
    dole_out_jobs
  }
end

#kick(conn, max_jobs) ⇒ Object



130
131
132
133
134
135
136
137
138
139
# File 'lib/homeq/sobs/foreman.rb', line 130

def kick(conn, max_jobs)
  kicked = []
  0.upto(max_jobs.to_i - 1) { |i|
    kicked << @states[:buried].shift
  }
  conn.kicked(kicked.length)
  kicked.each { |jid,j|
    @jobs[jid][0].kick
  }
end

#release_job(conn, job_id, priority, delay) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/homeq/sobs/foreman.rb', line 96

def release_job(conn, job_id, priority, delay)
  j = job_id_in_state?(job_id, :reserved)
  if !j
    conn.not_found(job_id)
    return
  end
  if delay.to_i > 0
    j.release_with_delay(priority.to_i, delay.to_i)
  else
    j.release
  end
  conn.released(job_id)
end

#to_sObject



157
158
159
160
161
162
163
164
165
166
167
# File 'lib/homeq/sobs/foreman.rb', line 157

def to_s
  str = "#{self.class} "
  str << "Pool size: #{ServerJob.pool.size}\n"
  str << "created: #{@jobs_created} deleted: #{@jobs_deleted} "
  str << "doled out: #{@jobs_doled_out}\n"
  @states.each { |state, oh|
    str << "#{state.to_s.capitalize} jobs: #{oh.length}\n" if oh.length>0
  }
  str << @histo.report
  str
end

#update(job, state) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/homeq/sobs/foreman.rb', line 70

def update(job, state)
  track_job_by_state(job, state) if job.job_id
  case state
  when :start
    job.job_id = unique_job_id
    @jobs[job.job_id] = [job, state, Time.now]
    # call now b/c we now have a job_id
    track_job_by_state(job, state)
    job.queue.inserted(job.job_id, job.message.args[3])
  when :deleted
    @histo << (Time.now - @jobs[job.job_id][2])
    @jobs.delete(job.job_id)
    remove_job_from_all_states(job)
    job.delete_observer(self)
    job.recycle
    @jobs_deleted += 1
  when :ready
    dole_out_jobs
  when :delayed
  when :reserved
  when :buried
  else
    Base::System.instance.die("Unknown job state #{state}")
  end
end