Class: Ruote::Tracker
- Inherits:
-
Object
- Object
- Ruote::Tracker
- Defined in:
- lib/ruote/svc/tracker.rb
Overview
The tracker service is used by the “listen” expression. This services sees all the msg processed by a worker and triggers any listener interested in a particular msg.
Look at the ListenExpression for more details.
Instance Method Summary collapse
-
#add_tracker(wfid, action, id, conditions, msg) ⇒ Object
Adds a tracker (usually when a ‘listen’ expression gets applied).
-
#initialize(context) ⇒ Tracker
constructor
A new instance of Tracker.
-
#on_msg(message) ⇒ Object
The context calls this method for each successfully processed msg in the worker.
-
#remove_tracker(fei, doc = nil) ⇒ Object
Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).
Constructor Details
#initialize(context) ⇒ Tracker
Returns a new instance of Tracker.
37 38 39 40 |
# File 'lib/ruote/svc/tracker.rb', line 37 def initialize(context) @context = context end |
Instance Method Details
#add_tracker(wfid, action, id, conditions, msg) ⇒ Object
Adds a tracker (usually when a ‘listen’ expression gets applied).
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/ruote/svc/tracker.rb', line 103 def add_tracker(wfid, action, id, conditions, msg) conditions = conditions && conditions.remap { |(k, v), h| h[k] = Array(v) } doc = @context.storage.get_trackers doc['trackers'][id] = { 'wfid' => wfid, 'action' => action, 'id' => id, 'conditions' => conditions, 'msg' => msg } r = @context.storage.put(doc) add_tracker(wfid, action, id, conditions, msg) if r # the put failed, have to redo the work end |
#on_msg(message) ⇒ Object
The context calls this method for each successfully processed msg in the worker.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/ruote/svc/tracker.rb', line 45 def on_msg() m_error = ['error'] m_wfid = ['wfid'] || (['fei']['wfid'] rescue nil) m_action = ['action'] msg = m_action == 'error_intercepted' ? ['msg'] : @context.storage.get_trackers['trackers'].each do |tracker_id, tracker| # filter msgs t_wfid = tracker['wfid'] t_action = tracker['action'] next if t_wfid && t_wfid != m_wfid next if t_action && t_action != m_action next unless does_match?(, tracker['conditions']) if tracker_id == 'on_error' || tracker_id == 'on_terminate' fields = msg['workitem']['fields'] next if m_action == 'error_intercepted' && fields['__error__'] next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__']) end # prepare and emit/put 'reaction' message m = Ruote.fulldup(tracker['msg']) action = m.delete('action') m['wfid'] = m_wfid if m['wfid'] == 'replace' m['wfid'] ||= @context.wfidgen.generate m['workitem'] = msg['workitem'] if m['workitem'] == 'replace' if t_action == 'error_intercepted' m['workitem']['fields']['__error__'] = m_error elsif tracker_id == 'on_error' && m_action == 'error_intercepted' m['workitem']['fields']['__error__'] = m_error elsif tracker_id == 'on_terminate' && m_action == 'terminated' m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid } end if m['variables'] == 'compile' fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei']) m['variables'] = fexp ? fexp.compile_variables : {} end @context.storage.put_msg(action, m) end end |
#remove_tracker(fei, doc = nil) ⇒ Object
Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).
126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/ruote/svc/tracker.rb', line 126 def remove_tracker(fei, doc=nil) doc ||= @context.storage.get_trackers doc['trackers'].delete(Ruote.to_storage_id(fei)) r = @context.storage.put(doc) remove_tracker(fei, r) if r # the put failed, have to redo the work end |