Class: Ruote::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/ruote/svc/tracker.rb

Overview

The tracker service is used by the “listen” expression. This services sees all the msg processed by a worker and triggers any listener interested in a particular msg.

Look at the ListenExpression for more details.

Instance Method Summary collapse

Constructor Details

#initialize(context) ⇒ Tracker

Returns a new instance of Tracker.



37
38
39
40
# File 'lib/ruote/svc/tracker.rb', line 37

def initialize(context)

  @context = context
end

Instance Method Details

#add_tracker(wfid, action, id, conditions, msg) ⇒ Object

Adds a tracker (usually when a ‘listen’ expression gets applied).



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/ruote/svc/tracker.rb', line 103

def add_tracker(wfid, action, id, conditions, msg)

  conditions =
    conditions && conditions.remap { |(k, v), h| h[k] = Array(v) }

  doc = @context.storage.get_trackers

  doc['trackers'][id] =
    { 'wfid' => wfid,
      'action' => action,
      'id' => id,
      'conditions' => conditions,
      'msg' => msg }

  r = @context.storage.put(doc)

  add_tracker(wfid, action, id, conditions, msg) if r
    # the put failed, have to redo the work
end

#on_msg(message) ⇒ Object

The context calls this method for each successfully processed msg in the worker.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/ruote/svc/tracker.rb', line 45

def on_msg(message)

  m_error = message['error']
  m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil)
  m_action = message['action']

  msg = m_action == 'error_intercepted' ? message['msg'] : message

  @context.storage.get_trackers['trackers'].each do |tracker_id, tracker|

    # filter msgs

    t_wfid = tracker['wfid']
    t_action = tracker['action']

    next if t_wfid && t_wfid != m_wfid
    next if t_action && t_action != m_action

    next unless does_match?(message, tracker['conditions'])

    if tracker_id == 'on_error' || tracker_id == 'on_terminate'

      fields = msg['workitem']['fields']

      next if m_action == 'error_intercepted' && fields['__error__']
      next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
    end

    # prepare and emit/put 'reaction' message

    m = Ruote.fulldup(tracker['msg'])

    action = m.delete('action')

    m['wfid'] = m_wfid if m['wfid'] == 'replace'
    m['wfid'] ||= @context.wfidgen.generate

    m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'

    if t_action == 'error_intercepted'
      m['workitem']['fields']['__error__'] = m_error
    elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
      m['workitem']['fields']['__error__'] = m_error
    elsif tracker_id == 'on_terminate' && m_action == 'terminated'
      m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
    end

    if m['variables'] == 'compile'
      fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
      m['variables'] = fexp ? fexp.compile_variables : {}
    end

    @context.storage.put_msg(action, m)
  end
end

#remove_tracker(fei, doc = nil) ⇒ Object

Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).



126
127
128
129
130
131
132
133
134
135
136
# File 'lib/ruote/svc/tracker.rb', line 126

def remove_tracker(fei, doc=nil)

  doc ||= @context.storage.get_trackers

  doc['trackers'].delete(Ruote.to_storage_id(fei))

  r = @context.storage.put(doc)

  remove_tracker(fei, r) if r
    # the put failed, have to redo the work
end