Module: Ruote::LocalParticipant

Overview

Provides methods for ‘local’ participants.

Assumes the class that includes this module has a #context method that points to the worker or engine ruote context.

It’s “local” because it has access to the ruote storage.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ReceiverMixin

#fetch_flow_expression, #fetch_workitem, #flunk, #launch, #receive, #sign

Instance Attribute Details

#contextObject

The engine context, it’s a local participant so it knows about the context in which the engine operates…



46
47
48
# File 'lib/ruote/part/local_participant.rb', line 46

def context
  @context
end

#feiObject

Returns the current fei (Ruote::FlowExpressionId).



75
76
77
78
# File 'lib/ruote/part/local_participant.rb', line 75

def fei

  @fei ? @fei : @workitem.fei
end

#flavourObject

Usually set right before a call to #on_cancel or #cancel



58
59
60
# File 'lib/ruote/part/local_participant.rb', line 58

def flavour
  @flavour
end

#workitem(fei = nil) ⇒ Object

Returns the current workitem if no fei is given. If a fei is given, it will return the applied workitem for that fei (if any).

The optional fei is mostly here for backward compatibility (with 2.2.0)



66
67
68
69
70
71
# File 'lib/ruote/part/local_participant.rb', line 66

def workitem(fei=nil)

  return fetch_workitem(fei) if fei

  @workitem ? @workitem : applied_workitem
end

Instance Method Details

#_accept?(wi) ⇒ Boolean

Test shortcut, alleviates the need to set the workitem before calling accept?



266
267
268
# File 'lib/ruote/part/local_participant.rb', line 266

def _accept?(wi)
  Ruote.participant_send(self, :accept?, 'workitem' => wi)
end

#_dont_thread?(wi) ⇒ Boolean Also known as: _do_not_thread, _do_not_thread?

Test shortcut, alleviates the need to set the workitem before calling dont_thread?, do_not_thread? or do_not_thread.



273
274
275
276
277
278
# File 'lib/ruote/part/local_participant.rb', line 273

def _dont_thread?(wi)
  Ruote.participant_send(
    self,
    [ :dont_thread?, :do_not_thread?, :do_not_thread ],
    'workitem' => wi)
end

#_on_cancel(fei, flavour) ⇒ Object Also known as: _cancel

Test shortcut, alleviates the need to set fei and flavour before calling cancel / on_consume.



250
251
252
253
# File 'lib/ruote/part/local_participant.rb', line 250

def _on_cancel(fei, flavour)
  Ruote.participant_send(
    self, [ :on_cancel, :cancel ], 'fei' => fei, 'flavour' => flavour)
end

#_on_reply(wi) ⇒ Object

Test shortcut, alleviates the need to set the workitem before calling on_reply.



259
260
261
# File 'lib/ruote/part/local_participant.rb', line 259

def _on_reply(wi)
  Ruote.participant_send(self, :on_reply, 'workitem' => wi)
end

#_on_workitem(wi) ⇒ Object Also known as: _consume

Test shortcut, alleviates the need to set the workitem before calling consume / on_workitem.



241
242
243
244
# File 'lib/ruote/part/local_participant.rb', line 241

def _on_workitem(wi)
  Ruote.participant_send(
    self, [ :on_workitem, :consume ], 'workitem' => wi)
end

#_rtimeout(wi) ⇒ Object

Test shortcut, alleviates the need to set the workitem before calling rtimeout.



285
286
287
# File 'lib/ruote/part/local_participant.rb', line 285

def _rtimeout(wi)
  Ruote.participant_send(self, :rtimeout, 'workitem' => wi)
end

#applied_workitem(_fei = nil) ⇒ Object

Returns the workitem as was applied when the Ruote::ParticipantExpression was reached.

If the _fei arg is specified, it will return the corresponding applied workitem. This args is mostly here for backward compatibility.



97
98
99
100
# File 'lib/ruote/part/local_participant.rb', line 97

