Class: Cosmos::ReactionMicroservice

Inherits:
Microservice show all
Defined in:
lib/cosmos/microservices/reaction_microservice.rb

Overview

The reaction microservice starts a manager, then gets the reactions and triggers from Redis. It then monitors the AutonomicTopic for changes.

Constant Summary collapse

ACTION_METRIC_NAME =
'reactions_duration_seconds'.freeze

Instance Attribute Summary collapse

Attributes inherited from Microservice

#count, #custom, #error, #microservice_status_thread, #state

Instance Method Summary collapse

Methods inherited from Microservice

#as_json, run

Constructor Details

#initialize(*args) ⇒ ReactionMicroservice

Returns a new instance of ReactionMicroservice.



433
434
435
436
437
438
439
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 433

# Builds the microservice together with the shared state and the manager
# that #run later starts on its own thread.
#
# @param args [Array] forwarded unchanged to the superclass constructor
# NOTE(review): @name and @scope are read here but never assigned in this
# method; presumably the superclass constructor sets them -- confirm.
def initialize(*args)
  super(*args)
  # No manager thread exists until #run creates one.
  @manager_thread = nil
  # Loop flag for #block_for_updates; cleared by #refresh_event and #shutdown.
  @read_topic = true
  shared_state = ReactionShare.new(scope: @scope)
  @share = shared_state
  @manager = ReactionSnoozeManager.new(name: @name, scope: @scope, share: shared_state)
end

Instance Attribute Details

#manager ⇒ Object (readonly)

Returns the value of attribute manager.



431
432
433
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431

# @return [ReactionSnoozeManager] the manager built in #initialize and run on
#   its own thread by #run
def manager
  @manager
end

#manager_thread ⇒ Object (readonly)

Returns the value of attribute manager_thread.



431
432
433
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431

# @return [Thread, nil] the thread #run starts to execute the manager; nil
#   until #run has been called
def manager_thread
  @manager_thread
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



431
432
433
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431

# @return [Object] the value of @name
# NOTE(review): @name is not assigned anywhere in this class's visible code;
# presumably the superclass constructor sets it -- confirm.
def name
  @name
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



431
432
433
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431

# @return [Object] the value of @scope, which this class passes as the
#   `scope:` argument when building its share and manager and when loading reactions
# NOTE(review): @scope is not assigned anywhere in this class's visible code;
# presumably the superclass constructor sets it -- confirm.
def scope
  @scope
end

#share ⇒ Object (readonly)

Returns the value of attribute share.



431
432
433
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431

# @return [ReactionShare] the shared state built in #initialize and handed to
#   the manager
def share
  @share
end

Instance Method Details

#block_for_updates ⇒ Object



486
487
488
489
490
491
492
493
494
495
496
497
498
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 486

# Reads the autonomic topics in a loop and dispatches every message to the
# handler that #topic_lookup_functions names for its 'type' and 'kind'.
#
# The loop runs until @read_topic is cleared, which #refresh_event and
# #shutdown do. A message whose type/kind pair has no handler is logged and
# skipped, so one unexpected message no longer aborts the rest of the batch
# or gets reported as a topic read failure.
#
# Any StandardError raised while reading or handling is logged and the loop
# carries on.
#
# @return [nil]
def block_for_updates
  @read_topic = true
  # The dispatch table is static, so build it once rather than per message.
  handlers = topic_lookup_functions
  while @read_topic
    begin
      AutonomicTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
        Logger.debug "ReactionMicroservice block_for_updates: #{msg_hash}"
        # dig returns nil for an unknown type instead of raising on nil[...].
        handler = handlers.dig(msg_hash['type'], msg_hash['kind'])
        if handler
          public_send(handler, msg_hash)
        else
          Logger.error "ReactionMicroservice no handler for type: #{msg_hash['type']} kind: #{msg_hash['kind']}"
        end
      end
    rescue StandardError => e
      # NOTE(review): Exception#formatted is not core Ruby; presumably a
      # project extension -- confirm it is loaded wherever this runs.
      Logger.error "ReactionMicroservice failed to read topics #{@topics}\n#{e.formatted}"
    end
  end
end

#no_op(data) ⇒ Object



500
501
502
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 500

# Handler for events that need no action: it only records the event at debug
# level.
#
# @param data [Object] the event payload, interpolated into the log message
# @return [Object] whatever Logger.debug returns
def no_op(data)
  message = "ReactionMicroservice web socket event: #{data}"
  Logger.debug(message)
end

#reaction_created_event(msg_hash) ⇒ Object

Add the reaction to the shared data.



516
517
518
519
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 516

# Add the reaction carried by the message to the shared data.
#
# @param msg_hash [Hash] topic message; msg_hash['data'] is parsed as JSON
# @return [Object] whatever the shared reaction base's #add returns
def reaction_created_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction created msg_hash: #{msg_hash}")
  reaction_base = @share.reaction_base
  created = JSON.parse(msg_hash['data'])
  reaction_base.add(reaction: created)
