Class: HomeQ::SOBS::ServerJob
- Includes: Base::Logging, Poolable, Observable
- Defined in: lib/homeq/sobs/server.rb
Overview
Server Job
This class is the server-side representation of a message, or a “job” - a piece of work that somebody sent, that somebody will eventually pick up and do something with.
On the server side, the job lifecycle looks like this:
   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                  |
                        |(time passes)     |
                        |                  |
   put                  v     reserve      |        delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                 |  |
                       |   \  release       |  |
                       |    `---------------'  |
                       |                       |
                       | kick                  |
                       |                       |
                       |       bury            |
                     [BURIED] <----------------'
                       |
                       |  delete
                        `--------> *poof*
In the simplest case, jobs go from start to ready to reserved to poof.
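The lifecycle can also be read as a transition table. The following is a minimal sketch, not the library's implementation: `TRANSITIONS` and `walk` are hypothetical names, and the event names are taken from the state machine built in the constructor. It uses a plain hash rather than the Statemachine library.

```ruby
# Hypothetical transition table mirroring the lifecycle diagram.
# Keys are states; each value maps an event name to the next state.
TRANSITIONS = {
  start:    { put: :ready, put_with_delay: :delayed },
  delayed:  { delay_timeout: :ready },
  ready:    { reserve: :reserved },
  reserved: { delete: :deleted, release: :ready, run_timeout: :ready,
              release_with_delay: :delayed, bury: :buried },
  buried:   { delete: :deleted, kick: :ready },
  deleted:  {}
}.freeze

# Apply a list of events, starting from :start, and return the final state.
# Raises ArgumentError when the current state does not accept an event.
def walk(events, state = :start)
  events.reduce(state) do |current, event|
    TRANSITIONS.fetch(current).fetch(event) do
      raise ArgumentError, "#{event} is not valid in state #{current}"
    end
  end
end

# The simplest case: start -> ready -> reserved -> poof.
simplest = walk(%i[put reserve delete])

# A longer path: delayed first, then buried, then kicked back to ready.
detour = walk(%i[put_with_delay delay_timeout reserve bury kick])
```

Here `simplest` ends in `:deleted` (the *poof* in the diagram) and `detour` ends in `:ready`.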
Constant Summary
- @@job_count = 0
  A count of the total number of jobs that have had a life in the system.
Instance Attribute Summary
- #delay ⇒ Object
  Delay N seconds before ready.
- #priority ⇒ Object
  How important? Not implemented.
- #reserving_connection ⇒ Object (readonly)
  Connection working on this job.
- #ttr ⇒ Object
  Time To Run.
Attributes inherited from Job
Instance Method Summary
- #bury(conn) ⇒ Object
  Push this job off to the side so it is not available to be worked on.
- #deinitialize ⇒ Object
- #delete(conn) ⇒ Object
  Delete this job (poof).
- #initialize(message, source) ⇒ ServerJob (constructor)
  Create a new ServerJob.
- #kick ⇒ Object
  Move this job from the ‘buried’ state (see #bury, above) to the ‘ready’ state.
- #put ⇒ Object
  Takes this job from the start state to ready.
- #put_with_delay ⇒ Object
  Takes this job from the start state to delayed.
- #reinitialize(message, source) ⇒ Object
- #release ⇒ Object
  Take this job out of the “being worked on” state and back to availability in the ‘ready’ state.
- #release_with_delay(priority, delay) ⇒ Object
  Take this job out of the “being worked on” state and back to availability in the ‘ready’ state, but after going through a ‘delayed’ state.
- #reserve(conn) ⇒ Object
  Mark this job as being processed.
- #run ⇒ Object
Methods included from Poolable
Methods included from Base::Logging
Methods inherited from Job
Methods included from Base::Configuration
Methods included from Base::Options
Constructor Details
#initialize(message, source) ⇒ ServerJob
Create a new ServerJob. Called by Foreman#create_job. A Job is based on receipt of a message, from a source, and it is managed by a foreman. The message is assumed to have three args: priority (ignored), delay, and ttr (time to run).
    # File 'lib/homeq/sobs/server.rb', line 95
    def initialize(message, source)
      logger.debug4 { 'INIT' }
      super(message, source)
      @@job_count += 1
      @priority, @delay, @ttr = @message.args.collect { |arg| arg.to_i }

      # Create a state machine to drive.
      @state = Statemachine.build do
        state :start do
          event :put, :ready
          event :put_with_delay, :delayed
        end
        state :delayed do
          on_entry :start_delay_timer
          event :delay_timeout, :ready
        end
        state :ready do
          on_entry :broadcast_change
          event :reserve, :reserved
        end
        state :reserved do
          on_entry :start_run_timer
          event :delete, :deleted
          event :release, :ready
          event :run_timeout, :ready
          event :release_with_delay, :delayed
          event :bury, :buried
          on_exit :exit_reserved_state
        end
        state :buried do
          on_entry :broadcast_change
          event :delete, :deleted
          event :kick, :ready
        end
        state :deleted do
          on_entry :broadcast_change
        end
      end
      @state.context = self
    end
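The argument handling in the constructor can be tried in isolation. This is a small sketch under stated assumptions: `FakeMessage` is a hypothetical stand-in that only provides `#args`, and the argument values are invented for illustration.

```ruby
# Hypothetical stand-in for the message object; only #args is assumed.
FakeMessage = Struct.new(:args)

# Three positional args, in the order the constructor expects:
# priority (ignored), delay, and ttr (time to run).
message = FakeMessage.new(%w[10 0 120])

# Same destructuring as the constructor: convert each arg to an Integer.
priority, delay, ttr = message.args.collect { |arg| arg.to_i }

# String#to_i returns 0 for non-numeric text, and a missing arg leaves
# the corresponding variable nil, so malformed input fails quietly.
odd_priority, odd_delay, odd_ttr = %w[high 5].collect { |arg| arg.to_i }
```

Because a short or non-numeric argument list does not raise, callers that need strict validation would have to check the values themselves.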
Instance Attribute Details
#delay ⇒ Object
Delay N seconds before ready
    # File 'lib/homeq/sobs/server.rb', line 82
    def delay
      @delay
    end
#priority ⇒ Object
How important? Not implemented.
    # File 'lib/homeq/sobs/server.rb', line 79
    def priority
      @priority
    end
#reserving_connection ⇒ Object (readonly)
Connection working on this job
    # File 'lib/homeq/sobs/server.rb', line 85
    def reserving_connection
      @reserving_connection
    end
#ttr ⇒ Object
Time To Run
    # File 'lib/homeq/sobs/server.rb', line 76
    def ttr
      @ttr
    end
Instance Method Details
#bury(conn) ⇒ Object
Push this job off to the side so it is not available to be worked on.
    # File 'lib/homeq/sobs/server.rb', line 228
    def bury(conn)
      conn.buried(job_id) # may or may not be @reserving_connection
      @state.bury
    end
#deinitialize ⇒ Object
    # File 'lib/homeq/sobs/server.rb', line 140
    def deinitialize
      logger.debug4 { 'DEINIT' }

      # first take care of superclass's vars
      @message = @queue = @job_id = nil

      # now ours
      instance_variables.each {|v|
        next if v == "@state"   # keeper. largely the point of recycling
        next if v =~ /observer/ # keeper. our observer(s)
        eval "#{v} = nil"
      }
      @state.state = :start
    end
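The recycling step above clears every instance variable except the state machine and the observers. The sketch below shows the same idea with `instance_variable_set` instead of `eval`. `RecyclableThing` and its variables are hypothetical. One caveat: on Ruby 1.9 and later `instance_variables` returns symbols rather than strings, so the sketch normalises names with `to_s` before comparing them. Which Ruby version the library targets is not stated in this page.

```ruby
# Hypothetical pooled object; only the clearing logic mirrors the source.
class RecyclableThing
  def initialize
    @state = :start        # kept across recycling
    @observer_peers = []   # kept: name matches /observer/
    @payload = 'work'      # cleared
    @delay = 5             # cleared
  end

  # Reset per-use instance variables to nil, keeping the expensive ones.
  def deinitialize
    instance_variables.each do |name|
      key = name.to_s      # Symbol on Ruby 1.9+, String on 1.8
      next if key == '@state'
      next if key =~ /observer/
      instance_variable_set(name, nil)
    end
    self
  end
end

thing = RecyclableThing.new.deinitialize
```

After the call, `@payload` and `@delay` are nil while `@state` and `@observer_peers` keep their values, which is what lets a pool hand the object out again without rebuilding it.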
#delete(conn) ⇒ Object
Delete this job (poof). A ‘deleted’ reply is sent to the connection that sent the delete (which may or may not be the connection that reserved the job in the first place).
    # File 'lib/homeq/sobs/server.rb', line 195
    def delete(conn)
      conn.deleted(job_id) # may or may not be @reserving_connection
      @state.delete
    end
#kick ⇒ Object
Move this job from the ‘buried’ state (see #bury, above) to the ‘ready’ state.
    # File 'lib/homeq/sobs/server.rb', line 235
    def kick
      @state.kick
    end
#put ⇒ Object
Takes this job from the start state to ready.
    # File 'lib/homeq/sobs/server.rb', line 181
    def put
      @state.put
    end
#put_with_delay ⇒ Object
Takes this job from the start state to delayed. As a consequence, we kick off a timer that will move this job from delayed to ready when it expires.
    # File 'lib/homeq/sobs/server.rb', line 188
    def put_with_delay
      @state.put_with_delay
    end
#reinitialize(message, source) ⇒ Object
    # File 'lib/homeq/sobs/server.rb', line 156
    def reinitialize(message, source)
      # Call super's initialize().
      SOBS::Job.instance_method(:initialize).bind(self).call(message, source)

      @@job_count += 1

      @priority, @delay, @ttr = @message.args.collect { |arg| arg.to_i }

      logger.debug4 { "REINIT #{@state.state}" }
    end
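The `instance_method(:initialize).bind(self).call(...)` idiom re-runs a superclass initializer on an object that already exists, without running the subclass constructor again. A minimal sketch follows. `BaseJob`, `PooledJob`, and `init_count` are hypothetical names chosen for illustration, not part of HomeQ.

```ruby
# Hypothetical superclass holding the per-message fields.
class BaseJob
  attr_reader :message, :source

  def initialize(message, source)
    @message = message
    @source = source
  end
end

# Hypothetical pooled subclass.
class PooledJob < BaseJob
  attr_reader :init_count

  def initialize(message, source)
    super
    @init_count = 1
  end

  # Re-run only BaseJob#initialize on this existing instance, skipping
  # the one-time setup in PooledJob#initialize.
  def reinitialize(message, source)
    BaseJob.instance_method(:initialize).bind(self).call(message, source)
    @init_count += 1
    self
  end
end

job  = PooledJob.new('first', :conn_a)
same = job.reinitialize('second', :conn_b)
```

`same` is the very same object as `job`, now carrying the new message and source. In the source above, this is what allows the state machine built in the constructor to survive across uses.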
#release ⇒ Object
Take this job out of the “being worked on” state and back to availability in the ‘ready’ state.
    # File 'lib/homeq/sobs/server.rb', line 212
    def release
      @state.release
    end
#release_with_delay(priority, delay) ⇒ Object
Take this job out of the “being worked on” state and back to availability in the ‘ready’ state, but after going through a ‘delayed’ state. We kick off a timer that moves the job from ‘delayed’ to ‘ready’ when it expires.
    # File 'lib/homeq/sobs/server.rb', line 220
    def release_with_delay(priority, delay)
      @priority = priority
      @delay = delay
      @state.release_with_delay
    end
#reserve(conn) ⇒ Object
Mark this job as being processed. The reserving connection is sent a ‘reserved’ reply along with the contents of this job. We store a notion of the reserving connection, and push this job from the ‘ready’ state to the ‘reserved’ state.
    # File 'lib/homeq/sobs/server.rb', line 204
    def reserve(conn)
      @state.reserve
      conn.reserved(job_id, payload)
      @reserving_connection = conn
    end
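The reply flow for reserve and delete can be exercised with a test double. The sketch below is an illustration only: `FakeConnection` and `SketchJob` are hypothetical, a bare symbol stands in for the state machine, and the guard clauses that raise on an invalid state are an assumption rather than documented behaviour of the library.

```ruby
# Hypothetical connection double that records replies instead of
# writing them to a socket.
class FakeConnection
  attr_reader :replies

  def initialize
    @replies = []
  end

  def reserved(job_id, payload)
    @replies << [:reserved, job_id, payload]
  end

  def deleted(job_id)
    @replies << [:deleted, job_id]
  end
end

# Simplified job: a symbol replaces the real state machine.
class SketchJob
  attr_reader :state, :reserving_connection

  def initialize(job_id, payload)
    @job_id = job_id
    @payload = payload
    @state = :ready
  end

  def reserve(conn)
    raise "cannot reserve from #{@state}" unless @state == :ready
    @state = :reserved                 # transition first, as in the source
    conn.reserved(@job_id, @payload)   # then reply with the job contents
    @reserving_connection = conn       # and remember who holds the job
  end

  def delete(conn)
    raise "cannot delete from #{@state}" unless %i[reserved buried].include?(@state)
    conn.deleted(@job_id)              # the reply goes to whoever asked
    @state = :deleted
  end
end

worker = FakeConnection.new
admin  = FakeConnection.new
job    = SketchJob.new(7, 'resize image')

job.reserve(worker)  # worker receives the 'reserved' reply
job.delete(admin)    # a different connection may delete the job
```

The `deleted` reply lands on `admin`, not on `worker`, matching the note that the deleting connection may or may not be the reserving one.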
#run ⇒ Object
    # File 'lib/homeq/sobs/server.rb', line 168
    def run
      logger.debug4 { 'LAUNCH' }

      broadcast_change

      if delay > 0
        put_with_delay
      else
        put
      end
    end