Class: Tap::App
Overview
App coordinates the setup and running of tasks, and provides an interface to the application directory structure. All tasks have an App (by default App.instance) through which they access application-wide resources like the logger, executable queue, aggregator, and dependencies.
Running Tasks
Task enque commands are forwarded to App#enq:
t0 = Task.intern {|task, input| "#{input}.0" }
t0.enq('a')
app.enq(t0, 'b')
app.run
app.results(t0) # => ['a.0', 'b.0']
When a task completes, its results are passed to the task's on_complete block, if one is set; otherwise they are collected into an Aggregator (aggregated results may be accessed per-task, as shown above). on_complete blocks typically execute or enque other tasks, allowing the construction of imperative workflows:
# clear the previous results
app.aggregator.clear
t1 = Task.intern {|task, input| "#{input}.1" }
t0.on_complete {|_result| t1.enq(_result) }
t0.enq 'c'
app.run
app.results(t0) # => []
app.results(t1) # => ['c.0.1']
Here t0 has no results because the on_complete block passed them to t1 in a simple sequence.
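The routing rule above (results go to the on_complete block if set, otherwise to the aggregator) can be sketched in plain Ruby. MiniApp and MiniTask are hypothetical stand-ins written for this illustration; they are not Tap's classes and skip auditing entirely.

```ruby
# Hypothetical stand-ins for Tap::App and Tap::Task; not the Tap implementation.
class MiniApp
  attr_reader :queue, :aggregator

  def initialize
    @queue = []                                   # pending [task, input] pairs
    @aggregator = Hash.new { |hash, key| hash[key] = [] }
  end

  def enq(task, input)
    queue << [task, input]
    task
  end

  # Runs until the queue is empty, routing each result either to the
  # task's on_complete block or into the aggregator.
  def run
    until queue.empty?
      task, input = queue.shift
      result = task.call(input)
      if task.on_complete_block
        task.on_complete_block.call(result)
      else
        aggregator[task] << result
      end
    end
    self
  end

  def results(task)
    aggregator[task]
  end
end

class MiniTask
  attr_reader :on_complete_block

  def initialize(app, &block)
    @app = app
    @block = block
    @on_complete_block = nil
  end

  def call(input)
    @block.call(input)
  end

  def enq(input)
    @app.enq(self, input)
  end

  def on_complete(&block)
    @on_complete_block = block
    self
  end
end

app = MiniApp.new
t0 = MiniTask.new(app) { |input| "#{input}.0" }
t1 = MiniTask.new(app) { |input| "#{input}.1" }
t0.on_complete { |result| t1.enq(result) }
t0.enq('c')
app.run

p app.results(t0)  # => []
p app.results(t1)  # => ["c.0.1"]
```

Because the on_complete block enques t1 while the run loop is still draining the queue, the follow-up task executes within the same run.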
Dependencies
Tasks allow the construction of dependency-based workflows such that a dependent task only executes after its dependencies have been resolved.
runlist = []
t0 = Task.intern {|task| runlist << task }
t1 = Task.intern {|task| runlist << task }
t0.depends_on(t1)
t0.enq
app.run
runlist # => [t1, t0]
Once a dependency is resolved, it will not execute again:
t0.enq
app.run
runlist # => [t1, t0, t0]
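One way to picture resolve-once behavior is a flag on each task that is checked before the task runs as a dependency. The DependentTask class below is a hypothetical model for illustration only; Tap tracks dependencies through Tap::Support::Dependencies, whose internals are not shown here.

```ruby
# Hypothetical model of resolve-once dependencies; not Tap's implementation.
class DependentTask
  attr_reader :name

  def initialize(name, runlist)
    @name = name
    @runlist = runlist
    @dependencies = []
    @resolved = false
  end

  def depends_on(*tasks)
    @dependencies.concat(tasks)
    self
  end

  # Runs the task as a dependency: at most once, however often it is asked.
  def resolve
    return if @resolved
    @resolved = true
    execute
  end

  # Resolves outstanding dependencies first, then runs the task itself.
  def execute
    @dependencies.each(&:resolve)
    @runlist << name
  end
end

runlist = []
t1 = DependentTask.new(:t1, runlist)
t0 = DependentTask.new(:t0, runlist).depends_on(t1)

t0.execute
t0.execute
p runlist  # => [:t1, :t0, :t0]
```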
Batching
Tasks can be batched, allowing the same input to be enqued to multiple tasks at once.
t0 = Task.intern {|task, input| "#{input}.0" }
t1 = Task.intern {|task, input| "#{input}.1" }
t0.batch_with(t1)
t0.enq 'a'
t1.enq 'b'
app.run
app.results(t0) # => ['a.0', 'b.0']
app.results(t1) # => ['a.1', 'b.1']
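A rough way to model batching is to let batched tasks share one array of members, so that enquing any member enques all of them with the same input. BatchTask and its plain-array queue are hypothetical names for this sketch and do not reflect how Tap stores batches.

```ruby
# Hypothetical model of batching; not Tap's implementation.
class BatchTask
  attr_reader :batch, :results

  def initialize(queue, &block)
    @queue = queue
    @block = block
    @batch = [self]   # every task starts in a batch of one
    @results = []
  end

  # Merges the batches so that all members share one batch array.
  def batch_with(*tasks)
    merged = (batch + tasks.flat_map(&:batch)).uniq
    merged.each { |task| task.batch = merged }
    self
  end

  # Enques every member of the batch with the same input.
  def enq(input)
    batch.each { |task| @queue << [task, input] }
    self
  end

  def run(input)
    results << @block.call(input)
  end

  protected

  attr_writer :batch
end

queue = []
t0 = BatchTask.new(queue) { |input| "#{input}.0" }
t1 = BatchTask.new(queue) { |input| "#{input}.1" }
t0.batch_with(t1)
t0.enq('a')
t1.enq('b')
queue.each { |task, input| task.run(input) }

p t0.results  # => ["a.0", "b.0"]
p t1.results  # => ["a.1", "b.1"]
```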
Executables
App can enque and run any Executable object. Arbitrary methods may be made into Executables using Object#_method. The mq (method enq) method generates and enques methods in one step.
array = []
# longhand
m = array._method(:push)
m.enq(1)
# shorthand
app.mq(array, :push, 2)
array.empty? # => true
app.run
array # => [1, 2]
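The deferred execution shown above can be approximated with core Ruby alone, using Object#method to capture a bound method and a plain array as the queue. The drain helper is a hypothetical name for this sketch; Tap's Object#_method goes further by making the bound method an Executable.

```ruby
# Runs every queued [callable, inputs] pair in order.
def drain(queue)
  until queue.empty?
    callable, inputs = queue.shift
    callable.call(*inputs)
  end
end

queue = []
array = []

# Object#method is core Ruby; Tap's Object#_method additionally makes the
# bound method an Executable.
queue << [array.method(:push), [1]]
queue << [array.method(:push), [2]]

p array.empty?  # => true
drain(queue)
p array         # => [1, 2]
```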
Auditing
All results are audited to track how a given input evolves during a workflow. To illustrate auditing, consider an addition workflow that ends in eights.
add_one = Tap::Task.intern({}, 'add_one') {|task, input| input += 1 }
add_five = Tap::Task.intern({}, 'add_five') {|task, input| input += 5 }
add_one.on_complete do |_result|
  # _result is the audit; use the _current method
  # to get the current value in the audit trail
  current_value = _result._current
  if current_value < 3
    add_one.enq(_result)
  else
    add_five.enq(_result)
  end
end
add_one.enq(0)
add_one.enq(1)
add_one.enq(2)
app.run
app.results(add_five) # => [8,8,8]
Although the results are indistinguishable, each achieved the final value through a different series of tasks. With auditing you can see how each input came to the final value of 8:
# app.results returns the actual result values
# app._results returns the audits for these values
app._results(add_five).each do |_result|
  puts "How #{_result._original} became #{_result._current}:"
  puts _result._to_s
  puts
end
Prints:
How 2 became 8:
o-[] 2
o-[add_one] 3
o-[add_five] 8
How 1 became 8:
o-[] 1
o-[add_one] 2
o-[add_one] 3
o-[add_five] 8
How 0 became 8:
o-[] 0
o-[add_one] 1
o-[add_one] 2
o-[add_one] 3
o-[add_five] 8
See Tap::Support::Audit for more details.
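As a rough mental model, an audit can be thought of as an ordered list of (source, value) pairs whose first entry is the original input and whose last entry is the current value. MiniAudit below is a hypothetical simplification written for this page; the real Tap::Support::Audit has a different and richer interface.

```ruby
# Hypothetical audit trail; Tap::Support::Audit is richer than this.
class MiniAudit
  attr_reader :trail

  def initialize(original)
    @trail = [[nil, original]]   # [source name, value] pairs
  end

  # Appends the value produced by the named source.
  def record(source, value)
    trail << [source, value]
    self
  end

  def original
    trail.first.last
  end

  def current
    trail.last.last
  end

  def to_s
    trail.map { |source, value| "o-[#{source}] #{value}" }.join("\n")
  end
end

audit = MiniAudit.new(1)
audit.record('add_one', audit.current + 1)
audit.record('add_one', audit.current + 1)
audit.record('add_five', audit.current + 5)

puts "How #{audit.original} became #{audit.current}:"
puts audit
```

Run as written, this prints the same four-line trail shown for the input 1 above.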
Defined Under Namespace
Modules: State
Classes: TerminateError
Constant Summary
- DEFAULT_LOGGER =
The default App logger writes to $stdout at level INFO.
Logger.new($stdout)
Constants inherited from Root
Class Attribute Summary
-
.instance ⇒ Object
Returns the current instance of App.
Instance Attribute Summary
-
#aggregator ⇒ Object
readonly
A Tap::Support::Aggregator to collect the results of methods that have no on_complete block.
-
#dependencies ⇒ Object
readonly
A Tap::Support::Dependencies to track dependencies.
-
#logger ⇒ Object
The shared logger.
-
#queue ⇒ Object
readonly
The application queue.
-
#state ⇒ Object
readonly
The state of the application (see App::State).
Attributes inherited from Root
Attributes included from Support::Configurable
Instance Method Summary
-
#_results(*tasks) ⇒ Object
Returns all aggregated, audited results for the specified tasks.
-
#config_filepath(name) ⇒ Object
Returns the configuration filepath for the specified task name, filepath('config', "#{name}.yml").
-
#debug? ⇒ Boolean
True if debug or the global variable $DEBUG is true.
-
#enq(task, *inputs) ⇒ Object
Enques the task (or Executable) with the inputs.
-
#info ⇒ Object
Returns an information string for the App.
-
#initialize(config = {}, logger = DEFAULT_LOGGER) ⇒ App
constructor
Creates a new App with the given configuration.
- #inspect ⇒ Object
-
#log(action, msg = "", level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
-
#mq(object, method_name, *inputs) ⇒ Object
Method enque.
-
#ready ⇒ Object
Sets state = State::READY unless the app is running.
-
#results(*tasks) ⇒ Object
Returns all aggregated results for the specified tasks.
-
#run ⇒ Object
Sequentially calls execute with the [executable, inputs] pairs in queue; run continues until the queue is empty and then returns self.
-
#stop ⇒ Object
Signals a running application to stop executing tasks in the queue by setting state = State::STOP.
-
#terminate ⇒ Object
Signals a running application to terminate execution by setting state = State::TERMINATE.
Methods inherited from Root
#[], #[]=, #absolute_paths, #absolute_paths=, #chdir, chdir, #directories=, expanded_path?, #filepath, glob, #glob, minimal_match?, minimize, path_root_type, #relative_filepath, relative_filepath, #root=, sglob, split, translate, #translate, vglob, #vglob
Methods included from Support::Versions
#compare_versions, #deversion, #increment, #version
Methods included from Support::Configurable
included, #initialize_copy, #reconfigure
Constructor Details
#initialize(config = {}, logger = DEFAULT_LOGGER) ⇒ App
Creates a new App with the given configuration.
# File 'lib/tap/app.rb', line 213
def initialize(config={}, logger=DEFAULT_LOGGER)
  super()
  @state = State::READY
  @queue = Support::ExecutableQueue.new
  @aggregator = Support::Aggregator.new
  @dependencies = Support::Dependencies.new
  initialize_config(config)
  self.logger = logger
end
Class Attribute Details
Instance Attribute Details
#aggregator ⇒ Object (readonly)
A Tap::Support::Aggregator to collect the results of methods that have no on_complete block.
# File 'lib/tap/app.rb', line 183
def aggregator
  @aggregator
end
#dependencies ⇒ Object (readonly)
A Tap::Support::Dependencies to track dependencies.
# File 'lib/tap/app.rb', line 186
def dependencies
  @dependencies
end
#logger ⇒ Object
The shared logger.
# File 'lib/tap/app.rb', line 173
def logger
  @logger
end
#queue ⇒ Object (readonly)
The application queue.
# File 'lib/tap/app.rb', line 176
def queue
  @queue
end
#state ⇒ Object (readonly)
The state of the application (see App::State).
# File 'lib/tap/app.rb', line 179
def state
  @state
end
Instance Method Details
#_results(*tasks) ⇒ Object
Returns all aggregated, audited results for the specified tasks.
Results are joined into a single array. Arrays of tasks are allowed as inputs. See results.
# File 'lib/tap/app.rb', line 361
def _results(*tasks)
  aggregator.retrieve_all(*tasks.flatten)
end
#config_filepath(name) ⇒ Object
Returns the configuration filepath for the specified task name, filepath('config', "#{name}.yml"). Returns nil if name is nil.
# File 'lib/tap/app.rb', line 256
def config_filepath(name)
  name == nil ? nil : filepath('config', "#{name}.yml")
end
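The nil handling and the ".yml" suffix can be illustrated with File.join. The sketch assumes that the 'config' directory sits directly under the application root, which is only a guess at how Root#filepath resolves it; sample_config_filepath and the '/path/to/app' root are hypothetical.

```ruby
# Hypothetical stand-in; Tap resolves the 'config' directory through
# Root#filepath, which may map it somewhere other than root/config.
def sample_config_filepath(root, name)
  name.nil? ? nil : File.join(root, 'config', "#{name}.yml")
end

p sample_config_filepath('/path/to/app', 'sample')  # => "/path/to/app/config/sample.yml"
p sample_config_filepath('/path/to/app', nil)       # => nil
```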
#debug? ⇒ Boolean
True if debug or the global variable $DEBUG is true.
# File 'lib/tap/app.rb', line 233
def debug?
  debug || $DEBUG
end
#enq(task, *inputs) ⇒ Object
Enques the task (or Executable) with the inputs. If the task is batched, then each task in task.batch will be enqued with the inputs. Returns task.
# File 'lib/tap/app.rb', line 338
def enq(task, *inputs)
  case task
  when Tap::Task
    raise ArgumentError, "not assigned to enqueing app: #{task}" unless task.app == self
    task.enq(*inputs)
  when Support::Executable
    queue.enq(task, inputs)
  else
    raise ArgumentError, "not a Task or Executable: #{task}"
  end
  task
end
#info ⇒ Object
# File 'lib/tap/app.rb', line 332
def info
  "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}"
end
#inspect ⇒ Object
# File 'lib/tap/app.rb', line 384
def inspect
  "#<#{self.class.to_s}:#{object_id} root: #{root} >"
end
#log(action, msg = "", level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
Logging is suppressed if quiet is true, unless verbose is also true.
# File 'lib/tap/app.rb', line 249
def log(action, msg="", level=Logger::INFO)
  logger.add(level, msg, action.to_s) if !quiet || verbose
end
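The guard in the listing above can be exercised in isolation with the standard library Logger writing to a StringIO. log_line is a hypothetical helper that takes quiet and verbose as keyword arguments instead of reading them from an App configuration.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper mirroring the guard in App#log: the message is
# written unless quiet is set, and verbose overrides quiet.
def log_line(logger, action, msg, quiet:, verbose:)
  logger.add(Logger::INFO, msg, action.to_s) if !quiet || verbose
end

io = StringIO.new
logger = Logger.new(io)

log_line(logger, :run, 'shown', quiet: false, verbose: false)
log_line(logger, :run, 'hidden', quiet: true, verbose: false)
log_line(logger, :run, 'shown again', quiet: true, verbose: true)

puts io.string
```

Only the first and third messages reach the log; the action is passed as the progname, so it appears before each message in the default Logger format.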
#mq(object, method_name, *inputs) ⇒ Object
Method enque. Enques the specified method from object with the inputs. Returns the enqued method.
# File 'lib/tap/app.rb', line 353
def mq(object, method_name, *inputs)
  m = object._method(method_name)
  enq(m, *inputs)
end
#ready ⇒ Object
Sets state = State::READY unless the app is running. Returns self.
# File 'lib/tap/app.rb', line 261
def ready
  @state = State::READY unless state == State::RUN
  self
end
#results(*tasks) ⇒ Object
Returns all aggregated results for the specified tasks. Results are joined into a single array. Arrays of tasks are allowed as inputs.
t0 = Task.intern {|task, input| "#{input}.0" }
t1 = Task.intern {|task, input| "#{input}.1" }
t2 = Task.intern {|task, input| "#{input}.2" }
t1.batch_with(t2)
t0.enq(0)
t1.enq(1)
app.run
app.results(t0, t1.batch) # => ["0.0", "1.1", "1.2"]
app.results(t1, t0) # => ["1.1", "0.0"]
# File 'lib/tap/app.rb', line 380
def results(*tasks)
  _results(tasks).collect {|_result| _result._current}
end
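The "arrays of tasks are allowed" behavior comes down to flattening the argument list before lookup. The sketch below uses a hypothetical SAMPLE_RESULTS hash keyed by symbols in place of the aggregator, so it shows only the flattening and ordering, not auditing.

```ruby
# Hypothetical store keyed by task name; the values stand in for the
# aggregated results of each task.
SAMPLE_RESULTS = { t0: ['0.0'], t1: ['1.1'], t2: ['1.2'] }.freeze

# Flattens nested task lists, then joins the per-task results in the
# order the tasks were given.
def results_for(*tasks)
  tasks.flatten.flat_map { |task| SAMPLE_RESULTS.fetch(task, []) }
end

p results_for(:t0, [:t1, :t2])  # => ["0.0", "1.1", "1.2"]
p results_for(:t1, :t0)         # => ["1.1", "0.0"]
```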
#run ⇒ Object
Sequentially calls execute with the [executable, inputs] pairs in queue; run continues until the queue is empty and then returns self.
Run State
Run checks the state of self before executing a method. If state changes from State::RUN, the following behaviors result:
- State::STOP
-
No more executables will be executed; the current executable will continue to completion.
- State::TERMINATE
-
No more executables will be executed, and the currently running executable will be discontinued as described in terminate.
Calls to run when the state is not State::READY do nothing and return immediately.
# File 'lib/tap/app.rb', line 282
def run
  return self unless state == State::READY
  @state = State::RUN

  # TODO: log starting run
  begin
    until queue.empty? || state != State::RUN
      executable, inputs = queue.deq
      executable._execute(*inputs)
    end
  rescue(TerminateError)
    # gracefully fail for termination errors
  rescue(Exception)
    # handle other errors accordingly
    raise if debug?
    log($!.class, $!.)
  ensure
    @state = State::READY
  end

  # TODO: log run complete
  self
end
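The interaction between run and stop can be traced with a small state machine. MiniRunner is a hypothetical class with made-up integer state values; it copies only the loop condition and the ensure clause from the listing above and leaves out error handling and termination.

```ruby
# Hypothetical state machine mirroring the run loop; the integer state
# values are made up for this sketch.
class MiniRunner
  READY = 0
  RUN = 1
  STOP = 2

  attr_reader :state, :queue

  def initialize
    @state = READY
    @queue = []
  end

  # Only has an effect while the runner is running.
  def stop
    @state = STOP if state == RUN
    self
  end

  def run
    return self unless state == READY
    @state = RUN
    begin
      until queue.empty? || state != RUN
        callable, inputs = queue.shift
        callable.call(*inputs)
      end
    ensure
      @state = READY
    end
    self
  end
end

runner = MiniRunner.new
done = []
runner.queue << [->(x) { done << x }, [:first]]
runner.queue << [->(x) { done << x; runner.stop }, [:second]]
runner.queue << [->(x) { done << x }, [:third]]

runner.run
p done               # => [:first, :second]
p runner.queue.size  # => 1

runner.run
p done               # => [:first, :second, :third]
```

The executable that calls stop still finishes, the remaining entry stays in the queue, and because the ensure clause resets the state, a later run picks up where the first one left off.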
#stop ⇒ Object
Signals a running application to stop executing tasks in the queue by setting state = State::STOP. The task currently executing will continue uninterrupted to completion.
Does nothing unless state is State::RUN.
# File 'lib/tap/app.rb', line 311
def stop
  @state = State::STOP if state == State::RUN
  self
end
#terminate ⇒ Object
Signals a running application to terminate execution by setting state = State::TERMINATE. In this state, an executing task will raise a TerminateError upon check_terminate, allowing task-specific termination handling such as rollbacks (see Tap::Support::Executable#check_terminate).
Does nothing if state == State::READY.
# File 'lib/tap/app.rb', line 323
def terminate
  @state = State::TERMINATE unless state == State::READY
  self
end
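Cooperative termination of this kind relies on the running task checking a flag at safe points and raising when it is set. The sketch below uses hypothetical names throughout (MiniTerminateError, MiniJob, a boolean flag instead of the App state) and shows only the control flow: check, raise, roll back, re-raise.

```ruby
# Hypothetical names throughout; only the control flow follows the
# description of terminate and check_terminate.
class MiniTerminateError < RuntimeError; end

class MiniJob
  attr_accessor :terminated
  attr_reader :log

  def initialize
    @terminated = false
    @log = []
  end

  # Long-running work calls this at points where stopping is safe.
  def check_terminate
    raise MiniTerminateError if terminated
  end

  def process(items)
    items.each do |item|
      check_terminate
      log << "processed #{item}"
      self.terminated = true if item == :b   # simulate an external terminate call
    end
  rescue MiniTerminateError
    log << 'rolled back'                     # task-specific cleanup
    raise
  end
end

job = MiniJob.new
begin
  job.process([:a, :b, :c])
rescue MiniTerminateError
  # a run loop would rescue the error here and return to the ready state
end

p job.log  # => ["processed a", "processed b", "rolled back"]
```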