Module: Ruote::StorageBase

Included in:
CompositeStorage, FsStorage, HashStorage
Defined in:
lib/ruote/storage/base.rb

Overview

Base methods for storage implementations.

Constant Summary collapse

DUMMY_WORKER =

A helper for the #worker method: this dummy worker is returned when there is no reference to the calling worker in the current thread’s local variables.

OpenStruct.new(
:name => 'worker', :identity => 'unknown', :state => 'running')

Instance Method Summary collapse

Instance Method Details

#clear ⇒ Object

Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.

NOTE that it doesn’t remove engine variables (danger)


274
275
276
277
278
279
# File 'lib/ruote/storage/base.rb', line 274

# Purges the msgs, schedules, errors, expressions and workitems document
# types by calling purge_type! on each of them, in that order.
#
# Document types outside of that list (engine variables, for instance) are
# left alone.
def clear
  purgeable_types = %w[ msgs schedules errors expressions workitems ]
  purgeable_types.each { |doc_type| purge_type!(doc_type) }
end

#context ⇒ Object

– misc ++


40
41
42
43
# File 'lib/ruote/storage/base.rb', line 40

# Returns the context bound to this storage.
#
# When none has been set yet, a Ruote::Context wrapping this storage is
# built, remembered in @context and returned.
def context
  return @context if @context

  @context = Ruote::Context.new(self)
end

#context=(c) ⇒ Object


45
46
47
48
# File 'lib/ruote/storage/base.rb', line 45

# Binds this storage to the given context +c+ (kept in @context, which is
# what #context hands back afterwards).
def context=(c)
  @context = c
end

#copy_to(target, opts = {}) ⇒ Object

Copies the content of this storage into a target storage.

Of course, the target storage may be a different implementation.


246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/ruote/storage/base.rb', line 246

# Copies the content of this storage into a target storage (which may be a
# different storage implementation).
#
# The configurations, errors, expressions, msgs, schedules, variables and
# workitems documents are read one by one and put into +target+ without
# their '_rev' entry.
#
# target:: the destination, it only has to respond to #put(doc)
# opts::   when opts[:verbose] is set, a "  type/_id" line is printed for
#          each copied document
#
# Returns the count of documents that were copied.
def copy_to(target, opts={})

  counter = 0

  %w[
    configurations errors expressions msgs schedules variables workitems
  ].each do |type|

    ids(type).each do |id|

      item = get(type, id)

      # the document may have vanished between the ids() and the get() call
      next unless item

      # strip '_rev' from a shallow copy, so that the document handed back
      # by the source storage is never modified
      item = item.dup
      item.delete('_rev')

      target.put(item)

      counter += 1
      puts("  #{type}/#{item['_id']}") if opts[:verbose]
    end
  end

  counter
end

#delete_schedule(schedule_id) ⇒ Object


213
214
215
216
217
218
219
# File 'lib/ruote/storage/base.rb', line 213

# Deletes the 'schedules' document with the given id, if there is one.
#
# Does nothing (and returns nil) when +schedule_id+ is nil or when no such
# document is found; otherwise returns what #delete returned.
def delete_schedule(schedule_id)
  unless schedule_id.nil?
    schedule = get('schedules', schedule_id)
    delete(schedule) if schedule
  end
end

#dump(type) ⇒ Object


306
307
308
309
310
311
# File 'lib/ruote/storage/base.rb', line 306

# Returns a YAML string holding a single-entry hash: the given +type+
# pointing to everything get_many(type) returns.
def dump(type)
  require 'yaml' # only loaded when a dump is actually requested

  docs_by_type = { type => get_many(type) }
  YAML.dump(docs_by_type)
end

#empty?(type) ⇒ Boolean

Returns:

  • (Boolean)

110
111
112
113
# File 'lib/ruote/storage/base.rb', line 110

# Returns true when the storage holds no document of the given +type+,
# that is when get_many(type, nil, :count => true) equals 0.
def empty?(type)
  doc_count = get_many(type, nil, :count => true)
  doc_count == 0
end

#expression_wfids(opts) ⇒ Object

Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts)).

Understands the :skip, :limit and :descending options.

This is a base implementation, different storage implementations may come up with different implementations (think CouchDB, which could provide a view for it).


151
152
153
154
155
156
157
158
159
160
161
# File 'lib/ruote/storage/base.rb', line 151

