Class: Cosmos::TriggerGroupManager
- Defined in:
- lib/cosmos/microservices/trigger_group_microservice.rb
Overview
The trigger manager starts a thread pool and subscribes to the telemetry decom topic, adding each packet to a queue. TriggerGroupManager adds the “packet” to the thread pool queue and a worker thread then evaluates the “trigger”.
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#share ⇒ Object
readonly
Returns the value of attribute share.
-
#thread_pool ⇒ Object
readonly
Returns the value of attribute thread_pool.
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Instance Method Summary collapse
- #block_for_updates ⇒ Object
- #generate_thread_pool ⇒ Object
-
#initialize(name:, scope:, group:, share:) ⇒ TriggerGroupManager
constructor
A new instance of TriggerGroupManager.
- #refresh ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #update_topics ⇒ Object
Constructor Details
#initialize(name:, scope:, group:, share:) ⇒ TriggerGroupManager
Returns a new instance of TriggerGroupManager.
452 453 454 455 456 457 458 459 460 461 462 463 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 452

# Builds a manager for a single trigger group. Nothing is started here: the
# worker threads are only created once #run is called.
#
# @param name [Object] stored as-is and handed to every worker
# @param scope [Object] stored as-is and handed to every worker
# @param group [Object] stored as-is and handed to every worker
# @param share [Object] shared state; other methods read its trigger_base
#   and packet_base
def initialize(name:, scope:, group:, share:)
  # Identity of this manager, exposed through the read-only attributes.
  @name, @scope, @group, @share = name, scope, group, share

  # Number of worker threads created by #generate_thread_pool.
  @worker_count = 3
  # Work queue shared between the topic reader and the workers.
  @queue = Queue.new
  # Loop-control flag for #block_for_updates.
  @read_topic = true
  # Topics currently subscribed to; filled in by #update_topics.
  @topics = []
  # Populated by #run.
  @thread_pool = nil
  # Set by #shutdown to make #run leave its main loop.
  @cancel_thread = false
end
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Object] the group value given to the constructor
def group
  @group
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Object] the name value given to the constructor
def name
  @name
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Object] the scope value given to the constructor
def scope
  @scope
end
#share ⇒ Object (readonly)
Returns the value of attribute share.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Object] the shared-state object given to the constructor
def share
  @share
end
#thread_pool ⇒ Object (readonly)
Returns the value of attribute thread_pool.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Array<Thread>, nil] nil after construction; #run replaces it with
#   the result of #generate_thread_pool
def thread_pool
  @thread_pool
end
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
450 451 452 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 450

# Read-only accessor.
#
# @return [Array] empty after construction; #update_topics replaces it with
#   the topic list reported by share.trigger_base
def topics
  @topics
end
Instance Method Details
#block_for_updates ⇒ Object
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 507

# Reads messages from the subscribed topics until a refresh or a shutdown is
# requested. Every message that is not on the autonomic topic has its
# 'json_data' parsed and stored in the shared packet base; the topic name of
# every message (autonomic or not) is then pushed onto the work queue.
#
# The loop ends when @read_topic becomes false (see #refresh / #shutdown) or
# when @cancel_thread is set. The @cancel_thread check matters because this
# method re-arms @read_topic on entry: without it, a shutdown that lands just
# before the call would be overwritten and the loop would never terminate.
#
# NOTE(review): a #refresh that arrives before this method is entered is still
# overwritten by the re-arm below; closing that window needs a change in #run.
#
# Errors raised while reading or handling a message are logged and the read
# is retried on the next iteration.
#
# @return [nil]
def block_for_updates
  @read_topic = true
  while @read_topic && !@cancel_thread
    begin
      Topic.read_topics(@topics) do |topic, _msg_id, msg_hash, _redis|
        Logger.debug "TriggerGroupManager block_for_updates: #{topic} #{msg_hash}"
        # Only non-autonomic messages are cached as packets.
        if topic != @share.trigger_base.autonomic_topic
          packet = JSON.parse(msg_hash['json_data'])
          @share.packet_base.add(topic: topic, packet: packet)
        end
        # Workers receive the topic name only.
        @queue << topic.to_s
      end
    rescue StandardError => e
      Logger.error "TriggerGroupManager failed to read topics #{@topics}\n#{e.formatted}"
    end
  end
end
#generate_thread_pool ⇒ Object
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 465

# Creates @worker_count TriggerGroupWorker instances, each sharing this
# manager's queue and share object, and starts one thread per worker that
# calls the worker's #run.
#
# @return [Array<Thread>] the started threads, in creation order
def generate_thread_pool
  @worker_count.times.map do |ident|
    worker = TriggerGroupWorker.new(
      name: @name,
      scope: @scope,
      group: @group,
      queue: @queue,
      share: @share,
      ident: ident
    )
    Thread.new { worker.run }
  end
end
#refresh ⇒ Object
525 526 527 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 525

# Clears the @read_topic flag. #block_for_updates loops only while that flag
# is set, so clearing it lets #run get back to #update_topics.
#
# @return [false]
def refresh
  @read_topic = false
end
#run ⇒ Object
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 481

# Main loop of the manager. Starts the worker threads, then alternates between
# refreshing the topic list and reading from those topics until @cancel_thread
# is set (see #shutdown).
#
# A failure in #update_topics is logged and does not stop the loop; reading
# then continues with whatever @topics currently holds.
def run
  Logger.info "TriggerGroupManager running"
  @thread_pool = generate_thread_pool

  loop do
    begin
      update_topics
    rescue => e
      Logger.error "TriggerGroupManager failed to update topics.\n#{e.formatted}"
    end
    # Checked before and after the read so a shutdown takes effect at
    # either point.
    break if @cancel_thread

    block_for_updates
    break if @cancel_thread
  end

  Logger.info "TriggerGroupManager exiting"
end
#shutdown ⇒ Object
529 530 531 532 533 534 535 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 529

# Asks the manager and its workers to stop.
#
# Sets @cancel_thread so #run leaves its main loop, clears @read_topic so
# #block_for_updates stops reading, and pushes one nil per worker onto the
# queue (presumably the stop sentinel for TriggerGroupWorker#run; confirm
# against that class).
#
# @return [Integer] the worker count (the value of Integer#times)
def shutdown
  # The cancel flag is written first on purpose. Once the reader thread sees
  # @read_topic == false it checks @cancel_thread; if that flag were still
  # false, the thread could re-enter #block_for_updates, which re-arms
  # @read_topic, and miss the shutdown.
  @cancel_thread = true
  @read_topic = false
  @worker_count.times { @queue << nil }
end
#update_topics ⇒ Object
498 499 500 501 502 503 504 505 |
# File 'lib/cosmos/microservices/trigger_group_microservice.rb', line 498

# Replaces @topics with the list reported by share.trigger_base and removes
# from share.packet_base every topic that was in the previous list but is
# absent from the new one.
#
# @return [Array] the topics that were removed
def update_topics
  previous = @topics
  @topics = @share.trigger_base.topics
  Logger.debug "TriggerGroupManager past_topics: #{previous} topics: #{@topics}"

  dropped = previous - @topics
  dropped.each { |stale| @share.packet_base.remove(topic: stale) }
end