end

#reaction_deleted_event(msg_hash) ⇒ Object

Remove the reaction from the shared data.



528
529
530
531
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 528

# Remove the reaction carried by the message from the shared data.
#
# @param msg_hash [Hash] topic message; msg_hash['data'] is parsed as JSON
# @return [Object] whatever the shared reaction base's #remove returns
def reaction_deleted_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction deleted msg_hash: #{msg_hash}")
  reaction_base = @share.reaction_base
  deleted = JSON.parse(msg_hash['data'])
  reaction_base.remove(reaction: deleted)
end

#reaction_updated_event(msg_hash) ⇒ Object

Update the reaction in the shared data.



522
523
524
525
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 522

# Update the reaction carried by the message in the shared data.
#
# @param msg_hash [Hash] topic message; msg_hash['data'] is parsed as JSON
# @return [Object] whatever the shared reaction base's #update returns
def reaction_updated_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction updated msg_hash: #{msg_hash}")
  reaction_base = @share.reaction_base
  updated = JSON.parse(msg_hash['data'])
  reaction_base.update(reaction: updated)
end

#refresh_event(data) ⇒ Object



504
505
506
507
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 504

# Requests a full reload: logs the event, then clears the flag that keeps
# #block_for_updates looping so that #run fetches every reaction again.
#
# @param data [Object] the event payload, interpolated into the log message
# @return [false]
def refresh_event(data)
  message = "ReactionMicroservice web socket schedule refresh: #{data}"
  Logger.debug(message)
  @read_topic = false
end

#run ⇒ Object



441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 441

# Main loop of the microservice.
#
# Starts the manager on its own thread, then repeatedly:
#   1. loads every reaction for the scope and hands them to the shared
#      reaction base,
#   2. records how long that load took under ACTION_METRIC_NAME,
#   3. blocks in #block_for_updates until a refresh or shutdown is requested.
# The loop ends as soon as @cancel_thread is set.
#
# @return [Object] whatever the final Logger.info call returns
def run
  Logger.info('ReactionMicroservice running')
  @manager_thread = Thread.new do
    @manager.run
  end
  loop do
    began = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    loaded = ReactionModel.all(scope: @scope)
    @share.reaction_base.setup(reactions: loaded)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - began # seconds as a float
    @metric.add_sample(
      name: ACTION_METRIC_NAME,
      value: elapsed,
      labels: { 'thread' => 'microservice' }
    )
    break if @cancel_thread

    block_for_updates
    break if @cancel_thread
  end
  Logger.info('ReactionMicroservice exiting')
end

#shutdown ⇒ Object



533
534
535
536
537
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 533

# Stops the microservice: clears the flag that keeps #block_for_updates
# looping, shuts the manager down, then defers to the superclass shutdown.
#
# @return [Object] whatever the superclass #shutdown returns
def shutdown
  # Clear the loop flag first so the topic loop ends after its current read.
  @read_topic = false
  @manager.shutdown
  super
end

#topic_lookup_functions ⇒ Object



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 458

# Dispatch table used by #block_for_updates: maps a message 'type' to a table
# of 'kind' => handler method name. Kinds mapped to :no_op are only logged.
# A fresh hash is built on every call.
#
# @return [Hash{String => Hash{String => Symbol}}]
def topic_lookup_functions
  group_events = {
    'created' => :no_op,
    'updated' => :no_op,
    'deleted' => :no_op
  }
  # Triggers share the three ignored lifecycle kinds; only 'enabled' is acted on.
  trigger_events = group_events.merge(
    'enabled' => :trigger_enabled_event,
    'disabled' => :no_op,
    'activated' => :no_op,
    'deactivated' => :no_op
  )
  reaction_events = {
    'created' => :reaction_created_event,
    'updated' => :refresh_event,
    'deleted' => :reaction_deleted_event,
    'sleep' => :no_op,
    'awaken' => :no_op,
    'activated' => :reaction_updated_event,
    'deactivated' => :reaction_updated_event
  }
  {
    'group' => group_events,
    'trigger' => trigger_events,
    'reaction' => reaction_events
  }
end

#trigger_enabled_event(msg_hash) ⇒ Object



510
511
512
513
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 510

# Queues an enabled trigger so it can be processed from the shared queue.
#
# @param msg_hash [Hash] topic message; msg_hash['data'] is parsed as JSON
# @return [Object] whatever the shared queue base's #enqueue returns
def trigger_enabled_event(msg_hash)
  Logger.debug("ReactionMicroservice trigger event msg_hash: #{msg_hash}")
  queue_base = @share.queue_base
  trigger = JSON.parse(msg_hash['data'])
  queue_base.enqueue(kind: 'trigger', data: trigger)
end