Class: Cosmos::ReactionMicroservice
- Inherits:
-
Microservice
- Object
- Microservice
- Cosmos::ReactionMicroservice
- Defined in:
- lib/cosmos/microservices/reaction_microservice.rb
Overview
The reaction microservice starts a manager, then gets the reactions and triggers from Redis. It then monitors the AutonomicTopic for changes.
Constant Summary collapse
- ACTION_METRIC_NAME =
'reactions_duration_seconds'.freeze
Instance Attribute Summary collapse
-
#manager ⇒ Object
readonly
Returns the value of attribute manager.
-
#manager_thread ⇒ Object
readonly
Returns the value of attribute manager_thread.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#share ⇒ Object
readonly
Returns the value of attribute share.
Attributes inherited from Microservice
#count, #custom, #error, #microservice_status_thread, #state
Instance Method Summary collapse
- #block_for_updates ⇒ Object
-
#initialize(*args) ⇒ ReactionMicroservice
constructor
A new instance of ReactionMicroservice.
- #no_op(data) ⇒ Object
-
#reaction_created_event(msg_hash) ⇒ Object
Add the reaction to the shared data.
-
#reaction_deleted_event(msg_hash) ⇒ Object
Remove the reaction from the shared data.
-
#reaction_updated_event(msg_hash) ⇒ Object
Update the reaction in the shared data.
- #refresh_event(data) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #topic_lookup_functions ⇒ Object
- #trigger_enabled_event(msg_hash) ⇒ Object
Methods inherited from Microservice
Constructor Details
#initialize(*args) ⇒ ReactionMicroservice
Returns a new instance of ReactionMicroservice.
433 434 435 436 437 438 439 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 433
# Build a ReactionMicroservice: run the Microservice constructor, then create
# the ReactionShare and the ReactionSnoozeManager that operates on it.
#
# @param args [Array] forwarded unchanged to the Microservice constructor
def initialize(*args)
  # Bare super forwards the same arguments as super(*args).
  super
  # @name and @scope are not assigned here; presumably the Microservice
  # constructor sets them -- TODO confirm.
  @share = ReactionShare.new(scope: @scope)
  @manager = ReactionSnoozeManager.new(name: @name, scope: @scope, share: @share)
  # The manager thread is only created in #run.
  @manager_thread = nil
  @read_topic = true
end
Instance Attribute Details
#manager ⇒ Object (readonly)
Returns the value of attribute manager.
431 432 433 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431
# @return [Object] the current value of the @manager instance variable
def manager
  @manager
end
#manager_thread ⇒ Object (readonly)
Returns the value of attribute manager_thread.
431 432 433 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431
# @return [Object] the current value of the @manager_thread instance variable
def manager_thread
  @manager_thread
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
431 432 433 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431
# @return [Object] the current value of the @name instance variable
def name
  @name
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
431 432 433 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431
# @return [Object] the current value of the @scope instance variable
def scope
  @scope
end
#share ⇒ Object (readonly)
Returns the value of attribute share.
431 432 433 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 431
# @return [Object] the current value of the @share instance variable
def share
  @share
end
Instance Method Details
#block_for_updates ⇒ Object
486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 486
# Read messages from AutonomicTopic and dispatch each one to its handler
# until @read_topic is cleared (#refresh_event and #shutdown both clear it).
#
# The handler is chosen from #topic_lookup_functions using the message's
# 'type' and 'kind' fields. A message with no matching handler is logged and
# skipped rather than raising out of the read_topics block, so it no longer
# interrupts the read and is no longer reported as a topic read failure.
#
# Any StandardError raised while reading or handling is logged, and the loop
# keeps going for as long as @read_topic remains true.
def block_for_updates
  @read_topic = true
  # The dispatch table does not depend on the message; fetch it once.
  handlers = topic_lookup_functions
  while @read_topic
    begin
      AutonomicTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
        Logger.debug "ReactionMicroservice block_for_updates: #{msg_hash}"
        # dig returns nil when either the type or the kind is unknown.
        handler = handlers.dig(msg_hash['type'], msg_hash['kind'])
        if handler
          public_send(handler, msg_hash)
        else
          Logger.error "ReactionMicroservice unknown event type: #{msg_hash['type']} kind: #{msg_hash['kind']}"
        end
      end
    rescue StandardError => e
      Logger.error "ReactionMicroservice failed to read topics #{@topics}\n#{e.formatted}"
    end
  end
end
#no_op(data) ⇒ Object
500 501 502 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 500
# Handler for events that need no action: writes the payload to the debug log
# and does nothing else.
#
# @param data [Object] event payload, interpolated into the log message
def no_op(data)
  Logger.debug("ReactionMicroservice web socket event: #{data}")
