Class: Cosmos::RouterTopic
- Defined in:
- lib/cosmos/topics/router_topic.rb
Class Method Summary
- .connect_router(router_name, scope:) ⇒ Object
- .disconnect_router(router_name, scope:) ⇒ Object
- .receive_telemetry(router, scope:) ⇒ Object
- .route_command(packet, target_names, scope:) ⇒ Object
- .shutdown(router, scope:) ⇒ Object
- .start_raw_logging(router_name, scope:) ⇒ Object
- .stop_raw_logging(router_name, scope:) ⇒ Object
- .topics(router, scope:) ⇒ Object
Generate a list of topics for this router.
Methods inherited from Topic
clear_topics, initialize_streams, read_topics
Class Method Details
.connect_router(router_name, scope:) ⇒ Object
63 64 65 |
# File 'lib/cosmos/topics/router_topic.rb', line 63
#
# Writes a { 'connect' => true } message to the command topic of the
# named router.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param router_name [String] router name, interpolated into the topic name
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever Store.write_topic returns
def self.connect_router(router_name, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router_name}"
  message = { 'connect' => true }
  Store.write_topic(command_topic, message, '*', 100)
end
.disconnect_router(router_name, scope:) ⇒ Object
67 68 69 |
# File 'lib/cosmos/topics/router_topic.rb', line 67
#
# Writes a { 'disconnect' => true } message to the command topic of the
# named router.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param router_name [String] router name, interpolated into the topic name
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever Store.write_topic returns
def self.disconnect_router(router_name, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router_name}"
  message = { 'disconnect' => true }
  Store.write_topic(command_topic, message, '*', 100)
end
.receive_telemetry(router, scope:) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/cosmos/topics/router_topic.rb', line 37
#
# Reads the router's topics in an endless loop and hands every message to
# the caller's block. For messages that arrived on the router command
# topic, the block's return value is written back on the matching ACK
# topic under the same message id.
#
# The method only stops when an exception propagates out of it or the
# caller's block breaks out of it.
#
# @param router [Object] router passed to RouterTopic.topics
# @param scope [String] scope passed to RouterTopic.topics
# @yieldparam topic [String] topic the message was read from
# @yieldparam msg_hash [Object] message payload supplied by Store.read_topics
# @yieldreturn [Object] value reported as 'result' in the acknowledgement
def self.receive_telemetry(router, scope:)
  while true
    # The topic list is rebuilt on every pass, exactly as before.
    watched_topics = RouterTopic.topics(router, scope: scope)
    Store.read_topics(watched_topics) do |topic, msg_id, msg_hash, _redis|
      result = yield topic, msg_hash

      # Only router command messages are acknowledged.
      next unless /CMD}ROUTER/.match?(topic)

      # "{SCOPE__CMD}ROUTER__NAME" becomes "{SCOPE__ACKCMD}ROUTER__NAME":
      # the second "__"-separated segment gets an 'ACK' prefix.
      segments = topic.split("__")
      segments[1] = 'ACK' + segments[1]
      Store.write_topic(segments.join("__"), { 'result' => result }, msg_id, 100)
    end
  end