def applied_workitem(_fei=nil)

  fetch_workitem(_fei || fei)
end

#fexp(wi_or_fei = nil) ⇒ Object

Returns the Ruote::ParticipantExpression that corresponds with this participant.

If a wi_or_fei arg is given, will return the corresponding flow expression. This arg is mostly here for backward compatibility.



86
87
88
89
# File 'lib/ruote/part/local_participant.rb', line 86

def fexp(wi_or_fei=nil)

  flow_expression(wi_or_fei || fei)
end

#is_cancelled?Boolean Also known as: is_canceled?

Returns true if the underlying participant expression is gone or cancelling.



300
301
302
303
304
305
306
307
# File 'lib/ruote/part/local_participant.rb', line 300

def is_cancelled?

  if fe = fexp
    return fe.h.state == 'cancelling'
  else
    true
  end
end

#is_gone?Boolean

Returns true if the underlying participant expression is ‘gone’ (probably cancelled somehow).



292
293
294
295
# File 'lib/ruote/part/local_participant.rb', line 292

def is_gone?

  fexp.nil?
end

#lookup_variable(key) ⇒ Object

A shortcut for

fexp.lookup_variable(key)


120
121
122
123
# File 'lib/ruote/part/local_participant.rb', line 120

def lookup_variable(key)

  fexp.lookup_variable(key)
end

#participant_nameObject

Up until ruote 2.3.0, the participant name had to be fetched from the workitem. This is a shortcut, it lets you write participant code that look like

def on_workitem
  (workitem.fields['supervisors'] || []) << participant_name
  reply
end


111
112
113
114
# File 'lib/ruote/part/local_participant.rb', line 111

def participant_name

  workitem.participant_name
end

#re_dispatch(wi = nil, opts = nil) ⇒ Object Also known as: reject

Use this method to re_dispatch the workitem.

It takes two options :in and :at for “later re_dispatch”.

Look at the unschedule_re_dispatch method for an example of participant implementation that uses re_dispatch.

Without one of those options, the method is a “reject”.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/ruote/part/local_participant.rb', line 148

def re_dispatch(wi=nil, opts=nil)

  wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
  wi ||= workitem()
  opts ||= {}

  wi.h['re_dispatch_count'] = wi.h['re_dispatch_count'].to_s.to_i + 1

  msg = {
    'action' => 'dispatch',
    'fei' => wi.h.fei,
    'workitem' => wi.h,
    'participant_name' => wi.participant_name
  }

  if t = opts[:in] || opts[:at]

    sched_id = @context.storage.put_schedule('at', wi.h.fei, t, msg)

    exp = fexp(wi)
    exp.h['re_dispatch_sched_id'] = sched_id
    exp.try_persist

  else

    @context.storage.put_msg('dispatch', msg)
  end
end

#reply_to_engine(wi = workitem) ⇒ Object Also known as: reply

Participant implementations call this method when their #on_workitem (#consume) methods are done and they want to hand back the workitem to the engine so that the flow can resume.

the (wi=workitem) is mostly for backward compatibility (or for passing a totally different workitem to the engine).



132
133
134
135
# File 'lib/ruote/part/local_participant.rb', line 132

def reply_to_engine(wi=workitem)

  receive(wi)
end

#unschedule_re_dispatch(fei = nil) ⇒ Object

Cancels the scheduled re_dispatch, if any.

An example of ‘retrying participant’ :

class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def on_workitem
    begin
      do_the_job
      reply
    rescue
      re_dispatch(:in => @opts['delay'] || '1s')
    end
  end

  def cancel
    unschedule_re_dispatch
  end
end

Note how unschedule_re_dispatch is used in the cancel method. Warning, this example could loop forever…



205
206
207
208
209
210
# File 'lib/ruote/part/local_participant.rb', line 205

def unschedule_re_dispatch(fei=nil)

  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end