end
#reaction_created_event(msg_hash) ⇒ Object
Add the reaction to the shared data.
516 517 518 519 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 516
# Add the reaction to the shared data.
#
# @param msg_hash [Hash] event message; msg_hash['data'] is parsed as JSON
#   and passed to the shared reaction base as the reaction to add
def reaction_created_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction created msg_hash: #{msg_hash}")
  shared_reactions = @share.reaction_base
  shared_reactions.add(reaction: JSON.parse(msg_hash['data']))
end
#reaction_deleted_event(msg_hash) ⇒ Object
Remove the reaction from the shared data.
528 529 530 531 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 528
# Remove the reaction from the shared data.
#
# @param msg_hash [Hash] event message; msg_hash['data'] is parsed as JSON
#   and passed to the shared reaction base as the reaction to remove
def reaction_deleted_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction deleted msg_hash: #{msg_hash}")
  shared_reactions = @share.reaction_base
  shared_reactions.remove(reaction: JSON.parse(msg_hash['data']))
end
#reaction_updated_event(msg_hash) ⇒ Object
Update the reaction in the shared data.
522 523 524 525 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 522
# Update the reaction in the shared data.
#
# @param msg_hash [Hash] event message; msg_hash['data'] is parsed as JSON
#   and passed to the shared reaction base as the reaction to update
def reaction_updated_event(msg_hash)
  Logger.debug("ReactionMicroservice reaction updated msg_hash: #{msg_hash}")
  shared_reactions = @share.reaction_base
  shared_reactions.update(reaction: JSON.parse(msg_hash['data']))
end
#refresh_event(data) ⇒ Object
504 505 506 507 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 504
# Log the event and clear @read_topic, which ends the read loop in
# #block_for_updates so that #run goes around again and reloads the reactions.
#
# @param data [Object] event payload, interpolated into the log message
def refresh_event(data)
  Logger.debug("ReactionMicroservice web socket schedule refresh: #{data}")
  @read_topic = false
end
#run ⇒ Object
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 441
# Main loop of the microservice.
#
# Starts the manager in its own thread, then repeatedly loads every reaction
# for the scope into the shared reaction base, records how long that took
# under ACTION_METRIC_NAME, and blocks in #block_for_updates. The loop ends
# once @cancel_thread is set.
def run
  Logger.info("ReactionMicroservice running")
  @manager_thread = Thread.new do
    @manager.run
  end
  loop do
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    all_reactions = ReactionModel.all(scope: @scope)
    @share.reaction_base.setup(reactions: all_reactions)
    # Monotonic clock difference: elapsed seconds as a float.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    @metric.add_sample(name: ACTION_METRIC_NAME, value: elapsed, labels: { 'thread' => 'microservice' })
    break if @cancel_thread

    # Returns once @read_topic has been cleared (refresh or shutdown).
    block_for_updates
    break if @cancel_thread
  end
  Logger.info("ReactionMicroservice exiting")
end
#shutdown ⇒ Object
533 534 535 536 537 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 533
# Stop the microservice: clear @read_topic so the #block_for_updates loop
# ends, shut the manager down, then defer to Microservice#shutdown.
def shutdown
  @read_topic = false
  @manager.shutdown
  super
end
#topic_lookup_functions ⇒ Object
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 458
# Dispatch table: message 'type' => message 'kind' => handler method name.
# Built once and deep-frozen so the per-message lookup allocates nothing.
TOPIC_LOOKUP_FUNCTIONS = {
  'group' => {
    'created' => :no_op,
    'updated' => :no_op,
    'deleted' => :no_op,
  }.freeze,
  'trigger' => {
    'created' => :no_op,
    'updated' => :no_op,
    'deleted' => :no_op,
    'enabled' => :trigger_enabled_event,
    'disabled' => :no_op,
    'activated' => :no_op,
    'deactivated' => :no_op,
  }.freeze,
  'reaction' => {
    'created' => :reaction_created_event,
    # An update forces a full reload via #refresh_event rather than an
    # in-place update of the shared data.
    'updated' => :refresh_event,
    'deleted' => :reaction_deleted_event,
    'sleep' => :no_op,
    'awaken' => :no_op,
    'activated' => :reaction_updated_event,
    'deactivated' => :reaction_updated_event,
  }.freeze,
}.freeze

# @return [Hash{String => Hash{String => Symbol}}] the frozen dispatch table
#   mapping a message's 'type' and 'kind' to the name of its handler method
def topic_lookup_functions
  TOPIC_LOOKUP_FUNCTIONS
end
#trigger_enabled_event(msg_hash) ⇒ Object
510 511 512 513 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 510
# Queue a trigger event on the shared queue.
#
# @param msg_hash [Hash] event message; msg_hash['data'] is parsed as JSON
#   and enqueued on the shared queue base with kind 'trigger'
def trigger_enabled_event(msg_hash)
  Logger.debug("ReactionMicroservice trigger event msg_hash: #{msg_hash}")
  shared_queue = @share.queue_base
  shared_queue.enqueue(kind: 'trigger', data: JSON.parse(msg_hash['data']))
end