Class: Cosmos::TriggerGroupWorker
- Defined in:
- lib/cosmos/microservices/trigger_group_microservice.rb
Overview
The TriggerGroupWorker is a very simple thread pool worker. Once the trigger manager has pushed a packet to the queue one of these workers will evaluate the triggers in the kit and evaluate triggers for that packet.
Constant Summary collapse
- TRIGGER_METRIC_NAME =
'trigger_eval_duration_seconds'.freeze
- TYPE =
'type'.freeze
- ITEM_RAW =
'raw'.freeze
- ITEM_TARGET =
'target'.freeze
- ITEM_PACKET =
'packet'.freeze
- ITEM_TYPE =
'item'.freeze
- FLOAT_TYPE =
'float'.freeze
- STRING_TYPE =
'string'.freeze
- LIMIT_TYPE =
'limit'.freeze
- TRIGGER_TYPE =
'trigger'.freeze
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#packet ⇒ Object
readonly
Returns the value of attribute packet.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#target ⇒ Object
readonly
Returns the value of attribute target.
Instance Method Summary collapse
-
#evaluate(left:, operator:, right:) ⇒ Object
the base evaluate method used by evaluate_trigger -1 (the value is considered an error used to disable the trigger) 0 (the value is considered as a false value) 1 (the value is considered as a true value).
-
#evaluate_data_packet(topic:, triggers:) ⇒ Object
Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.
-
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing…
-
#evaluate_wrapper(topic:) ⇒ Object
time how long each packet takes to eval and produce a metric to public.
-
#get_packet_limit(operand:, other:) ⇒ Object
extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN.
-
#get_packet_value(operand:) ⇒ Object
extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted.
-
#initialize(name:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
constructor
A new instance of TriggerGroupWorker.
-
#operand_value(operand:, other:, visited:) ⇒ Object
extract the value of the operand from the packet.
- #run ⇒ Object
Constructor Details
#initialize(name:, scope:, group:, queue:, share:, ident:) ⇒ TriggerGroupWorker
Returns a new instance of TriggerGroupWorker.
258 259 260 261 262 263 264 265 266 267 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 258 def initialize(name:, scope:, group:, queue:, share:, ident:) @name = name @scope = scope @group = group @queue = queue @share = share @ident = ident @metric = Metric.new(microservice: @name, scope: @scope) @metric_output_time = 0 end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
256 257 258 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256 def group @group end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
256 257 258 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256 def name @name end |
#packet ⇒ Object (readonly)
Returns the value of attribute packet.
256 257 258 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256 def packet @packet end |
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
256 257 258 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256 def scope @scope end |
#target ⇒ Object (readonly)
Returns the value of attribute target.
256 257 258 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 256 def target @target end |
Instance Method Details
#evaluate(left:, operator:, right:) ⇒ Object
the base evaluate method used by evaluate_trigger
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 366 def evaluate(left:, operator:, right:) Logger.debug "TriggerGroupWorker-#{@ident} evaluate: (#{left} #{operator} #{right})" begin case operator when '>' return left > right ? 1 : 0 when '<' return left < right ? 1 : 0 when '>=' return left >= right ? 1 : 0 when '<=' return left <= right ? 1 : 0 when '!=' return left != right ? 1 : 0 when '==' return left == right ? 1 : 0 when 'AND' return left && right ? 1 : 0 when 'OR' return left || right ? 1 : 0 end rescue ArgumentError Logger.error "invalid evaluate: (#{left} #{operator} #{right})" return -1 end end |
#evaluate_data_packet(topic:, triggers:) ⇒ Object
Each packet will be evaluated to all triggers and use the result to send the results back to the topic to be used by the reaction microservice.
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 299 def evaluate_data_packet(topic:, triggers:) visited = Hash.new Logger.debug "TriggerGroupWorker-#{@ident} topic: #{topic}" triggers_to_eval = @share.trigger_base.get_triggers(topic: topic) Logger.debug "TriggerGroupWorker-#{@ident} triggers_to_eval: #{triggers_to_eval}" triggers_to_eval.each do | trigger | Logger.debug "TriggerGroupWorker-#{@ident} eval head: #{trigger}" value = evaluate_trigger( head: trigger, trigger: trigger, visited: visited, triggers: triggers ) Logger.debug "TriggerGroupWorker-#{@ident} trigger: #{trigger} value: #{value}" # value MUST be -1, 0, or 1 @share.trigger_base.update_state(name: trigger.name, value: value) end end |
#evaluate_trigger(head:, trigger:, visited:, triggers:) ⇒ Object
This could be confusing… So this is a recursive method for the TriggerGroupWorkers to call. It will use the trigger name and append a __P for path or __R for result. The Path is a Hash that contains a key for each node traveled to get results. When the result has been found it will be stored in the result key __R in the vistied Hash and eval_trigger will return a number.
-1 (the value is considered an error used to disable the trigger)
0 (the value is considered as a false value)
1 (the value is considered as a true value)
IF an operand is evaluated as nil it will log an error and return -1 IF a loop is detected it will log an error and return -1
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 405 def evaluate_trigger(head:, trigger:, visited:, triggers:) if visited["#{trigger.name}__R"] return visited["#{trigger.name}__R"] end if visited["#{trigger.name}__P"].nil? visited["#{trigger.name}__P"] = Hash.new end if visited["#{head.name}__P"][trigger.name] # Not sure if this is posible as on create it validates that the dependents are already created Logger.error "loop detected from #{head} -> #{trigger} path: #{visited["#{head.name}__P"]}" return visited["#{trigger.name}__R"] = -1 end trigger.roots.each do | root_trigger_name | next if visited["#{root_trigger_name}__R"] root_trigger = triggers[root_trigger_name] if head.name == root_trigger.name Logger.error "loop detected from #{head} -> #{root_trigger} path: #{visited["#{head.name}__P"]}" return visited["#{trigger.name}__R"] = -1 end result = evaluate_trigger( head: head, trigger: root_trigger, visited: visited, triggers: triggers ) Logger.debug "TriggerGroupWorker-#{@ident} #{root_trigger.name} result: #{result}" visited["#{root_trigger.name}__R"] = visited["#{head.name}__P"][root_trigger.name] = result end left = operand_value(operand: trigger.left, other: trigger.right, visited: visited) right = operand_value(operand: trigger.right, other: trigger.left, visited: visited) if left.nil? || right.nil? return visited["#{trigger.name}__R"] = 0 end result = evaluate(left: left, operator: trigger.operator, right: right) return visited["#{trigger.name}__R"] = result end |
#evaluate_wrapper(topic:) ⇒ Object
time how long each packet takes to eval and produce a metric to public
289 290 291 292 293 294 295 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 289 def evaluate_wrapper(topic:) start = Process.clock_gettime(Process::CLOCK_MONOTONIC) evaluate_data_packet(topic: topic, triggers: @share.trigger_base.triggers) diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float metric_labels = { 'trigger_group' => @group, 'thread' => "worker-#{@ident}" } @metric.add_sample(name: TRIGGER_METRIC_NAME, value: diff, labels: metric_labels) end |
#get_packet_limit(operand:, other:) ⇒ Object
extract the value outlined in the operand to get the packet item limit IF operand limit does not include _LOW or _HIGH this will match the COLOR and return COLOR_LOW || COLOR_HIGH operand item: GREEN_LOW == other operand limit: GREEN
322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 322 def get_packet_limit(operand:, other:) packet = @share.packet_base.packet( target: operand[ITEM_TARGET], packet: operand[ITEM_PACKET] ) return nil if packet.nil? limit = packet["#{operand[ITEM_TYPE]}__L"] if limit.nil? == false && limit.include?('_') return other[LIMIT_TYPE] if limit.include?(other[LIMIT_TYPE]) end return limit end |
#get_packet_value(operand:) ⇒ Object
extract the value outlined in the operand to get the packet item value IF raw in operand it will pull the raw value over the converted
337 338 339 340 341 342 343 344 345 346 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 337 def get_packet_value(operand:) packet = @share.packet_base.packet( target: operand[ITEM_TARGET], packet: operand[ITEM_PACKET] ) return nil if packet.nil? value_type = operand[ITEM_RAW] ? '' : '__C' return packet["#{operand[ITEM_TYPE]}#{value_type}"] end |
#operand_value(operand:, other:, visited:) ⇒ Object
extract the value of the operand from the packet
349 350 351 352 353 354 355 356 357 358 359 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 349 def operand_value(operand:, other:, visited:) if operand[TYPE] == ITEM_TYPE && other[TYPE] == LIMIT_TYPE return get_packet_limit(operand: operand, other: other) elsif operand[TYPE] == ITEM_TYPE return get_packet_value(operand: operand) elsif operand[TYPE] == TRIGGER_TYPE return visited["#{operand[TRIGGER_TYPE]}__R"] == 1 else return operand[operand[TYPE]] end end |
#run ⇒ Object
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 269 def run Logger.info "TriggerGroupWorker-#{@ident} running" loop do topic = @queue.pop break if topic.nil? begin evaluate_wrapper(topic: topic) current_time = Time.now.to_i if @metric_output_time < current_time @metric.output @metric_output_time = current_time + 120 end rescue StandardError => e Logger.error "TriggerGroupWorker-#{@ident} failed to evaluate data packet from topic: #{topic}\n#{e.formatted}" end end Logger.info "TriggerGroupWorker-#{@ident} exiting" end |