# Given all the expressions stored here, returns a sorted list of unique
# wfids. The wfid is taken as the last '!'-separated part of each
# expression id.
#
# opts:: a Hash, understands
#        :descending => reverse the sort order when set,
#        :skip       => count of leading wfids to leave out (default 0),
#        :limit      => maximum count of wfids to return (default: all)
#
# Always returns an Array; it is empty when :skip goes past the last wfid.
def expression_wfids(opts)

  wfids = ids('expressions').map { |fei| fei.split('!').last }.uniq.sort

  wfids = wfids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || wfids.length

  # Array#[start, length] yields nil when start is beyond the array size,
  # callers are promised a list
  wfids[skip, limit] || []
end

#find_expressions(wfid) ⇒ Object

Given a wfid, returns all the expressions of that process instance.


121
122
123
124
# File 'lib/ruote/storage/base.rb', line 121

# Given a wfid, returns the expressions of that process instance, as
# fetched by get_many('expressions', wfid).
def find_expressions(wfid)
  get_many('expressions', wfid)
end

#find_root_expression(wfid) ⇒ Object

For a given wfid, fetches all the root expressions, sorts them by expid and returns the first. Hopefully it’s the right root_expression.


137
138
139
140
# File 'lib/ruote/storage/base.rb', line 137

# For a given wfid, returns the root expression whose fei 'expid' comes
# first in sort order, or nil when the process has no root expression.
#
# Candidates come from find_root_expressions(wfid).
def find_root_expression(wfid)

  # only the smallest expid is wanted, no need to sort the whole list
  find_root_expressions(wfid).min_by { |hexp| hexp['fei']['expid'] }
end

#find_root_expressions(wfid) ⇒ Object

For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.


129
130
131
132
# File 'lib/ruote/storage/base.rb', line 129

# For a given wfid, returns the expressions (array of Hash instances)
# whose 'parent_id' entry is nil.
def find_root_expressions(wfid)
  expressions = find_expressions(wfid)
  expressions.select do |expression|
    expression['parent_id'].nil?
  end
end

#get_configuration(key) ⇒ Object

– configurations ++


77
78
79
80
# File 'lib/ruote/storage/base.rb', line 77

# Returns the 'configurations' document stored under +key+, as handed
# back by get('configurations', key).
def get_configuration(key)
  get('configurations', key)
end

#get_engine_variable(k) ⇒ Object

– engine variables ++


225
226
227
228
# File 'lib/ruote/storage/base.rb', line 225

# Returns the value stored under +k+ in the 'variables' hash of the
# document returned by get_engine_variables.
def get_engine_variable(k)
  variables = get_engine_variables['variables']
  variables[k]
end

#get_msgs ⇒ Object


105
106
107
108
# File 'lib/ruote/storage/base.rb', line 105

# Fetches 'msgs' documents (the request is limited to 300) and returns
# them ordered by their 'put_at' entry.
def get_msgs
  batch = get_many('msgs', nil, :limit => 300)
  batch.sort_by { |msg| msg['put_at'] }
end

#get_schedules(delta, now) ⇒ Object

– ats and crons ++


177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/ruote/storage/base.rb', line 177

# Loads all the 'schedules' documents and returns what filter_schedules
# keeps of them for +now+.
#
# +delta+ is not used for now, it only appears in the disabled
# 'optimization' kept below.
def get_schedules(delta, now)

  # TODO : bring that 'optimization' back in,
  #        maybe every minute, if min != last_min ...
  #
  #if delta < 1.0
  #  at = now.strftime('%Y%m%d%H%M%S')
  #  get_many('schedules', /-#{at}$/)
  #elsif delta < 60.0
  #  at = now.strftime('%Y%m%d%H%M')
  #  scheds = get_many('schedules', /-#{at}\d\d$/)
  #  filter_schedules(scheds, now)
  #else # load all the schedules (what the live code does)
  #end

  filter_schedules(get_many('schedules'), now)
end

#get_trackers ⇒ Object

– trackers ++


167
168
169
170
171
# File 'lib/ruote/storage/base.rb', line 167

