Class: Cosmos::TriggerGroupWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmos/microservices/trigger_group_microservice.rb

Overview

The TriggerGroupWorker is a very simple thread pool worker. Once the trigger manager has pushed a packet to the queue one of these workers will evaluate the triggers in the kit and evaluate triggers for that packet.

Constant Summary collapse

TRIGGER_METRIC_NAME =
'trigger_eval_duration_seconds'.freeze
TYPE =
'type'.freeze
ITEM_RAW =
'raw'.freeze
ITEM_TARGET =
'target'.freeze
ITEM_PACKET =
'packet'.freeze
ITEM_TYPE =
'item'.freeze
FLOAT_TYPE =
'float'.freeze
STRING_TYPE =
'string'.freeze
LIMIT_TYPE =
'limit'.freeze
TRIGGER_TYPE =
'trigger'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker

Returns a new instance of TriggerGroupWorker.



258
259
260
261
262
263
264
265
266
267
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 258

def initialize(name:, scope:, group:, queue:, share:, ident:)
  @name = name
  @scope = scope
  @group = group
  @queue = queue
  @share = share
  @ident = ident
  @metric = Metric.new(microservice: @name, scope: @scope)
  @metric_output_time = 0
end

Instance Attribute Details

#groupObject (readonly)

Returns the value of attribute group.



256
257
258
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256

def group
  @group
end

#nameObject (readonly)

Returns the value of attribute name.



256
257
258
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256

def name
  @name
end

#packetObject (readonly)

Returns the value of attribute packet.



256
257
258
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256

def packet
  @packet
end

#scopeObject (readonly)

Returns the value of attribute scope.



256
257
258
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256

def scope
  @scope
end

#targetObject (readonly)

Returns the value of attribute target.



256
257
258
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256

def target
  @target
end

Instance Method Details

#evaluate(left:, operator:, right:) ⇒ Object

the base evaluate method used by evaluate_trigger

-1 (the value is considered an error used to disable the trigger)
 0 (the value is considered as a false value)
 1 (the value is considered as a true value)


366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 366

def evaluate(left:, operator:, right:)
  Logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left} #{operator} #{right})"
  begin
    case operator
    when '>'
      return left > right ? 1 : 0
    when '<'
      return left < right ? 1 : 0
    when '>='
      return left >= right ? 1 : 0
    when '<='
      return left <= right ? 1 : 0
    when '!='
      return left != right ? 1 : 0
    when '=='
      return left == right ? 1 : 0
    when 'AND'
      return left && right ? 1 : 0
    when 'OR'
      return left || right ? 1 : 0
    end
  rescue ArgumentError
    Logger.error "invalid evaluate: (#{left} #{operator} #{right})"
    return -1
  end
end

#evaluate_data_packet(topic:, triggers:) ⇒ Object

Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 299

def evaluate_data_packet(topic:, triggers:)
  visited = Hash.new
  Logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}"
  triggers_to_eval = @share.trigger_base.get_triggers(topic: topic)
  Logger.debug "TriggerGroupWorker-#{@ident} triggers_to_eval: #{triggers_to_eval}"
  triggers_to_eval.each do | trigger |
    Logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}"
    value = evaluate_trigger(
      head: trigger,
      trigger: trigger,
      visited: visited,
      triggers: triggers
    )
    Logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}"
    # value MUST be -1, 0, or 1
    @share.trigger_base.update_state(name: trigger.name, value: value)
  end
end

#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object

This could be confusing… So this is a recursive method for the TriggerGroupWorkers to call. It will use the trigger name and append a __P for path or __R for result. The Path is a Hash that contains a key for each node traveled to get results. When the result has been found it will be stored in the result key __R in the vistied Hash and eval_trigger will return a number.

-1 (the value is considered an error used to disable the trigger)
 0 (the value is considered as a false value)
 1 (the value is considered as a true value)

IF an operand is evaluated as nil it will log an error and return -1 IF a loop is detected it will log an error and return -1



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 405

