Class: Ruote::Exp::ConcurrenceExpression
- Inherits: FlowExpression
  - Object
  - FlowExpression
  - Ruote::Exp::ConcurrenceExpression
- Includes: MergeMixin
- Defined in: lib/ruote/exp/fe_concurrence.rb
Overview
The ‘concurrence’ expression applies its child branches in parallel (more precisely, it makes a best effort to run them in parallel).
concurrence do
  alpha
  bravo
end
attributes
The concurrence expression takes a number of attributes that allow for sophisticated control (especially at merge time).
:count
concurrence :count => 1 do
  alpha
  bravo
end
In that example, the concurrence will terminate as soon as 1 (count) of the branches replies. The other branch will get cancelled.
:count and :wait_for may point to a negative integer, meaning “all but x”.
concurrence :count => -2 do # over once all the branches but 2 have replied
  # ...
end
:count can be shortened to :c.
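The effect of a positive or negative :count can be sketched in plain Ruby. The `expected_replies` helper below is hypothetical (it is not part of ruote's API), and the clamping at both ends is an assumption about edge cases that the text above does not cover.

```ruby
# Hypothetical helper, not part of ruote: how many branch replies a
# concurrence with the given :count waits for before it is over.
def expected_replies(branch_count, count = nil)
  # no :count given, wait for every branch
  return branch_count if count.nil?

  # positive count; assumption: capped at the number of branches
  return [count, branch_count].min if count >= 0

  # negative count means "all but x"; assumption: never less than 1
  [branch_count + count, 1].max
end

expected_replies(2, 1)   # => 1
expected_replies(5, -2)  # => 3
expected_replies(3)      # => 3
```

Under these assumptions, `:count => -2` on five branches ends the concurrence after the third reply.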
:wait_for
This attribute accepts either an integer or a list of tags.
When used with an integer, it’s equivalent to the :count attribute:
concurrence :wait_for => 1 do
  # ...
end
It waits for 1 branch to respond and then moves on (concurrence over).
When used with a string (or an array), it extracts a list of tags and waits for the branches with those tags. Once all the tags have replied, the concurrence is over.
concurrence :wait_for => 'alpha, bravo' do
  sequence :tag => 'alpha' do
    # ...
  end
  sequence :tag => 'bravo' do
    # ...
  end
  sequence :tag => 'charly' do
    # ...
  end
end
This concurrence will be over when the branches alpha and bravo have replied. Whether the charly branch has replied or not doesn’t matter.
:wait_for can be shortened to :wf.
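The two forms of :wait_for can be told apart with the same pattern as the COUNT_R constant of this class. The sketch below is a standalone illustration: `parse_wait_for` is a hypothetical name, and the comma splitting is a simplified stand-in for whatever ruote does internally.

```ruby
# Hypothetical helper: classify a :wait_for value as a count or a tag list.
def parse_wait_for(value)
  count_pattern = /^-?\d+$/ # same pattern as COUNT_R

  if count_pattern.match(value.to_s)
    { count: value.to_i }
  else
    # accept either an array of tags or a comma-separated string
    tags = value.is_a?(Array) ? value : value.to_s.split(',')
    { tags: tags.map { |tag| tag.to_s.strip } }
  end
end

parse_wait_for(1)               # => { count: 1 }
parse_wait_for('alpha, bravo')  # => { tags: ["alpha", "bravo"] }
```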
:remaining
As noted for :count, the remaining branches get cancelled by default. When :remaining is set to :forget (or ‘forget’), the remaining branches continue their execution, but the concurrence forgets about them.
concurrence :count => 1, :remaining => :forget do
  alpha
  bravo
end
:remaining can be shortened to :rem or :r.
The default is ‘cancel’, where all the remaining branches are cancelled while control returns to the main flow.
There is a third setting, ‘wait’. It behaves like ‘cancel’, but the concurrence waits for the cancelled children to reply. The workitems from cancelled branches are merged in as well.
:merge
By default, the workitems override each other, and the first workitem to reply wins.
sequence do
  concurrence do
    alpha
    bravo
  end
  charly
end
In that example, if ‘alpha’ replied first, the workitem that reaches ‘charly’ once ‘bravo’ replied will have the payload as seen/modified by ‘alpha’.
The :merge attribute determines which branch wins the merge.
- first (default)
- last
- highest
- lowest
highest and lowest refer to the position in the list of branches. This is useful for setting a fixed winner.
concurrence :merge => :highest do
  alpha
  bravo
end
makes sure that alpha’s version of the workitem wins.
:merge can be shortened to :m.
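The four :merge settings can be modelled in a few lines of plain Ruby. This is only a sketch: `winning_fields` is a hypothetical helper, and it assumes that ‘highest’ means the branch listed first (as in the alpha example above) and ‘lowest’ the branch listed last.

```ruby
# Hypothetical model: each reply is a pair [branch_index, fields], listed in
# the order in which the replies arrived. Returns the winning branch's fields.
def winning_fields(replies, merge = 'first')
  winner =
    case merge.to_s
    when 'first'   then replies.first                              # first to reply
    when 'last'    then replies.last                               # last to reply
    when 'highest' then replies.min_by { |index, _fields| index }  # listed first
    when 'lowest'  then replies.max_by { |index, _fields| index }  # listed last
    else raise ArgumentError, "unknown merge setting: #{merge}"
    end

  winner[1]
end

# bravo (index 1) replied before alpha (index 0)
replies = [[1, { 'by' => 'bravo' }], [0, { 'by' => 'alpha' }]]

winning_fields(replies, :first)    # => { "by" => "bravo" }
winning_fields(replies, :highest)  # => { "by" => "alpha" }
```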
:merge_type
:override
By default, the merge type is set to ‘override’, which means that the ‘winning’ workitem’s payload supplants all other workitems’ payloads.
:mix
Setting :merge_type to :mix will attempt to merge field by field, making sure that the field values of the winner(s) are used.
:isolate
:isolate will rearrange the resulting workitem payload so that there is a new field for each branch. The name of each field is the index of the branch from ‘0’ to …
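The difference between ‘override’, ‘mix’ and ‘isolate’ can be illustrated on plain field hashes. The `merge_fields` helper is hypothetical and deliberately simplified: it only looks at the fields of the branches and says nothing about how ruote treats the rest of the workitem.

```ruby
# Hypothetical model of three merge types. `branches` holds the field hashes
# of the branches in list order, `winner_index` is the branch that won :merge.
def merge_fields(branches, winner_index, merge_type)
  winner = branches[winner_index]

  case merge_type.to_s
  when 'override'
    # the winner's payload supplants all the others
    winner.dup
  when 'mix'
    # merge the losers first, then the winner so that its values prevail
    losers = branches.each_with_index.reject { |_f, i| i == winner_index }.map(&:first)
    losers.reduce({}) { |acc, fields| acc.merge(fields) }.merge(winner)
  when 'isolate'
    # one field per branch, named after the branch index ('0', '1', and so on)
    branches.each_with_index.map { |fields, i| [i.to_s, fields] }.to_h
  else
    raise ArgumentError, "merge type not covered by this sketch: #{merge_type}"
  end
end

branches = [{ 'a' => 1, 'x' => 'alpha' }, { 'a' => 2, 'y' => 'bravo' }]

merge_fields(branches, 0, :override)  # => { "a" => 1, "x" => "alpha" }
merge_fields(branches, 0, :mix)       # => { "a" => 1, "y" => "bravo", "x" => "alpha" }
```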
:stack
:stack will stack the workitems coming back from the concurrence branches in an array whose order is determined by the :merge attribute. The array is placed in the ‘stack’ field of the resulting workitem. Note that the :stack merge_type also creates a ‘stack_attributes’ field and populates it with the expanded attributes of the concurrence.
Thus
sequence do
  concurrence :merge => :highest, :merge_type => :stack do
    reviewer1
    reviewer2
  end
  editor
end
will see the ‘editor’ receive a workitem whose fields look like:
{ 'stack' => [{ ... reviewer1 fields ... }, { ... reviewer2 fields ... }],
  'stack_attributes' => { 'merge' => 'highest', 'merge_type' => 'stack' } }
This could prove useful for participants that have to deal with the results of multiple merge strategies.
:union
(Available from ruote 2.3.0)
Will override atomic fields, concat arrays and merge hashes…
The union of those two workitems
{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } }
will be
{ 'a' => 1,
  'b' => [ 'x', 'y', 'z' ],
  'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
Warning: duplicates in arrays present before the merge will be removed as well.
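The :union rule can be reproduced on plain hashes with Hash#merge and a block. This is a minimal sketch, not ruote's implementation: `union_fields` is a hypothetical name and it only handles fields present in both workitems.

```ruby
# Hypothetical helper: union of two field hashes, the second one being
# merged onto the first.
def union_fields(first, second)
  first.merge(second) do |_key, old, new|
    if old.is_a?(Array) && new.is_a?(Array)
      (old + new).uniq  # concatenate arrays, dropping duplicates
    elsif old.is_a?(Hash) && new.is_a?(Hash)
      old.merge(new)    # merge hashes
    else
      new               # atomic fields are overridden
    end
  end
end

union_fields(
  { 'a' => 0, 'b' => ['x', 'y'], 'c' => { 'aa' => 'bb' } },
  { 'a' => 1, 'b' => ['y', 'z'], 'c' => { 'cc' => 'dd' } })
# => { "a" => 1, "b" => ["x", "y", "z"], "c" => { "aa" => "bb", "cc" => "dd" } }
```

Because the sketch calls `uniq` on the concatenated array, duplicates that were already present in one of the arrays disappear too, which matches the warning above.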
:concat
(Available from ruote 2.3.0)
Much like :union, but duplicates are not removed. Thus
{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } }
will be
{ 'a' => 1,
  'b' => [ 'x', 'y', 'y', 'z' ],
  'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
:deep
(Available from ruote 2.3.0)
Identical to :concat but hashes are merged with deep_merge (ActiveSupport flavour).
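The contrast between :concat and :deep shows up once hashes are nested. The sketch below is an assumption-laden model: both helpers are hypothetical, and `deep_merge_hashes` is a hand-written stand-in for ActiveSupport's deep_merge so that the example runs without any dependency.

```ruby
# Hypothetical recursive merge of nested hashes; any non-hash value from
# `second` replaces the corresponding value of `first`.
def deep_merge_hashes(first, second)
  first.merge(second) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge_hashes(old, new) : new
  end
end

# Sketch of :concat (deep: false) and :deep (deep: true) on two field hashes.
def concat_fields(first, second, deep: false)
  first.merge(second) do |_key, old, new|
    if old.is_a?(Array) && new.is_a?(Array)
      old + new # arrays are concatenated, duplicates are kept
    elsif old.is_a?(Hash) && new.is_a?(Hash)
      deep ? deep_merge_hashes(old, new) : old.merge(new)
    else
      new       # atomic fields are overridden
    end
  end
end

w1 = { 'b' => ['x', 'y'], 'c' => { 'n' => { 'p' => 1 } } }
w2 = { 'b' => ['y', 'z'], 'c' => { 'n' => { 'q' => 2 } } }

concat_fields(w1, w2)
# => { "b" => ["x", "y", "y", "z"], "c" => { "n" => { "q" => 2 } } }
concat_fields(w1, w2, deep: true)
# => { "b" => ["x", "y", "y", "z"], "c" => { "n" => { "p" => 1, "q" => 2 } } }
```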
:ignore
(Available from ruote 2.3.0)
A very simple merge type: the workitems given back by the branches are discarded, and the workitem as passed to the concurrence expression is used to reply to the parent expression (of the concurrence expression).
:merge_type can be shortened to :mt.
:over_if (and :over_unless) attribute
Just as the :count attribute controls how many branches have to reply before a concurrence ends, the :over_if attribute specifies a condition upon which the concurrence will [prematurely] end.
concurrence :over_if => '${f:over}' do
  alpha
  bravo
  charly
end
will end the concurrence as soon as one of the branches replies with a workitem whose field ‘over’ is set to true (the remaining branches will get cancelled unless :remaining => :forget is set).
:over_unless works the other way around: the concurrence ends as soon as a branch replies with a workitem for which the condition does not hold.
Direct Known Subclasses
Constant Summary
- COUNT_R = /^-?\d+$/
Constants inherited from FlowExpression
FlowExpression::COMMON_ATT_KEYS
Instance Attribute Summary
Attributes inherited from FlowExpression
Instance Method Summary
Methods included from MergeMixin
#merge_workitem, #merge_workitems
Methods inherited from FlowExpression
#ancestor?, #applied_workitem, #att, #att_text, #attribute, #attribute_text, #attributes, #cancel, #cancel_flanks, #compile_atts, #compile_variables, #debug_id, #deflate, #do, do_action, #do_apply, #do_cancel, #do_fail, #do_pause, #do_persist, #do_reply, #do_reply_to_parent, #do_resume, #do_unpersist, dummy, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #persist_or_raise, #root, #root_id, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables, #wfid
Methods included from WithMeta
Methods included from WithH
Constructor Details
This class inherits a constructor from Ruote::Exp::FlowExpression
Instance Method Details
#apply ⇒ Object
# File 'lib/ruote/exp/fe_concurrence.rb', line 276

def apply

  return do_reply_to_parent(h.applied_workitem) if tree_children.empty?

  #
  # count and wait_for

  count = (attribute(:count) || attribute(:c)).to_s
  count = nil unless COUNT_R.match(count)

  wf = count || attribute(:wait_for) || attribute(:wf)

  if COUNT_R.match(wf.to_s)
    h.ccount = wf.to_i
  elsif wf
    h.wait_for = Ruote.comma_split(wf)
  end

  #
  # other attributes

  h.cmerge = att(
    [ :merge, :m ],
    %w[ first last highest lowest ])
  h.cmerge_type = att(
    [ :merge_type, :mt ],
    %w[ override mix isolate stack union ignore concat deep ])
  h.remaining = att(
    [ :remaining, :rem, :r ],
    %w[ cancel forget wait ])

  h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}

  h.over = false

  apply_children

  @context.storage.put_msg(
    'reply', 'fei' => h.fei, 'workitem' => h.applied_workitem
  ) if h.ccount == 0
    #
    # force an immediate reply
end
#reply(workitem) ⇒ Object
# File 'lib/ruote/exp/fe_concurrence.rb', line 320

def reply(workitem)

  workitem = Ruote.fulldup(workitem)
    #
    # since workitem field merging might happen, better to work on
    # a copy of the workitem (so that history, coming afterwards,
    # doesn't see a modified version of the workitem)

  if h.cmerge == 'first' || h.cmerge == 'last'
    h.workitems << workitem
  else
    h.workitems[workitem['fei']['expid']] = workitem
  end

  if h.wait_for && tag = workitem['fields']['__left_tag__']
    h.wait_for.delete(tag)
  end

  over = h.over
  h.over = over || over?(workitem)

  if (not over) && h.over
    # just became 'over'

    reply_to_parent(nil)

  elsif h.over && h.remaining == 'wait'

    reply_to_parent(nil)

  elsif h.children.empty?

    do_unpersist || return

    @context.storage.put_msg(
      'ceased',
      'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem)
  else

    do_persist
  end
end