Module: Ruote::LocalParticipant
- Includes:
- ReceiverMixin
- Included in:
- BlockParticipant, CodeParticipant, EngineParticipant, NoOpParticipant, NullParticipant, Participant, RevParticipant, SmtpParticipant, StorageParticipant
- Defined in:
- lib/ruote/part/local_participant.rb
Overview
Provides methods for ‘local’ participants.
Assumes the class that includes this module has a #context method that points to the worker or engine ruote context.
It’s “local” because it has access to the ruote storage.
Instance Attribute Summary
-
#context ⇒ Object
The engine context. Because this is a local participant, it knows about the context in which the engine operates…
-
#fei ⇒ Object
Returns the current fei (Ruote::FlowExpressionId).
-
#flavour ⇒ Object
Usually set right before a call to #on_cancel or #cancel.
-
#workitem(fei = nil) ⇒ Object
Returns the current workitem if no fei is given.
Instance Method Summary
-
#_accept?(wi) ⇒ Boolean
Test shortcut, alleviates the need to set the workitem before calling accept?.
-
#_dont_thread?(wi) ⇒ Boolean
(also: #_do_not_thread, #_do_not_thread?)
Test shortcut, alleviates the need to set the workitem before calling dont_thread?, do_not_thread? or do_not_thread.
-
#_on_cancel(fei, flavour) ⇒ Object
(also: #_cancel)
Test shortcut, alleviates the need to set fei and flavour before calling cancel / on_cancel.
-
#_on_reply(wi) ⇒ Object
Test shortcut, alleviates the need to set the workitem before calling on_reply.
-
#_on_workitem(wi) ⇒ Object
(also: #_consume)
Test shortcut, alleviates the need to set the workitem before calling consume / on_workitem.
-
#_rtimeout(wi) ⇒ Object
Test shortcut, alleviates the need to set the workitem before calling rtimeout.
-
#applied_workitem(_fei = nil) ⇒ Object
Returns the workitem as was applied when the Ruote::ParticipantExpression was reached.
-
#fexp(wi_or_fei = nil) ⇒ Object
Returns the Ruote::ParticipantExpression that corresponds with this participant.
-
#is_cancelled? ⇒ Boolean
(also: #is_canceled?)
Returns true if the underlying participant expression is gone or cancelling.
-
#is_gone? ⇒ Boolean
Returns true if the underlying participant expression is ‘gone’ (probably cancelled somehow).
-
#lookup_variable(key) ⇒ Object
A shortcut for fexp.lookup_variable(key).
-
#participant_name ⇒ Object
Up until ruote 2.3.0, the participant name had to be fetched from the workitem.
-
#re_dispatch(wi = nil, opts = nil) ⇒ Object
(also: #reject)
Use this method to re_dispatch the workitem.
-
#reply_to_engine(wi = workitem) ⇒ Object
(also: #reply)
Participant implementations call this method when their #on_workitem (#consume) methods are done and they want to hand back the workitem to the engine so that the flow can resume.
-
#unschedule_re_dispatch(fei = nil) ⇒ Object
Cancels the scheduled re_dispatch, if any.
Methods included from ReceiverMixin
#fetch_flow_expression, #fetch_workitem, #flunk, #launch, #receive, #sign
Instance Attribute Details
#context ⇒ Object
The engine context. Because this is a local participant, it knows about the context in which the engine operates…
    # File 'lib/ruote/part/local_participant.rb', line 46
    def context
      @context
    end
#fei ⇒ Object
Returns the current fei (Ruote::FlowExpressionId).
    # File 'lib/ruote/part/local_participant.rb', line 75
    def fei
      @fei ? @fei : @workitem.fei
    end
#flavour ⇒ Object
Usually set right before a call to #on_cancel or #cancel.
    # File 'lib/ruote/part/local_participant.rb', line 58
    def flavour
      @flavour
    end
#workitem(fei = nil) ⇒ Object
Returns the current workitem if no fei is given. If a fei is given, it will return the applied workitem for that fei (if any).
The optional fei is mostly here for backward compatibility (with 2.2.0).
    # File 'lib/ruote/part/local_participant.rb', line 66
    def workitem(fei=nil)

      return fetch_workitem(fei) if fei

      @workitem ? @workitem : applied_workitem
    end
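The fallback order used by #fei, #applied_workitem and #workitem can be tried out without an engine. The sketch below copies the three method bodies into a stand-in class. FallbackDemo, DemoWorkitem and the hash-backed fetch_workitem are assumptions made for illustration and are not part of ruote.

```ruby
# Stand-ins for illustration only; none of these names exist in ruote.
DemoWorkitem = Struct.new(:fei, :fields)

class FallbackDemo
  attr_writer :fei, :workitem

  # stored: hypothetical hash of fei => applied workitem, replacing the storage.
  def initialize(stored)
    @stored = stored
  end

  # Hypothetical replacement for ReceiverMixin#fetch_workitem.
  def fetch_workitem(fei)
    @stored[fei]
  end

  # Explicit fei first, else the fei of the current workitem.
  def fei
    @fei ? @fei : @workitem.fei
  end

  def applied_workitem(_fei = nil)
    fetch_workitem(_fei || fei)
  end

  # Explicit fei first, then the current workitem, then the applied one.
  def workitem(fei = nil)
    return fetch_workitem(fei) if fei
    @workitem ? @workitem : applied_workitem
  end
end

applied = DemoWorkitem.new('fei-1', { 'count' => 0 })
current = DemoWorkitem.new('fei-1', { 'count' => 1 })

demo = FallbackDemo.new('fei-1' => applied)
demo.workitem = current

demo.fei               # taken from the current workitem, since @fei is unset
demo.workitem          # the current workitem
demo.workitem('fei-1') # the applied workitem stored for that fei
```

When neither @fei nor @workitem is set, #fei calls fei on nil and raises NoMethodError, so one of the two has to be assigned before these readers are used.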
Instance Method Details
#_accept?(wi) ⇒ Boolean
Test shortcut, alleviates the need to set the workitem before calling accept?.
    # File 'lib/ruote/part/local_participant.rb', line 266
    def _accept?(wi)
      Ruote.participant_send(self, :accept?, 'workitem' => wi)
    end
#_dont_thread?(wi) ⇒ Boolean Also known as: _do_not_thread, _do_not_thread?
Test shortcut, alleviates the need to set the workitem before calling dont_thread?, do_not_thread? or do_not_thread.
    # File 'lib/ruote/part/local_participant.rb', line 273
    def _dont_thread?(wi)
      Ruote.participant_send(
        self,
        [ :dont_thread?, :do_not_thread?, :do_not_thread ],
        'workitem' => wi)
    end
#_on_cancel(fei, flavour) ⇒ Object Also known as: _cancel
Test shortcut, alleviates the need to set fei and flavour before calling cancel / on_cancel.
    # File 'lib/ruote/part/local_participant.rb', line 250
    def _on_cancel(fei, flavour)
      Ruote.participant_send(
        self, [ :on_cancel, :cancel ], 'fei' => fei, 'flavour' => flavour)
    end
#_on_reply(wi) ⇒ Object
Test shortcut, alleviates the need to set the workitem before calling on_reply.
    # File 'lib/ruote/part/local_participant.rb', line 259
    def _on_reply(wi)
      Ruote.participant_send(self, :on_reply, 'workitem' => wi)
    end
#_on_workitem(wi) ⇒ Object Also known as: _consume
Test shortcut, alleviates the need to set the workitem before calling consume / on_workitem.
    # File 'lib/ruote/part/local_participant.rb', line 241
    def _on_workitem(wi)
      Ruote.participant_send(
        self, [ :on_workitem, :consume ], 'workitem' => wi)
    end
#_rtimeout(wi) ⇒ Object
Test shortcut, alleviates the need to set the workitem before calling rtimeout.
    # File 'lib/ruote/part/local_participant.rb', line 285
    def _rtimeout(wi)
      Ruote.participant_send(self, :rtimeout, 'workitem' => wi)
    end
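All the test shortcuts delegate to Ruote.participant_send with a method name (or a list of candidate names) and a hash of values. The sketch below shows the general pattern that such a call suggests: assign the values through setters, then call the first candidate method the participant implements. demo_participant_send and EchoParticipant are hypothetical stand-ins written for this page. They are an assumption about the behaviour, not ruote's implementation, and only cover methods that take no arguments.

```ruby
# Hypothetical stand-in, NOT Ruote.participant_send itself.
def demo_participant_send(participant, methods, arguments)
  # Assign each value through its setter, when the participant has one.
  arguments.each do |key, value|
    setter = "#{key}="
    participant.send(setter, value) if participant.respond_to?(setter)
  end

  # Call the first candidate method that the participant implements.
  Array(methods).each do |name|
    return participant.send(name) if participant.respond_to?(name)
  end

  raise NoMethodError, "none of #{Array(methods).inspect} is implemented"
end

# Hypothetical participant that only implements the older method name.
class EchoParticipant
  attr_accessor :workitem

  def consume
    "consumed #{workitem}"
  end
end

result = demo_participant_send(
  EchoParticipant.new, [ :on_workitem, :consume ], 'workitem' => 'wi-1')
# result is "consumed wi-1"
```

Under that assumption, a test can call a single shortcut such as _on_workitem(wi) instead of assigning the workitem and then choosing between on_workitem and consume itself.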
#applied_workitem(_fei = nil) ⇒ Object
Returns the workitem as was applied when the Ruote::ParticipantExpression was reached.
If the _fei arg is specified, it will return the corresponding applied workitem. This arg is mostly here for backward compatibility.
    # File 'lib/ruote/part/local_participant.rb', line 97
    def applied_workitem(_fei=nil)
      fetch_workitem(_fei || fei)
    end
#fexp(wi_or_fei = nil) ⇒ Object
Returns the Ruote::ParticipantExpression that corresponds with this participant.
If a wi_or_fei arg is given, will return the corresponding flow expression. This arg is mostly here for backward compatibility.
    # File 'lib/ruote/part/local_participant.rb', line 86
    def fexp(wi_or_fei=nil)
      flow_expression(wi_or_fei || fei)
    end
#is_cancelled? ⇒ Boolean Also known as: is_canceled?
Returns true if the underlying participant expression is gone or cancelling.
    # File 'lib/ruote/part/local_participant.rb', line 300
    def is_cancelled?

      if fe = fexp
        return fe.h.state == 'cancelling'
      else
        true
      end
    end
#is_gone? ⇒ Boolean
Returns true if the underlying participant expression is ‘gone’ (probably cancelled somehow).
    # File 'lib/ruote/part/local_participant.rb', line 292
    def is_gone?
      fexp.nil?
    end
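The difference between is_gone? and is_cancelled? is easiest to see by running both against the three cases the source distinguishes: an active expression, a cancelling one, and a missing one. The sketch below copies the two method bodies into a stand-in class. PredicateDemo, DemoExpression and DemoExpressionHash are hypothetical names used only here.

```ruby
# Stand-ins for illustration only; a real fexp is a Ruote::ParticipantExpression.
DemoExpressionHash = Struct.new(:state)
DemoExpression = Struct.new(:h)

class PredicateDemo
  def initialize(expression)
    @expression = expression
  end

  # Hypothetical replacement for LocalParticipant#fexp.
  def fexp
    @expression
  end

  def is_gone?
    fexp.nil?
  end

  def is_cancelled?
    if fe = fexp
      return fe.h.state == 'cancelling'
    else
      true
    end
  end
end

# Returns [is_gone?, is_cancelled?] for the given expression (or nil).
def predicates(expression)
  demo = PredicateDemo.new(expression)
  [ demo.is_gone?, demo.is_cancelled? ]
end

active     = predicates(DemoExpression.new(DemoExpressionHash.new(nil)))
cancelling = predicates(DemoExpression.new(DemoExpressionHash.new('cancelling')))
gone       = predicates(nil)
# active     is [false, false]
# cancelling is [false, true]
# gone       is [true, true]
```

A gone expression therefore also counts as cancelled, while a cancelling expression is not yet gone.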
#lookup_variable(key) ⇒ Object
A shortcut for
fexp.lookup_variable(key)
    # File 'lib/ruote/part/local_participant.rb', line 120
    def lookup_variable(key)
      fexp.lookup_variable(key)
    end
#participant_name ⇒ Object
Up until ruote 2.3.0, the participant name had to be fetched from the workitem. This is a shortcut that lets you write participant code that looks like:
    def on_workitem
      (workitem.fields['supervisors'] || []) << participant_name
      reply
    end
    # File 'lib/ruote/part/local_participant.rb', line 111
    def participant_name
      workitem.participant_name
    end
#re_dispatch(wi = nil, opts = nil) ⇒ Object Also known as: reject
Use this method to re_dispatch the workitem.
It takes two options, :in and :at, for “later re_dispatch”. Without one of those options, the method is a “reject”.
Look at the unschedule_re_dispatch method for an example of a participant implementation that uses re_dispatch.
    # File 'lib/ruote/part/local_participant.rb', line 148
    def re_dispatch(wi=nil, opts=nil)

      wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
      wi ||= workitem()
      opts ||= {}

      wi.h['re_dispatch_count'] = wi.h['re_dispatch_count'].to_s.to_i + 1

      msg = {
        'action' => 'dispatch',
        'fei' => wi.h.fei,
        'workitem' => wi.h,
        'participant_name' => wi.participant_name
      }

      if t = opts[:in] || opts[:at]

        sched_id = @context.storage.put_schedule('at', wi.h.fei, t, msg)

        exp = fexp(wi)
        exp.h['re_dispatch_sched_id'] = sched_id
        exp.try_persist

      else

        @context.storage.put_msg('dispatch', msg)
      end
    end
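The opening lines of re_dispatch accept either (wi, opts) or a lone options hash, and the counter line treats a missing re_dispatch_count as zero. The sketch below isolates those two pieces of logic so they can be run on their own. normalize_re_dispatch_args and next_re_dispatch_count are hypothetical helper names, and the third parameter stands in for the participant's current workitem.

```ruby
# Hypothetical helper mirroring the argument handling at the top of re_dispatch.
def normalize_re_dispatch_args(wi, opts, current_workitem)
  # A lone Hash in first position is taken to be the options.
  wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
  wi ||= current_workitem
  opts ||= {}
  [ wi, opts ]
end

# Hypothetical helper mirroring the counter arithmetic in re_dispatch:
# nil.to_s is "" and "".to_i is 0, so a missing counter starts at 1.
def next_re_dispatch_count(h)
  h['re_dispatch_count'].to_s.to_i + 1
end

only_opts = normalize_re_dispatch_args({ :in => '1s' }, nil, :current)
# only_opts is [:current, {:in => "1s"}]

no_args = normalize_re_dispatch_args(nil, nil, :current)
# no_args is [:current, {}]

first  = next_re_dispatch_count({})                         # 1
second = next_re_dispatch_count('re_dispatch_count' => 1)   # 2
```

Because an empty options hash has neither :in nor :at, a call without arguments takes the else branch and puts the dispatch message straight away, which is the “reject” behaviour.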
#reply_to_engine(wi = workitem) ⇒ Object Also known as: reply
Participant implementations call this method when their #on_workitem (#consume) methods are done and they want to hand back the workitem to the engine so that the flow can resume.
The (wi=workitem) argument is mostly there for backward compatibility (or for passing a totally different workitem to the engine).
    # File 'lib/ruote/part/local_participant.rb', line 132
    def reply_to_engine(wi=workitem)
      receive(wi)
    end
#unschedule_re_dispatch(fei = nil) ⇒ Object
Cancels the scheduled re_dispatch, if any.
An example of a ‘retrying participant’:
    class RetryParticipant
      include Ruote::LocalParticipant

      def initialize(opts)
        @opts = opts
      end

      def on_workitem
        begin
          do_the_job
          reply
        rescue
          re_dispatch(:in => @opts['delay'] || '1s')
        end
      end

      def cancel
        unschedule_re_dispatch
      end
    end
Note how unschedule_re_dispatch is used in the cancel method. Warning, this example could loop forever…
    # File 'lib/ruote/part/local_participant.rb', line 205
    def unschedule_re_dispatch(fei=nil)

      if s = fexp.h['re_dispatch_sched_id']
        @context.storage.delete_schedule(s)
      end
    end
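One way to keep the retrying participant from looping forever is to read the re_dispatch_count that re_dispatch stores in the workitem and to stop rescheduling past a limit. The sketch below is a minimal illustration of that idea and runs without an engine. DemoLocalParticipant is a hypothetical stand-in for Ruote::LocalParticipant that only records calls, RetryDemoWorkitem is a stand-in workitem, and 'max_retries' is an option name invented for this example.

```ruby
# Stand-in for Ruote::LocalParticipant, for illustration only:
# it records calls instead of talking to an engine.
module DemoLocalParticipant
  attr_accessor :workitem

  def calls
    @calls ||= []
  end

  def reply
    calls << [ :reply ]
  end

  def re_dispatch(opts = {})
    # Same counter arithmetic as in the re_dispatch source.
    count = workitem.h['re_dispatch_count'].to_s.to_i + 1
    workitem.h['re_dispatch_count'] = count
    calls << [ :re_dispatch, opts ]
  end
end

# Stand-in workitem exposing only the underlying hash.
RetryDemoWorkitem = Struct.new(:h)

class BoundedRetryParticipant
  include DemoLocalParticipant # a real participant would include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def on_workitem
    do_the_job
    reply
  rescue => e
    attempts = workitem.h['re_dispatch_count'].to_s.to_i
    # 'max_retries' is a hypothetical option; past the limit, re-raise.
    raise e if attempts >= (@opts['max_retries'] || 3)
    re_dispatch(:in => @opts['delay'] || '1s')
  end

  def do_the_job
    raise 'job failed' # always fails, to exercise the retry limit
  end
end

participant = BoundedRetryParticipant.new('max_retries' => 2)
participant.workitem = RetryDemoWorkitem.new({})

participant.on_workitem # schedules retry 1
participant.on_workitem # schedules retry 2
# A third call re-raises 'job failed' instead of scheduling another retry.
```

In a real participant the cancel method would still call unschedule_re_dispatch, as in the example above, so that a pending retry is removed when the expression is cancelled.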