# Returns the 'trackers' document found among the 'variables', or, when
# there is none, a fresh document with an empty 'trackers' hash.
def get_trackers
  stored = get('variables', 'trackers')
  return stored if stored

  { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end

#put_engine_variable(k, v) ⇒ Object


230
231
232
233
234
235
236
# File 'lib/ruote/storage/base.rb', line 230

# Sets the engine variable +k+ to +v+.
#
# The engine variables document is fetched, updated and put back. As long
# as #put answers with something else than nil, the whole
# fetch/update/put cycle is run again.
#
# The retry is a plain loop (not a recursive call), so a long run of
# failed puts cannot exhaust the stack.
#
# Returns nil.
def put_engine_variable(k, v)

  stored = false

  until stored
    vars = get_engine_variables
    vars['variables'][k] = v

    stored = put(vars).nil?
  end

  nil
end

#put_msg(action, options) ⇒ Object

– messages ++


98
99
100
101
102
103
# File 'lib/ruote/storage/base.rb', line 98

# Builds a msg document out of +action+ and +options+ (via
# prepare_msg_doc) and puts it in the storage. Returns what #put returns.
def put_msg(action, options)
  put(prepare_msg_doc(action, options))
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object

Places schedule in storage. Returns the id of the ‘schedule’ document. If the schedule got triggered immediately, nil is returned.


200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/ruote/storage/base.rb', line 200

# Places a schedule in the storage.
#
# The schedule document is built by prepare_schedule_doc. When that call
# yields nothing, nil is returned and nothing is stored. Otherwise the
# document is put and its '_id' is returned.
#
# Raises a RuntimeError when #put answers with something else than nil.
def put_schedule(flavour, owner_fei, s, msg)
  schedule_doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule_doc
    outcome = put(schedule_doc)
    raise "put_schedule failed" unless outcome.nil?

    schedule_doc['_id']
  end
end

#remove_process(wfid) ⇒ Object

Removes a process by removing all its schedules, expressions, errors, workitems and trackers.

Warning: will not trigger any cancel behaviours at all, just removes the process.


287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/ruote/storage/base.rb', line 287

# Removes a process by deleting its schedules, expressions, errors and
# workitems, and by dropping from the trackers document every tracker
# whose key ends with "!wfid". The trackers document is then put back
# through @context.storage.
#
# The whole sequence is run twice, with a Thread.pass at the start of
# each pass.
#
# Warning: no cancel behaviour gets triggered, the documents are simply
# removed.
def remove_process(wfid)
  tracker_suffix = "!#{wfid}"

  2.times do
    Thread.pass

    %w[ schedules expressions errors workitems ].each do |type|
      get_many(type, wfid).each { |doc| delete(doc) }
    end

    trackers_doc = get_trackers
    trackers_doc['trackers'].delete_if { |id, _| id.end_with?(tracker_suffix) }

    @context.storage.put(trackers_doc)
  end
end

#replace_engine_configuration(options) ⇒ Object


82
83
84
85
86
87
88
89
90
91
92
# File 'lib/ruote/storage/base.rb', line 82

# Stores +options+ as the 'engine' document of the 'configurations' type.
#
# The 'preserve_configuration' entry is always removed from +options+;
# when its value is truthy, nothing is stored and nil is returned.
#
# If an 'engine' configuration is already present, its '_rev' is carried
# over to the new document before the put. Returns what #put returns.
def replace_engine_configuration(options)
  preserve = options.delete('preserve_configuration')

  unless preserve
    existing = get('configurations', 'engine')

    replacement = options.merge('type' => 'configurations', '_id' => 'engine')
    replacement['_rev'] = existing['_rev'] if existing

    put(replacement)
  end
end

#reserve(doc) ⇒ Object

Attempts to delete a document, returns true if the deletion succeeded. This is used with msgs to reserve work on them.


53
54
55
56
# File 'lib/ruote/storage/base.rb', line 53

# Attempts to delete +doc+ and returns true when #delete answered nil,
# false otherwise. This is used with msgs to reserve work on them.
def reserve(doc)
  outcome = delete(doc)
  outcome.nil?
end

#worker ⇒ Object

Warning, this is not equivalent to doing @context.worker, this method fetches the worker from the local thread variables.


68
69
70
71
# File 'lib/ruote/storage/base.rb', line 68

# Returns the worker referenced by the current thread (thread-local
# 'ruote_worker'), or DUMMY_WORKER when the thread holds none.
#
# Warning: this reads the thread's local variables, it does not go
# through @context.
def worker
  thread_worker = Thread.current['ruote_worker']
  thread_worker || DUMMY_WORKER
end