Module: Ruote::StorageBase
- Included in:
- CompositeStorage, FsStorage, HashStorage
- Defined in:
- lib/ruote/storage/base.rb
Overview
Base methods for storage implementations.
Constant Summary collapse
- DUMMY_WORKER =
A helper for the #worker method, it returns that dummy worker when there is no reference to the calling worker in the current thread’s local variables.
OpenStruct.new( :name => 'worker', :identity => 'unknown', :state => 'running')
Instance Method Summary collapse
-
#clear ⇒ Object
Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.
-
#context ⇒ Object
– misc ++.
- #context=(c) ⇒ Object
-
#copy_to(target, opts = {}) ⇒ Object
Copies the content of this storage into a target storage.
- #delete_schedule(schedule_id) ⇒ Object
- #dump(type) ⇒ Object
- #empty?(type) ⇒ Boolean
-
#expression_wfids(opts) ⇒ Object
Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts).
-
#find_expressions(wfid) ⇒ Object
Given a wfid, returns all the expressions of that process instance.
-
#find_root_expression(wfid) ⇒ Object
For a given wfid, fetches all the root expressions, sort by expid and return the first.
-
#find_root_expressions(wfid) ⇒ Object
For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.
-
#get_configuration(key) ⇒ Object
– configurations ++.
-
#get_engine_variable(k) ⇒ Object
– engine variables ++.
- #get_msgs ⇒ Object
-
#get_schedules(delta, now) ⇒ Object
– ats and crons ++.
-
#get_trackers ⇒ Object
– trackers ++.
- #put_engine_variable(k, v) ⇒ Object
-
#put_msg(action, options) ⇒ Object
– messages ++.
-
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
Places schedule in storage.
-
#remove_process(wfid) ⇒ Object
Removes a process by removing all its schedules, expressions, errors, workitems and trackers.
- #replace_engine_configuration(options) ⇒ Object
-
#reserve(doc) ⇒ Object
Attempts to delete a document, returns true if the deletion succeeded.
-
#worker ⇒ Object
Warning, this is not equivalent to doing @context.worker, this method fetches the worker from the local thread variables.
Instance Method Details
#clear ⇒ Object
Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.
NOTE that it doesn’t remove engine variables (danger)
274 275 276 277 278 279 |
# File 'lib/ruote/storage/base.rb', line 274 def clear %w[ msgs schedules errors expressions workitems ].each do |type| purge_type!(type) end end |
#context ⇒ Object
– misc ++
40 41 42 43 |
# File 'lib/ruote/storage/base.rb', line 40 def context @context ||= Ruote::Context.new(self) end |
#context=(c) ⇒ Object
45 46 47 48 |
# File 'lib/ruote/storage/base.rb', line 45 def context=(c) @context = c end |
#copy_to(target, opts = {}) ⇒ Object
Copies the content of this storage into a target storage.
Of course, the target storage may be a different implementation.
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/ruote/storage/base.rb', line 246 def copy_to(target, opts={}) counter = 0 %w[ configurations errors expressions msgs schedules variables workitems ].each do |type| ids(type).each do |id| item = get(type, id) item.delete('_rev') target.put(item) counter += 1 puts(" #{type}/#{item['_id']}") if opts[:verbose] end end counter end |
#delete_schedule(schedule_id) ⇒ Object
213 214 215 216 217 218 219 |
# File 'lib/ruote/storage/base.rb', line 213 def delete_schedule(schedule_id) return if schedule_id.nil? s = get('schedules', schedule_id) delete(s) if s end |
#dump(type) ⇒ Object
306 307 308 309 310 311 |
# File 'lib/ruote/storage/base.rb', line 306 def dump(type) require 'yaml' YAML.dump({ type => get_many(type) }) end |
#empty?(type) ⇒ Boolean
110 111 112 113 |
# File 'lib/ruote/storage/base.rb', line 110 def empty?(type) (get_many(type, nil, :count => true) == 0) end |
#expression_wfids(opts) ⇒ Object
Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts).
Understands the :skip, :limit and :descending options.
This is a base implementation, different storage implementations may come up with different implementations (think CouchDB, which could provide a view for it).
151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/ruote/storage/base.rb', line 151 def expression_wfids(opts) wfids = ids('expressions').collect { |fei| fei.split('!').last }.uniq.sort wfids = wfids.reverse if opts[:descending] skip = opts[:skip] || 0 limit = opts[:limit] || wfids.length wfids[skip, limit] end |
#find_expressions(wfid) ⇒ Object
Given a wfid, returns all the expressions of that process instance.
121 122 123 124 |
# File 'lib/ruote/storage/base.rb', line 121 def find_expressions(wfid) get_many('expressions', wfid) end |
#find_root_expression(wfid) ⇒ Object
For a given wfid, fetches all the root expressions, sort by expid and return the first. Hopefully it’s the right root_expression.
137 138 139 140 |
# File 'lib/ruote/storage/base.rb', line 137 def find_root_expression(wfid) find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first end |
#find_root_expressions(wfid) ⇒ Object
For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.
129 130 131 132 |
# File 'lib/ruote/storage/base.rb', line 129 def find_root_expressions(wfid) find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? } end |
#get_configuration(key) ⇒ Object
– configurations ++
77 78 79 80 |
# File 'lib/ruote/storage/base.rb', line 77 def get_configuration(key) get('configurations', key) end |
#get_engine_variable(k) ⇒ Object
– engine variables ++
225 226 227 228 |
# File 'lib/ruote/storage/base.rb', line 225 def get_engine_variable(k) get_engine_variables['variables'][k] end |
#get_msgs ⇒ Object
105 106 107 108 |
# File 'lib/ruote/storage/base.rb', line 105 def get_msgs get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] } end |
#get_schedules(delta, now) ⇒ Object
– ats and crons ++
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/ruote/storage/base.rb', line 177 def get_schedules(delta, now) # TODO : bring that 'optimization' back in, # maybe every minute, if min != last_min ... #if delta < 1.0 # at = now.strftime('%Y%m%d%H%M%S') # get_many('schedules', /-#{at}$/) #elsif delta < 60.0 # at = now.strftime('%Y%m%d%H%M') # scheds = get_many('schedules', /-#{at}\d\d$/) # filter_schedules(scheds, now) #else # load all the schedules scheds = get_many('schedules') filter_schedules(scheds, now) #end end |
#get_trackers ⇒ Object
– trackers ++
167 168 169 170 171 |
# File 'lib/ruote/storage/base.rb', line 167 def get_trackers get('variables', 'trackers') || { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} } end |
#put_engine_variable(k, v) ⇒ Object
230 231 232 233 234 235 236 |
# File 'lib/ruote/storage/base.rb', line 230 def put_engine_variable(k, v) vars = get_engine_variables vars['variables'][k] = v put_engine_variable(k, v) unless put(vars).nil? end |
#put_msg(action, options) ⇒ Object
– messages ++
98 99 100 101 102 103 |
# File 'lib/ruote/storage/base.rb', line 98 def put_msg(action, ) msg = prepare_msg_doc(action, ) put(msg) end |
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
Places schedule in storage. Returns the id of the ‘schedule’ document. If the schedule got triggered immediately, nil is returned.
200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/ruote/storage/base.rb', line 200 def put_schedule(flavour, owner_fei, s, msg) doc = prepare_schedule_doc(flavour, owner_fei, s, msg) return nil unless doc r = put(doc) raise "put_schedule failed" if r != nil doc['_id'] end |
#remove_process(wfid) ⇒ Object
Removes a process by removing all its schedules, expressions, errors, workitems and trackers.
Warning: will not trigger any cancel behaviours at all, just removes the process.
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/ruote/storage/base.rb', line 287 def remove_process(wfid) 2.times do # two passes Thread.pass %w[ schedules expressions errors workitems ].each do |type| get_many(type, wfid).each { |d| delete(d) } end doc = get_trackers doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") } @context.storage.put(doc) end end |
#replace_engine_configuration(options) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/ruote/storage/base.rb', line 82 def replace_engine_configuration() return if .delete('preserve_configuration') conf = get('configurations', 'engine') doc = .merge('type' => 'configurations', '_id' => 'engine') doc['_rev'] = conf['_rev'] if conf put(doc) end |
#reserve(doc) ⇒ Object
Attempts to delete a document, returns true if the deletion succeeded. This is used with msgs to reserve work on them.
53 54 55 56 |
# File 'lib/ruote/storage/base.rb', line 53 def reserve(doc) delete(doc).nil? end |
#worker ⇒ Object
Warning, this is not equivalent to doing @context.worker, this method fetches the worker from the local thread variables.
68 69 70 71 |
# File 'lib/ruote/storage/base.rb', line 68 def worker Thread.current['ruote_worker'] || DUMMY_WORKER end |