Class: Cosmos::TriggerGroupManager

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmos/microservices/trigger_group_microservice.rb

Overview

The trigger manager starts a thread pool and subscribes to the telemetry decom topic, adding each packet it receives to a queue. TriggerGroupManager adds the “packet” to the thread pool queue and a worker thread evaluates the “trigger”.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, scope:, group:, share:) ⇒ TriggerGroupManager

Returns a new instance of TriggerGroupManager.



452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 452

# Sets up the manager's state. No threads are started here; #run builds the
# worker pool.
#
# @param name value stored in @name and handed to every TriggerGroupWorker
# @param scope value stored in @scope and handed to every TriggerGroupWorker
# @param group value stored in @group and handed to every TriggerGroupWorker
# @param share shared object; this class reads +trigger_base+ and
#   +packet_base+ from it and also hands it to every TriggerGroupWorker
# @param worker_count [Integer] how many worker threads
#   #generate_thread_pool starts, and how many nil entries #shutdown puts on
#   the queue. Defaults to 3, the value that used to be hard-coded.
def initialize(name:, scope:, group:, share:, worker_count: 3)
  @name = name
  @scope = scope
  @group = group
  @share = share
  @worker_count = worker_count
  # Work queue shared between this manager and its workers.
  @queue = Queue.new
  # Loop flag for #block_for_updates; cleared by #refresh and #shutdown.
  @read_topic = true
  @topics = []
  # Stays nil until #run assigns the generated pool.
  @thread_pool = nil
  # Loop flag for #run; set by #shutdown.
  @cancel_thread = false
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the group this manager was constructed with.
#
# @return [Object] the current value of @group
def group
  @group
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the name this manager was constructed with.
#
# @return [Object] the current value of @name
def name
  @name
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the scope this manager was constructed with.
#
# @return [Object] the current value of @scope
def scope
  @scope
end

#share ⇒ Object (readonly)

Returns the value of attribute share.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the shared object this manager was constructed with. Within this
# class it is used to reach +trigger_base+ and +packet_base+.
#
# @return [Object] the current value of @share
def share
  @share
end

#thread_pool ⇒ Object (readonly)

Returns the value of attribute thread_pool.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the worker thread pool.
#
# @return [Array<Thread>, nil] nil until #run assigns the array returned by
#   #generate_thread_pool
def thread_pool
  @thread_pool
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



450
451
452
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Reader for the topics currently being read.
#
# @return [Array] starts as an empty array; #update_topics replaces it with
#   the result of @share.trigger_base.topics
def topics
  @topics
end

Instance Method Details

#block_for_updates ⇒ Object



507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 507

# Keeps reading the subscribed topics until @read_topic is cleared.
#
# For every message received, the topic is pushed onto the worker queue.
# Messages on any topic other than the autonomic topic first have their
# 'json_data' field parsed and stored in the shared packet base. An error
# raised during a read is logged and the read is attempted again.
def block_for_updates
  # A proc (not a lambda) so arguments are handled exactly like a literal block.
  on_message = proc do |topic, _msg_id, msg_hash, _redis|
    Logger.debug "TriggerGroupManager block_for_updates: #{topic} #{msg_hash.to_s}"
    unless topic == @share.trigger_base.autonomic_topic
      decoded = JSON.parse(msg_hash['json_data'])
      @share.packet_base.add(topic: topic, packet: decoded)
    end
    @queue.push("#{topic}")
  end

  @read_topic = true
  while @read_topic
    begin
      Topic.read_topics(@topics, &on_message)
    rescue StandardError => error
      Logger.error "TriggerGroupManager failed to read topics #{@topics}\n#{error.formatted}"
    end
  end
end

#generate_thread_pool ⇒ Object



465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 465

# Starts @worker_count worker threads.
#
# Each thread runs one TriggerGroupWorker built from this manager's name,
# scope, group, queue and share, with +ident+ set to the worker's zero-based
# index.
#
# @return [Array<Thread>] the started threads, in creation order
def generate_thread_pool
  @worker_count.times.map do |ident|
    worker = TriggerGroupWorker.new(
      name: @name,
      scope: @scope,
      group: @group,
      queue: @queue,
      share: @share,
      ident: ident,
    )
    Thread.new { worker.run }
  end
end

#refresh ⇒ Object



525
526
527
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 525

# Clears @read_topic so the `while @read_topic` loop in #block_for_updates
# stops once its current read returns. Because @cancel_thread is left
# untouched, #run then loops around and calls #update_topics again.
#
# @return [false]
def refresh
  @read_topic = false
end

#run ⇒ Object



481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 481

# Main loop of the manager.
#
# Builds the worker pool once, then alternates between refreshing the topic
# list and blocking on topic reads until @cancel_thread is set. A failure to
# refresh the topics is logged and does not end the loop.
def run
  Logger.info "TriggerGroupManager running"
  @thread_pool = generate_thread_pool
  loop do
    begin
      update_topics
    rescue StandardError => error
      Logger.error "TriggerGroupManager failed to update topics.\n#{error.formatted}"
    end
    # Skip the blocking read if cancellation arrived during the refresh.
    block_for_updates unless @cancel_thread
    break if @cancel_thread
  end
  Logger.info "TriggerGroupManager exiting"
end

#shutdown ⇒ Object



529
530
531
532
533
534
535
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 529

# Asks the manager to stop: clears @read_topic, sets @cancel_thread, and puts
# one nil on the queue per worker.
#
# @return [Integer] @worker_count
def shutdown
  @read_topic = false
  @cancel_thread = true
  @worker_count.times { @queue.push(nil) }
end

#update_topics ⇒ Object



498
499
500
501
502
503
504
505
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 498

# Replaces @topics with the trigger base's current topic list and removes
# from the shared packet base every topic that was in the old list but is
# not in the new one.
#
# @return [Array] the topics that were removed
def update_topics
  previous = @topics
  @topics = @share.trigger_base.topics
  Logger.debug "TriggerGroupManager past_topics: #{previous} topics: #{@topics}"
  dropped = previous - @topics
  dropped.each { |stale| @share.packet_base.remove(topic: stale) }
end