Class: Ruote::ProcessObserver
- Inherits: Object
  - Object
  - Ruote::ProcessObserver
- Defined in: lib/ruote/util/process_observer.rb
Overview
A convenience base class for process observers. It (heavily) sugar-coats Ruote::Observer and translates the messages into actions. Each such action is provided with pre-distilled information relevant to processes.
Example implementation
class WebsocketSubscriber < Ruote::ProcessObserver
  # override initialize to warm up a websocket client
  def initialize(context, options={})
    super
    @client = WebsocketClient.new()
  end

  # tell the listeners that a new process launched
  def on_launch(wfid, opts)
    @client.publish(
      "/process/launch",
      { :name => opts[:workitem].wf_name,
        :wfid => wfid,
        :definition => opts[:pdef],
      }
    )
  end
  # tell the listeners that a process ended
  def on_terminated(wfid)
    @client.publish("/process/#{wfid}", { :end => true })
  end
end
Actions
The ProcessObserver adheres closely to the message actions; it calls the following methods:
- on_launch: When a process or sub-process starts
- on_terminated: When a process ends
- on_error_intercepted: When an error was intercepted
- on_cancel: When a process or sub-process was canceled
- on_dispatch: When a participant is dispatched
- on_receive: Whenever a workitem is received
There are others, but if you are interested in those, you might be better off using the lower-level Ruote::Observer.
Arguments
The methods are called with (wfid[, options])
You can provide a method-signature like:
def on_launch(wfid, options)
def on_launch(wfid)
The ProcessObserver inspects the method's arity: a method that takes two arguments receives the options, any other method is called with the wfid alone.
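The choice between the two signatures can be illustrated with a small stand-alone sketch. ArityDemo and its dispatch method are hypothetical names, not part of ruote; dispatch only mimics the arity check that on_msg (see Instance Method Details) performs before invoking a callback.

```ruby
# Hypothetical stand-in, not part of ruote: it only mimics how a
# callback's arity decides whether the options hash is passed along.
class ArityDemo
  attr_reader :calls

  def initialize
    @calls = []
  end

  # two-argument signature: receives the wfid and the options hash
  def on_launch(wfid, options)
    @calls << [:on_launch, wfid, options]
  end

  # one-argument signature: receives the wfid only
  def on_terminated(wfid)
    @calls << [:on_terminated, wfid]
  end

  # optional second argument: arity is -2, so the hash is passed too
  def on_cancel(wfid, options={})
    @calls << [:on_cancel, wfid, options]
  end

  # simplified version of the dispatch step at the end of on_msg
  def dispatch(callback, wfid, data)
    return unless respond_to?(callback)
    args = [wfid]
    args << data if method(callback).arity.abs == 2
    send(callback, *args)
  end
end

demo = ArityDemo.new
demo.dispatch('on_launch', 'wfid-1', { :action => 'launch' })
demo.dispatch('on_terminated', 'wfid-1', { :action => 'terminated' })
demo.dispatch('on_cancel', 'wfid-1', { :action => 'cancel' })
demo.dispatch('on_receive', 'wfid-1', {}) # not defined, so nothing happens
```

After these calls, demo.calls holds three entries; only the on_terminated entry lacks the options hash.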
Options
The following options are provided:
- :workitem: The workitem, if available
- :action: The original name of the action
- :child: Boolean; true when the event comes from a child, or sub-flow
- :error: The intercepted error (only provided with #on_error_intercepted)
- :pdef: The (sub-)process definition (only provided with #on_launch)
- :variables: The process variables, if available
- :flavour: The flavour of canceling (only provided with #on_cancel)
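As a rough illustration of how a callback might read these options, here is a minimal sketch. EventSummary is a hypothetical observer, the stub base class exists only so the snippet runs without the ruote gem, and the hashes passed at the bottom are hand-built samples (the 'kill' flavour is an assumed example value), not messages produced by ruote.

```ruby
# Stand-in so the sketch runs without the ruote gem installed; when ruote
# is loaded, the real Ruote::ProcessObserver is used instead.
unless defined?(Ruote::ProcessObserver)
  module Ruote
    class ProcessObserver
      def initialize(context, options={}); end
    end
  end
end

# Hypothetical observer that turns the options hash into log lines.
class EventSummary < Ruote::ProcessObserver
  attr_reader :lines

  def initialize(context, options={})
    super
    @lines = []
  end

  # :child and :flavour come from the options described above
  def on_cancel(wfid, options)
    scope = options[:child] ? 'sub-process' : 'process'
    @lines << "#{wfid}: #{scope} canceled (flavour: #{options[:flavour].inspect})"
  end

  # :error holds the intercepted error
  def on_error_intercepted(wfid, options)
    error = options[:error]
    @lines << "#{wfid}: #{error.class}: #{error.message}"
  end
end

# Calling the callbacks by hand, for illustration only.
summary = EventSummary.new(nil)
summary.on_cancel('wfid-1', { :child => false, :flavour => 'kill' })
summary.on_error_intercepted('wfid-1', { :error => ArgumentError.new('boom') })
puts summary.lines
```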
Error handling
If an action in your implementation raises an error, the ProcessObserver catches it and silently ignores it.
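Because such errors vanish without a trace, it can help to record them inside the callback before they propagate. The following is only a sketch of that idea: NoisyObserver is a hypothetical class (in real use it would subclass Ruote::ProcessObserver), and the begin/rescue at the bottom merely imitates the observer's blanket rescue.

```ruby
require 'stringio'

# Hypothetical observer; in real use it would subclass Ruote::ProcessObserver.
class NoisyObserver
  def initialize(log=$stderr)
    @log = log
  end

  def on_launch(wfid)
    raise ArgumentError, "cannot publish #{wfid}"
  rescue => e
    # record the failure ourselves, since the caller will discard it
    @log.puts("on_launch failed: #{e.class}: #{e.message}")
    raise
  end
end

log = StringIO.new
observer = NoisyObserver.new(log)

# Imitate the blanket rescue that surrounds the callback invocation.
swallowed = false
begin
  observer.on_launch('wfid-1')
rescue
  swallowed = true
end
```

The error is still discarded by the caller, but log now contains a line describing it.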
Instance Attribute Summary
- #context ⇒ Object (readonly): Returns the value of attribute context.
- #filtered_actions ⇒ Object (readonly): Returns the value of attribute filtered_actions.
- #options ⇒ Object (readonly): Returns the value of attribute options.
Instance Method Summary
- #initialize(context, options = {}) ⇒ ProcessObserver (constructor): A new instance of ProcessObserver.
- #on_msg(msg) ⇒ Object: :nodoc:
Constructor Details
#initialize(context, options = {}) ⇒ ProcessObserver
# File 'lib/ruote/util/process_observer.rb', line 103
def initialize(context, options={})
  @filtered_actions = options.delete(:filtered_actions)
  @filtered_actions ||= []
  @filtered_actions |= %w[dispatched participant_registered variable_set]

  @context = context
  @options = options
end
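To see what the constructor does with a :filtered_actions option, the same three steps can be applied to a plain hash. This is a stand-alone sketch; the :name key is a made-up extra option used only to show what remains in the hash.

```ruby
# Same steps as the constructor, applied to a plain hash.
# :name is a hypothetical extra option, used only for illustration.
options = { :filtered_actions => %w[receive dispatched], :name => 'demo' }

filtered_actions = options.delete(:filtered_actions)  # removed from options
filtered_actions ||= []                               # default when absent
filtered_actions |= %w[dispatched participant_registered variable_set]

p filtered_actions
p options
```

The union keeps the caller's entries, adds the three built-in ones without duplicating 'dispatched', and leaves :filtered_actions out of the hash that ends up in #options.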
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/ruote/util/process_observer.rb', line 101
def context
  @context
end
#filtered_actions ⇒ Object (readonly)
Returns the value of attribute filtered_actions.
# File 'lib/ruote/util/process_observer.rb', line 101
def filtered_actions
  @filtered_actions
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/ruote/util/process_observer.rb', line 101
def options
  @options
end
Instance Method Details
#on_msg(msg) ⇒ Object
:nodoc:
# File 'lib/ruote/util/process_observer.rb', line 112
def on_msg(msg) # :nodoc:
  return if @filtered_actions.include? msg['action']

  wfid  = msg['wfid']
  child = false

  # fall back to the parent's wfid and flag the event as a child event
  if !wfid && msg['parent_id']
    wfid  = msg['parent_id']['wfid']
    child = true
  end

  wfid ||= Ruote.extract_wfid(msg)
  return if !wfid

  # build a workitem, falling back to an empty one on any failure
  workitem = begin
    if msg['workitem']
      Ruote::Workitem.new(Rufus::Json.dup(msg['workitem']))
    else
      Ruote::Workitem.new({})
    end
  rescue
    Ruote::Workitem.new({})
  end

  data = {
    :workitem  => workitem,
    :action    => msg['action'],
    :child     => child,
    :variables => msg['variables'],
  }

  # the preliminary method name
  method = msg['action'].split('_').first

  # change method or fields based on the action
  case msg['action']
  when 'launch'
    data[:pdef] = msg['tree']
  when 'cancel'
    data[:flavour] = msg['flavour']
  when 'error_intercepted'
    error = Kernel.const_get(msg['error']['class']).new(msg['error']['message'])
    error.set_backtrace msg['error']['trace']
    data[:error] = error
    method = msg['action']
  end

  callback = "on_#{method}"
  if self.respond_to?(callback)
    args = [ wfid ]
    args << data if self.method(callback).arity.abs == 2
    self.send(callback, *args)
  end

  return
rescue
  return
end
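The way on_msg derives a callback name from an action can be isolated in a few lines. callback_for is a hypothetical helper written for this sketch, and the action strings below are sample inputs assumed for illustration, not an authoritative list of ruote actions.

```ruby
# Hypothetical helper mirroring the name derivation inside on_msg:
# the first underscore-separated word is used, except for
# 'error_intercepted', which keeps its full name.
def callback_for(action)
  method = action == 'error_intercepted' ? action : action.split('_').first
  "on_#{method}"
end

# Sample action names, assumed for illustration.
samples = %w[launch terminated cancel cancel_process error_intercepted dispatch receive]
mapping = samples.map { |action| [action, callback_for(action)] }

mapping.each { |action, callback| puts "#{action} -> #{callback}" }
```

Under this rule, any action whose name starts with the same word ends up on the same callback, which is why the :action option carries the original name.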