Class: Ruote::StorageParticipant

Inherits:
Object
Includes:
Enumerable, LocalParticipant
Defined in:
lib/ruote/part/storage_participant.rb

Overview

A participant that stores the workitem in the same storage used by the engine and the worker(s).

part = engine.register_participant 'alfred', Ruote::StorageParticipant

# ... a bit later

puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end

# ... when done with a workitem

part.proceed(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine

Does not thread by default: the engine will not spawn a dedicated thread to handle the delivery to this participant; the workitem simply gets stored via the main engine thread.

Instance Attribute Summary

Attributes included from LocalParticipant

#context, #fei, #flavour, #workitem

Class Method Summary

Instance Method Summary

Methods included from Enumerable

#each_with_object

Methods included from LocalParticipant

#_accept?, #_dont_thread?, #_on_cancel, #_on_reply, #_on_workitem, #_rtimeout, #applied_workitem, #fexp, #is_cancelled?, #is_gone?, #lookup_variable, #participant_name, #re_dispatch, #reply_to_engine, #unschedule_re_dispatch

Methods included from ReceiverMixin

#fetch_flow_expression, #fetch_workitem, #launch, #receive, #sign

Constructor Details

#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant

Returns a new instance of StorageParticipant.



# File 'lib/ruote/part/storage_participant.rb', line 58

def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end

Class Method Details

.matches?(hwi, pname, criteria) ⇒ Boolean

Used by #query when filtering workitems.

Returns:

  • (Boolean)


# File 'lib/ruote/part/storage_participant.rb', line 314

def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
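
As an illustration of the filtering above, here is a minimal sketch that replays the same logic on plain hashes, without ruote. The `matches_sketch` helper and the sample workitem hash are stand-ins invented for this example; they are not part of the library.

```ruby
# Hypothetical stand-in mirroring the logic of StorageParticipant.matches?
# (plain Ruby, no ruote required).
def matches_sketch(hwi, pname, criteria)
  # a participant name, when given, must match exactly
  return false if pname && hwi['participant_name'] != pname

  # every remaining criterion must equal the corresponding field (AND)
  criteria.all? { |fname, fvalue| hwi['fields'][fname] == fvalue }
end

# made-up workitem hash, shaped like the documents the listing works on
hwi = {
  'participant_name' => 'alfred',
  'fields' => { 'place' => 'nara', 'task' => 'clean' }
}

p matches_sketch(hwi, 'alfred', { 'place' => 'nara' })    # => true
p matches_sketch(hwi, 'bob', { 'place' => 'nara' })       # => false
p matches_sketch(hwi, nil, { 'place' => 'heiankyou' })    # => false
p matches_sketch(hwi, nil, {})                            # => true
```

An empty criteria hash together with a nil participant name matches every workitem, which is why the last call returns true.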

Instance Method Details

#[](fei) ⇒ Object Also known as: by_fei

Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).



# File 'lib/ruote/part/storage_participant.rb', line 127

def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end

#all(opts = {}) ⇒ Object

Returns all the workitems stored in here.



# File 'lib/ruote/part/storage_participant.rb', line 192

def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end

#by_field(field, value = nil, opts = {}) ⇒ Object

field : returns all the workitems with the given field name present.

field and value : returns all the workitems with the given field name and the given value for that field.

Warning: only some storages (such as CouchStorage) are optimized for such queries; the others load all the workitems and then filter them.



# File 'lib/ruote/part/storage_participant.rb', line 240

def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
    (value.nil? || hwi['fields'][field] == value)
  end
end
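
The first line of the listing lets callers pass the options hash in second position, so a Hash given as value is always read as options. A minimal sketch of just that argument handling follows; `normalize_by_field_args` is a hypothetical helper written for this example, not a ruote method.

```ruby
# Hypothetical helper reproducing only the argument handling of #by_field.
def normalize_by_field_args(field, value = nil, opts = {})
  # a Hash in second position is taken to be the options, not a value
  if value.is_a?(Hash)
    value, opts = nil, value
  end

  [field, value, opts]
end

p normalize_by_field_args('place')                 # field only
p normalize_by_field_args('place', 'nara')         # field and value
p normalize_by_field_args('place', { limit: 10 })  # field and options
```

One consequence, visible in the listing itself, is that this method cannot be used to match a field whose value is a Hash.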

#by_participant(participant_name, opts = {}) ⇒ Object

Returns all workitems for the specified participant name.



# File 'lib/ruote/part/storage_participant.rb', line 220

def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end

#by_wfid(wfid, opts = {}) ⇒ Object

Returns all workitems for the specified wfid.



# File 'lib/ruote/part/storage_participant.rb', line 209

def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end

#delegate(workitem, new_owner) ⇒ Object

Delegates a currently owned workitem to a new owner.

Fails if the workitem can’t be found, belongs to no one, or if the workitem passed as argument is out of date (it got modified in the meantime).

It’s OK to delegate to nil, thus freeing the workitem.

See #reserve for an explanation of the reserve/delegate/proceed flow.



# File 'lib/ruote/part/storage_participant.rb', line 388

def delegate(workitem, new_owner)

  hwi = fetch(workitem)

  fail ArgumentError.new(
    "workitem not found"
  ) if hwi == nil

  fail ArgumentError.new(
    "cannot delegate, workitem doesn't belong to anyone"
  ) if hwi['owner'] == nil

  fail ArgumentError.new(
    "cannot delegate, " +
    "workitem owned by '#{hwi['owner']}', not '#{workitem.owner}'"
  ) if hwi['owner'] != workitem.owner

  hwi['owner'] = new_owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end

#do_not_thread ⇒ Object

No need for a separate thread when delivering to this participant.



# File 'lib/ruote/part/storage_participant.rb', line 75

def do_not_thread; true; end

#each(&block) ⇒ Object

Iterates over the workitems stored in here.



# File 'lib/ruote/part/storage_participant.rb', line 185

def each(&block)

  all.each { |wi| block.call(wi) }
end

#first ⇒ Object

A convenience method (especially when testing) that returns the first (and possibly only) workitem in the participant.



# File 'lib/ruote/part/storage_participant.rb', line 202

def first

  wi(fetch_all({}).first)
end

#flunk(workitem, err_class_or_instance, *err_arguments) ⇒ Object

Removes the workitem and hands it back to the flow with an error to raise for the participant expression that emitted the workitem.



# File 'lib/ruote/part/storage_participant.rb', line 152

def flunk(workitem, err_class_or_instance, *err_arguments)

  r = remove_workitem('reject', workitem)

  return flunk(workitem) if r != nil

  workitem.h.delete('_rev')

  super(workitem, err_class_or_instance, *err_arguments)
end

#on_cancel ⇒ Object

Removes the document/workitem from the storage.

Warning: this method is called by the engine (worker), i.e. not by you.



# File 'lib/ruote/part/storage_participant.rb', line 113

def on_cancel

  doc = fetch(fei)

  return unless doc

  r = @context.storage.delete(doc)

  on_cancel(fei, flavour) if r != nil
end

#on_workitem ⇒ Object

This is the method called by ruote when passing a workitem to this participant.



# File 'lib/ruote/part/storage_participant.rb', line 80

def on_workitem

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end

#per_participant ⇒ Object

Mostly a test method. Returns a Hash whose keys are participant names and whose values are lists of workitems.



# File 'lib/ruote/part/storage_participant.rb', line 330

def per_participant

  each_with_object({}) { |wi, h| (h[wi.participant_name] ||= []) << wi }
end

#per_participant_count ⇒ Object

Mostly a test method. Returns a Hash whose keys are participant names and whose values are integers: the count of workitems for each participant name.



# File 'lib/ruote/part/storage_participant.rb', line 339

def per_participant_count

  per_participant.remap { |(k, v), h| h[k] = v.size }
end
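
Taken together, #per_participant and #per_participant_count amount to a group-by followed by a count. The sketch below shows the same idea on plain hashes. Since `remap` is not a core Ruby method, it uses `each_with_object` for both steps; the helper names and the sample data are made up for this example.

```ruby
# Hypothetical plain-hash versions of the two helpers (no ruote required).

# participant name => list of workitem hashes
def group_by_participant(hwis)
  hwis.each_with_object({}) { |hwi, h| (h[hwi['participant_name']] ||= []) << hwi }
end

# participant name => number of workitems
def count_by_participant(hwis)
  group_by_participant(hwis).each_with_object({}) { |(name, list), h| h[name] = list.size }
end

# made-up workitem hashes
hwis = [
  { 'participant_name' => 'alfred' },
  { 'participant_name' => 'bob' },
  { 'participant_name' => 'alfred' }
]

counts = count_by_participant(hwis)
puts counts['alfred']   # 2
puts counts['bob']      # 1
```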

#proceed(workitem) ⇒ Object

Removes the workitem from the storage and replies to the engine.



# File 'lib/ruote/part/storage_participant.rb', line 138

def proceed(workitem)

  r = remove_workitem('proceed', workitem)

  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end

#purge! ⇒ Object

Cleans this participant out completely.



# File 'lib/ruote/part/storage_participant.rb', line 307

def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end

#query(criteria) ⇒ Object

Queries the store participant for workitems.

Some examples :

part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size

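There are two ‘reserved’ criteria: ‘wfid’ and ‘participant’ (‘participant_name’ works as well). The rest of the criteria are considered constraints on fields.

‘offset’ and ‘limit’ are reserved as well. They should prove useful for pagination. ‘skip’ can be used instead of ‘offset’. ‘count’ is also reserved: when the storage has no query_workitems of its own, it makes #query return the number of matching workitems instead of the workitems themselves.

Note: the criteria are combined with AND only; you’ll have to do ORs (aggregation) yourself.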



# File 'lib/ruote/part/storage_participant.rb', line 273

def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')

  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')
  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end
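
Because criteria are only ever ANDed, an OR has to be assembled by the caller: run one query per alternative, merge the results, and drop duplicates. Here is a minimal sketch of that aggregation on plain hashes. `query_all` is a hypothetical stand-in for `part.query`, and the `'id'` key used for deduplication is an assumption of this example; with real workitems some unique key such as the fei would be needed instead.

```ruby
# Hypothetical stand-in for part.query: every criterion must match (AND).
def query_all(items, criteria)
  items.select { |hwi| criteria.all? { |k, v| hwi['fields'][k] == v } }
end

# OR: one AND-query per alternative, results merged, duplicates dropped.
def query_any(items, *alternatives)
  alternatives
    .flat_map { |criteria| query_all(items, criteria) }
    .uniq { |hwi| hwi['id'] }
end

# made-up workitem hashes standing in for stored workitems
items = [
  { 'id' => 'a', 'fields' => { 'place' => 'nara', 'urgent' => true } },
  { 'id' => 'b', 'fields' => { 'place' => 'heiankyou', 'urgent' => false } },
  { 'id' => 'c', 'fields' => { 'place' => 'edo', 'urgent' => true } }
]

# place == 'nara' OR urgent == true
ids = query_any(items, { 'place' => 'nara' }, { 'urgent' => true }).map { |hwi| hwi['id'] }
p ids   # => ["a", "c"]
```

Workitem 'a' satisfies both alternatives, which is why the merge step needs the deduplication.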

#reply(workitem) ⇒ Object

Deprecated (soon to be removed): use #proceed instead.



# File 'lib/ruote/part/storage_participant.rb', line 165

def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts '              instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end

#reserve(workitem_or_fei, owner) ⇒ Object

Claims a workitem. Returns the [updated] workitem if successful.

Returns nil if the workitem is already reserved.

Fails if the workitem can’t be found, is gone, or got modified elsewhere.

Here is a mini-diagram explaining the reserve/delegate/proceed flow:

 in    delegate(nil)    delegate(other)
 |    +---------------+ +------+
 v    v               | |      v
+-------+  reserve   +----------+  proceed
| ready | ---------> | reserved | ---------> out
+-------+            +----------+


# File 'lib/ruote/part/storage_participant.rb', line 360

def reserve(workitem_or_fei, owner)

  hwi = fetch(workitem_or_fei)

  fail ArgumentError.new("workitem not found") if hwi.nil?

  return nil if hwi['owner'] && hwi['owner'] != owner

  hwi['owner'] = owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end
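
To make the reserve/delegate/proceed flow in the diagram concrete, here is a toy model in which a plain Hash stands in for the storage. All helper names are hypothetical, and the model deliberately ignores revisions and the "gone" and "modified meanwhile" failures that the real methods report; it only walks through the ownership transitions.

```ruby
# Toy model of the flow in the diagram: a Hash stands in for the storage.
# All helper names are hypothetical; revisions and conflicts are ignored.

# "in": a workitem arrives unowned (ready)
def store_item(store, id)
  store[id] = { 'id' => id, 'owner' => nil }
end

# ready -> reserved; returns nil when somebody else holds the workitem
def reserve_item(store, id, owner)
  item = store.fetch(id) { raise ArgumentError, 'workitem not found' }
  return nil if item['owner'] && item['owner'] != owner
  item['owner'] = owner
  item
end

# reserved -> reserved (new owner) or back to ready (new_owner is nil)
def delegate_item(store, id, current_owner, new_owner)
  item = store.fetch(id) { raise ArgumentError, 'workitem not found' }
  raise ArgumentError, "workitem doesn't belong to anyone" if item['owner'].nil?
  raise ArgumentError, 'workitem owned by somebody else' if item['owner'] != current_owner
  item['owner'] = new_owner
  item
end

# reserved -> out: the workitem leaves the store
def proceed_item(store, id)
  store.delete(id)
end

store = {}
store_item(store, 'wi0')

reserve_item(store, 'wi0', 'alice')          # alice claims the workitem
reserve_item(store, 'wi0', 'bob')            # returns nil, alice holds it
delegate_item(store, 'wi0', 'alice', 'bob')  # alice hands it over to bob
delegate_item(store, 'wi0', 'bob', nil)      # bob frees it, back to "ready"
reserve_item(store, 'wi0', 'carol')          # carol can now claim it
proceed_item(store, 'wi0')                   # out
puts store.size                              # 0, the store is empty again
```

In the real participant the current owner is read from the workitem passed to #delegate rather than given as a separate argument; the explicit `current_owner` parameter is a simplification of this sketch.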

#size ⇒ Object

Returns the count of workitems stored in this participant.



# File 'lib/ruote/part/storage_participant.rb', line 178

def size

  fetch_all(:count => true)
end

#update(workitem) ⇒ Object

Used by client code when “saving” a workitem (fields may have changed). Calling #update won’t proceed the workitem.

Returns nil in case of success, true if the workitem is already gone, and the newer version of the workitem if it changed in the meantime.



# File 'lib/ruote/part/storage_participant.rb', line 102

def update(workitem)

  r = @context.storage.put(workitem.h)

  r.is_a?(Hash) ? Ruote::Workitem.new(r) : r
end
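
Since #update signals its outcome through three different kinds of return value, client code typically has to branch on the result. A minimal sketch of such a branch follows; `describe_update` and its symbols are hypothetical names chosen for this example, and the values passed in merely imitate what #update is documented to return.

```ruby
# Hypothetical helper: maps the documented outcomes of #update to symbols.
def describe_update(result)
  case result
  when nil  then :saved     # the storage accepted the new version
  when true then :gone      # the workitem was removed in the meantime
  else           :conflict  # result is the newer workitem, changes must be redone on it
  end
end

p describe_update(nil)                             # => :saved
p describe_update(true)                            # => :gone
p describe_update({ 'fields' => { 'x' => 1 } })    # => :conflict
```

In the conflict case a caller would usually re-apply its field changes to the returned workitem and call #update again.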