end
.route_command(packet, target_names, scope:) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/cosmos/topics/router_topic.rb', line 51
#
# Forwards a command packet received by a router to a target command topic.
#
# * An identified packet is written to the topic of its own target.
# * An unidentified packet is written to the topic of the only candidate
#   target when exactly one target name is given. Its command name is
#   reported as 'UNKNOWN', and so is its target name when the packet does
#   not carry one, so that no nil field value is written to the stream.
# * Otherwise there is no destination and an error is raised.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param packet [Object] command packet; must respond to identified?,
#   target_name, packet_name and buffer
# @param target_names [Array<String>] candidate target names for the router
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever Store.write_topic returns
# @raise [RuntimeError] when the packet is unidentified and there is not
#   exactly one candidate target
def self.route_command(packet, target_names, scope:)
  if packet.identified?
    topic = "{#{scope}__CMD}TARGET__#{packet.target_name}"
    message = {
      'target_name' => packet.target_name,
      'cmd_name' => packet.packet_name,
      'cmd_buffer' => packet.buffer(false)
    }
    Store.write_topic(topic, message, '*', 100)
  elsif target_names.length == 1
    topic = "{#{scope}__CMD}TARGET__#{target_names[0]}"
    message = {
      # An unidentified packet usually has no target name; use the same
      # placeholder as cmd_name instead of sending nil.
      'target_name' => packet.target_name || 'UNKNOWN',
      'cmd_name' => 'UNKNOWN',
      'cmd_buffer' => packet.buffer(false)
    }
    Store.write_topic(topic, message, '*', 100)
  else
    raise "No route for command: #{packet.target_name} #{packet.packet_name}"
  end
end
.shutdown(router, scope:) ⇒ Object
79 80 81 82 83 |
# File 'lib/cosmos/topics/router_topic.rb', line 79
#
# Writes a { 'shutdown' => 'true' } message to the router's command topic,
# waits one second, then clears every topic belonging to the router.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param router [Object] router; must respond to name and to whatever
#   RouterTopic.topics needs
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever RouterTopic.clear_topics returns
def self.shutdown(router, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router.name}"
  Store.write_topic(command_topic, { 'shutdown' => 'true' }, '*', 100)

  # Allow a moment for the shutdown message to be picked up before the
  # topics disappear.
  sleep 1

  router_topics = RouterTopic.topics(router, scope: scope)
  RouterTopic.clear_topics(router_topics)
end
.start_raw_logging(router_name, scope:) ⇒ Object
71 72 73 |
# File 'lib/cosmos/topics/router_topic.rb', line 71
#
# Writes a { 'log_raw' => 'true' } message to the command topic of the
# named router. Note that the value is the string 'true', not a boolean.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param router_name [String] router name, interpolated into the topic name
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever Store.write_topic returns
def self.start_raw_logging(router_name, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router_name}"
  message = { 'log_raw' => 'true' }
  Store.write_topic(command_topic, message, '*', 100)
end
.stop_raw_logging(router_name, scope:) ⇒ Object
75 76 77 |
# File 'lib/cosmos/topics/router_topic.rb', line 75
#
# Writes a { 'log_raw' => 'false' } message to the command topic of the
# named router. Note that the value is the string 'false', not a boolean.
#
# NOTE(review): '*' and 100 are handed straight to Store.write_topic;
# presumably the stream entry id and a maximum stream length - confirm
# against Store.write_topic.
#
# @param router_name [String] router name, interpolated into the topic name
# @param scope [String] scope, interpolated into the topic name
# @return [Object] whatever Store.write_topic returns
def self.stop_raw_logging(router_name, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router_name}"
  message = { 'log_raw' => 'false' }
  Store.write_topic(command_topic, message, '*', 100)
end
.topics(router, scope:) ⇒ Object
Generate a list of topics for this router. This includes the router's own command topic and one telemetry topic for every packet of each target assigned to this router.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/cosmos/topics/router_topic.rb', line 26
#
# Generate a list of topics for this router: the router's own command
# topic first, followed by one telemetry topic for every packet that
# System.telemetry reports for each of the router's targets, in the order
# the targets and packets are enumerated.
#
# @param router [Object] router; must respond to name and target_names
# @param scope [String] scope, interpolated into every topic name
# @return [Array<String>] topic names
def self.topics(router, scope:)
  command_topic = "{#{scope}__CMD}ROUTER__#{router.name}"

  telemetry_topics = router.target_names.flat_map do |target_name|
    # packets yields name/packet pairs; only the packet itself is needed.
    System.telemetry.packets(target_name).map do |_packet_name, packet|
      "#{scope}__TELEMETRY__{#{packet.target_name}}__#{packet.packet_name}"
    end
  end

  [command_topic] + telemetry_topics
end