def evaluate_trigger(head:, trigger:, visited:, triggers:)
  if visited["#{trigger.name}__R"]
    return visited["#{trigger.name}__R"] 
  end
  if visited["#{trigger.name}__P"].nil?
    visited["#{trigger.name}__P"] = Hash.new
  end
  if visited["#{head.name}__P"][trigger.name]
    # Not sure if this is posible as on create it validates that the dependents are already created
    Logger.error "loop detected from #{head} -> #{trigger} path: #{visited["#{head.name}__P"]}"
    return visited["#{trigger.name}__R"] = -1
  end
  trigger.roots.each do | root_trigger_name |
    next if visited["#{root_trigger_name}__R"]
    root_trigger = triggers[root_trigger_name]
    if head.name == root_trigger.name
      Logger.error "loop detected from #{head} -> #{root_trigger} path: #{visited["#{head.name}__P"]}"
      return visited["#{trigger.name}__R"] = -1
    end
    result = evaluate_trigger(
      head: head,
      trigger: root_trigger,
      visited: visited,
      triggers: triggers
    )
    Logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}"
    visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result
  end
  left = operand_value(operand: trigger.left, other: trigger.right, visited: visited)
  right = operand_value(operand: trigger.right, other: trigger.left, visited: visited)
  if left.nil? || right.nil?
    return visited["#{trigger.name}__R"] = 0
  end
  result = evaluate(left: left, operator: trigger.operator, right: right)
  return visited["#{trigger.name}__R"] = result
end

#evaluate_wrapper(topic:) ⇒ Object

time how long each packet takes to eval and produce a metric to public



289
290
291
292
293
294
295
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 289

def evaluate_wrapper(topic:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  evaluate_data_packet(topic: topic, triggers: @share.trigger_base.triggers)
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'trigger_group' => @group, 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: TRIGGER_METRIC_NAME, value: diff, labels: metric_labels) 
end

#get_packet_limit(operand:, other:) ⇒ Object

extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN



322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 322

def get_packet_limit(operand:, other:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?
  limit = packet["#{operand[ITEM_TYPE]}__L"]
  if limit.nil? == false && limit.include?('_')
    return other[LIMIT_TYPE] if limit.include?(other[LIMIT_TYPE])
  end
  return limit
end

#get_packet_value(operand:) ⇒ Object

extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted



337
338
339
340
341
342
343
344
345
346
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 337

def get_packet_value(operand:)
  packet = @share.packet_base.packet(
    target: operand[ITEM_TARGET],
    packet: operand[ITEM_PACKET]
  )
  return nil if packet.nil?

  value_type = operand[ITEM_RAW] ? '' : '__C'
  return packet["#{operand[ITEM_TYPE]}#{value_type}"]
end

#operand_value(operand:, other:, visited:) ⇒ Object

extract the value of the operand from the packet



349
350
351
352
353
354
355
356
357
358
359
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 349

def operand_value(operand:, other:, visited:)
  if operand[TYPE] == ITEM_TYPE && other[TYPE] == LIMIT_TYPE
    return get_packet_limit(operand: operand, other: other)
  elsif operand[TYPE] == ITEM_TYPE
    return get_packet_value(operand: operand)
  elsif operand[TYPE] == TRIGGER_TYPE
    return visited["#{operand[TRIGGER_TYPE]}__R"] == 1
  else
    return operand[operand[TYPE]]
  end
end

#runObject



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 269

def run
  Logger.info "TriggerGroupWorker-#{@ident} running"
  loop do
    topic = @queue.pop
    break if topic.nil?
    begin
      evaluate_wrapper(topic: topic)
      current_time = Time.now.to_i
      if @metric_output_time < current_time
        @metric.output
        @metric_output_time = current_time + 120
      end
    rescue StandardError => e
      Logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}"
    end
  end
  Logger.info "TriggerGroupWorker-#{@ident} exiting"
end