Class: HomeQ::SOBS::ServerJob

Inherits:
Job
  • Object
Includes:
Base::Logging, Poolable, Observable
Defined in:
lib/homeq/sobs/server.rb

Overview

Server Job

This class is the server-side representation of a message, or “job”: a piece of work that somebody sent and that somebody will eventually pick up and do something with.

On the server side, the job lifecycle looks like this:

 put with delay               release with delay
----------------> [DELAYED] <------------.
                      |                  |
                      |(time passes)     |
                      |                  |
put                   v     reserve      |        delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
                     ^ ^                 | |
                     |  \ release        | |            
                     |   `---------------' |        
                     |                     |                        
                     | kick                |                   
                     |                     |                        
                     |                bury |                   
                 [BURIED] <----------------' 
                     |                          
                     | delete                   
                     `--------> *poof*

In the simplest case, jobs go from start to ready to reserved to poof.
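
The lifecycle in the diagram can be sketched as a plain transition table. This is only an illustration: `TRANSITIONS` and `walk` are hypothetical names, and the table is an ordinary Hash rather than the Statemachine DSL that ServerJob builds in its constructor. The states and events are copied from that constructor.

```ruby
# Hypothetical transition table mirroring the lifecycle diagram.
# A plain Hash, not the Statemachine DSL that ServerJob itself uses.
TRANSITIONS = {
  start:    { put: :ready, put_with_delay: :delayed },
  delayed:  { delay_timeout: :ready },
  ready:    { reserve: :reserved },
  reserved: { delete: :deleted, release: :ready, run_timeout: :ready,
              release_with_delay: :delayed, bury: :buried },
  buried:   { delete: :deleted, kick: :ready },
  deleted:  {}
}.freeze

# Apply each event in turn, raising if an event is not legal in the
# current state, and return the final state.
def walk(events, state = :start)
  events.reduce(state) do |current, event|
    TRANSITIONS.fetch(current).fetch(event) do
      raise ArgumentError, "#{event} is not valid in state #{current}"
    end
  end
end

# The simplest case: start -> ready -> reserved -> deleted ("poof").
simple_path = walk(%i[put reserve delete])

# A longer path through the delayed and buried states.
buried_path = walk(%i[put_with_delay delay_timeout reserve bury kick])

puts simple_path
puts buried_path
```

Keeping the table as data makes it easy to check a proposed sequence of events against the diagram before driving a real job through it.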

Constant Summary

@@job_count = 0

A count of the total number of jobs that have had a life in the system.

Instance Attribute Summary

Attributes inherited from Job

#job_id, #message, #queue

Instance Method Summary

Methods included from Poolable

included, #recycle

Methods included from Base::Logging

#logger

Methods inherited from Job

#payload

Methods included from Base::Configuration

#config

Methods included from Base::Options

#options

Constructor Details

#initialize(message, source) ⇒ ServerJob

Create a new ServerJob. Called by Foreman#create_job. A Job is based on receipt of a message, from a source, and it is managed by a foreman. The message is assumed to have three args: priority (ignored), delay, and ttr (time to run).



# File 'lib/homeq/sobs/server.rb', line 95

def initialize(message, source)
  logger.debug4 {
    'INIT'
  }
  super(message, source)
  @@job_count     += 1
  @priority, @delay, @ttr = @message.args.collect { |arg|
    arg.to_i
  }

  # Create a state machine to drive.
  @state = Statemachine.build do
    state :start do
      event :put,                 :ready
      event :put_with_delay,      :delayed
    end
    state :delayed do
      on_entry :start_delay_timer
      event :delay_timeout,       :ready
    end
    state :ready do
      on_entry :broadcast_change
      event :reserve,             :reserved
    end
    state :reserved do
      on_entry :start_run_timer
      event :delete,              :deleted
      event :release,             :ready
      event :run_timeout,         :ready
      event :release_with_delay,  :delayed
      event :bury,                :buried
      on_exit :exit_reserved_state
    end
    state :buried do
      on_entry :broadcast_change
      event :delete,              :deleted
      event :kick,                :ready
    end
    state :deleted do
      on_entry :broadcast_change
    end
  end
  @state.context = self
end
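
The constructor's argument handling can be tried in isolation. The sketch below assumes the message exposes its arguments as an array of strings; `FakeMessage` is a hypothetical stand-in, not a HomeQ class. It shows what the `collect`/`to_i` destructuring produces, including for short or non-numeric input.

```ruby
# Hypothetical stand-in for the message object; only #args is modeled.
FakeMessage = Struct.new(:args)

message = FakeMessage.new(%w[10 5 120])

# Same destructuring the constructor uses: each arg is coerced with to_i.
priority, delay, ttr = message.args.collect { |arg| arg.to_i }

# With fewer than three args the missing values come out as nil, and
# non-numeric strings become 0, so callers may want to validate first.
short = FakeMessage.new(%w[10 abc])
short_priority, short_delay, short_ttr = short.args.collect { |arg| arg.to_i }

p [priority, delay, ttr]
p [short_priority, short_delay, short_ttr]
```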

Instance Attribute Details

#delay ⇒ Object

Number of seconds to delay before the job becomes ready.



# File 'lib/homeq/sobs/server.rb', line 82

def delay
  @delay
end

#priority ⇒ Object

Job priority. Not implemented: the value is read from the message but otherwise ignored.



# File 'lib/homeq/sobs/server.rb', line 79

def priority
  @priority
end

#reserving_connection ⇒ Object (readonly)

The connection that reserved this job and is working on it.



# File 'lib/homeq/sobs/server.rb', line 85

def reserving_connection
  @reserving_connection
end

#ttr ⇒ Object

Time to run (ttr), taken from the message's third argument.



# File 'lib/homeq/sobs/server.rb', line 76

def ttr
  @ttr
end

Instance Method Details

#bury(conn) ⇒ Object

Push this job off to the side so it is not available to be worked on.



# File 'lib/homeq/sobs/server.rb', line 228

def bury(conn)
  conn.buried(job_id) # may or may not be @reserving_connection
  @state.bury
end

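#deinitialize ⇒ Object

Reset this job so it can be recycled. Every instance variable is cleared except the state machine and any observers, and the state machine is returned to the start state.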



# File 'lib/homeq/sobs/server.rb', line 140

def deinitialize
  logger.debug4 {
    'DEINIT'
  }
  # first take care of superclass's vars
  @message = @queue = @job_id = nil
  
  # now ours
  instance_variables.each {|v|
    name = v.to_s                # instance_variables yields Symbols on Ruby 1.9+
    next if name == "@state"     # keeper.  largely the point of recycling
    next if name =~ /observer/   # keeper.  our observer(s)
    instance_variable_set(v, nil)
  }
  @state.state = :start
end
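
The recycling idea in #deinitialize can be sketched on its own: clear everything except a short list of "keeper" variables. `RecyclableThing` and its fields are hypothetical and stand in for a pooled job. Note that `instance_variables` returns Symbols on Ruby 1.9 and later, so this sketch converts each name to a String before comparing it.

```ruby
# Hypothetical pooled object; the names are illustrative only.
class RecyclableThing
  def initialize
    @state = :machine        # expensive to rebuild, so kept across reuse
    @observer_peers = [:foo] # kept so observers survive recycling
    @delay = 5
    @ttr = 120
  end

  # Clear every instance variable except the keepers, then return self.
  def deinitialize
    instance_variables.each do |ivar|
      name = ivar.to_s
      next if name == '@state'
      next if name =~ /observer/
      instance_variable_set(ivar, nil)
    end
    self
  end

  # Hash of instance variable name (Symbol) to current value.
  def snapshot
    instance_variables.map { |ivar| [ivar, instance_variable_get(ivar)] }.to_h
  end
end

thing = RecyclableThing.new.deinitialize
snap = thing.snapshot
p snap
```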

#delete(conn) ⇒ Object

Delete this job (poof). A ‘deleted’ reply is sent to the connection that sent the delete (which may or may not be the connection that reserved the job in the first place).



# File 'lib/homeq/sobs/server.rb', line 195

def delete(conn)
  conn.deleted(job_id) # may or may not be @reserving_connection
  @state.delete
end

#kick ⇒ Object

Move this job from the ‘buried’ state (see #bury, above) to the ‘ready’ state.



# File 'lib/homeq/sobs/server.rb', line 235

def kick
  @state.kick
end

#put ⇒ Object

Takes this job from the start state to ready.



# File 'lib/homeq/sobs/server.rb', line 181

def put
  @state.put
end

#put_with_delay ⇒ Object

Takes this job from the start state to delayed. As a consequence, we kick off a timer that will move this job from delayed to ready when it expires.



# File 'lib/homeq/sobs/server.rb', line 188

def put_with_delay
  @state.put_with_delay
end

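#reinitialize(message, source) ⇒ Object

Set up a recycled job for a new message. Calls Job's initialize directly, increments the job count, and re-reads priority, delay, and ttr from the message. The existing state machine is reused.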



# File 'lib/homeq/sobs/server.rb', line 156

def reinitialize(message, source)
  # Call super's initialize().
  SOBS::Job.instance_method(:initialize).bind(self).call(message,source)
  @@job_count     += 1
  @priority, @delay, @ttr = @message.args.collect { |arg|
    arg.to_i
  }
  logger.debug4 {
    "REINIT #{@state.state}"
  }
end
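
The `instance_method(:initialize).bind(self).call(...)` idiom in #reinitialize is worth seeing in a small, standalone form. In the sketch below, `BaseJob` and `PooledJob` are hypothetical classes, not part of HomeQ. A plain `super` inside a method named `reinitialize` would look for a superclass method of that same name, so the superclass's `initialize` is fetched as an UnboundMethod and bound to the existing object instead.

```ruby
# Hypothetical superclass with ordinary constructor logic.
class BaseJob
  attr_reader :message, :source

  def initialize(message, source)
    @message = message
    @source = source
  end
end

# Hypothetical pooled subclass that can be set up again after first use.
class PooledJob < BaseJob
  def reinitialize(message, source)
    # `super` here would look for BaseJob#reinitialize, which does not exist,
    # so fetch BaseJob#initialize as an UnboundMethod and bind it to self.
    BaseJob.instance_method(:initialize).bind(self).call(message, source)
    self
  end
end

job = PooledJob.new('first', :conn_a)
original_id = job.object_id
job.reinitialize('second', :conn_b)

puts job.message
puts job.object_id == original_id
```

The object identity is unchanged, which is the point of pooling: the same instance is reused with fresh constructor state.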

#release ⇒ Object

Take this job out of the “being worked on” state and back to availability in the ‘ready’ state.



# File 'lib/homeq/sobs/server.rb', line 212

def release
  @state.release
end

#release_with_delay(priority, delay) ⇒ Object

Take this job out of the “being worked on” state and back to availability in the ‘ready’ state, but after going through a ‘delayed’ state. We kick off a timer that moves the job from ‘delayed’ to ‘ready’ when it expires.



# File 'lib/homeq/sobs/server.rb', line 220

def release_with_delay(priority, delay)
  @priority = priority
  @delay = delay
  @state.release_with_delay
end

#reserve(conn) ⇒ Object

Mark this job as being processed. The job moves from the ‘ready’ state to the ‘reserved’ state, the reserving connection is sent a ‘reserved’ reply along with the contents of this job, and we record that connection as the reserving connection.



# File 'lib/homeq/sobs/server.rb', line 204

def reserve(conn)
  @state.reserve
  conn.reserved(job_id, payload)
  @reserving_connection = conn
end
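
The reply pattern used by #reserve and #delete can be exercised with a stub connection. Everything in this sketch is hypothetical: `FakeConnection` records replies instead of sending them, and `TinyJob` tracks its state with a bare symbol rather than a state machine. It illustrates that the ‘deleted’ reply goes to whichever connection sent the delete, which need not be the reserving connection.

```ruby
# Hypothetical connection stub that records replies instead of sending them.
class FakeConnection
  attr_reader :replies

  def initialize
    @replies = []
  end

  def reserved(job_id, payload)
    @replies << [:reserved, job_id, payload]
  end

  def deleted(job_id)
    @replies << [:deleted, job_id]
  end
end

# Minimal job with the same reply pattern; state is a bare symbol here.
class TinyJob
  attr_reader :state, :reserving_connection

  def initialize(job_id, payload)
    @job_id = job_id
    @payload = payload
    @state = :ready
  end

  def reserve(conn)
    raise "cannot reserve from #{@state}" unless @state == :ready
    @state = :reserved
    conn.reserved(@job_id, @payload)
    @reserving_connection = conn
  end

  def delete(conn)
    conn.deleted(@job_id) # reply goes to whoever sent the delete
    @state = :deleted
  end
end

worker = FakeConnection.new
admin = FakeConnection.new
job = TinyJob.new(7, 'resize image')
job.reserve(worker)
job.delete(admin)

p worker.replies
p admin.replies
```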

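#run ⇒ Object

Launch this job. Broadcasts the change, then puts the job with a delay if delay is greater than zero, or puts it directly otherwise.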



# File 'lib/homeq/sobs/server.rb', line 168

def run
  logger.debug4 {
    'LAUNCH'
  }
  broadcast_change
  if delay > 0
    put_with_delay
  else
    put
  end
end