Class: Cosmos::CommandTopic

Inherits:
Topic show all
Defined in:
lib/cosmos/topics/command_topic.rb

Constant Summary collapse

COMMAND_ACK_TIMEOUT_S =
5

Class Method Summary collapse

Methods inherited from Topic

clear_topics, initialize_streams, read_topics, topics

Class Method Details

.raise_hazardous_error(msg_hash, target_name, cmd_name, cmd_params) ⇒ Object

PRIVATE implementation details



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/cosmos/topics/command_topic.rb', line 68

# Builds a HazardousError from a command ack message and raises it.
#
# The _cmd method in script/commands.rb rescues this error and calls
# prompt_for_hazardous.
#
# @param msg_hash [Hash] Ack message; msg_hash["result"] is split on newlines
#   and its second line is used as the hazardous description
# @param target_name [String] Stored on the error as target_name
# @param cmd_name [String] Stored on the error as cmd_name
# @param cmd_params [Hash] Stored on the error as cmd_params
# @raise [HazardousError] always
def self.raise_hazardous_error(msg_hash, target_name, cmd_name, cmd_params)
  # Second line of the result text (nil when the result has only one line)
  hazardous_description = msg_hash["result"].split("\n")[1]
  hazardous = HazardousError.new.tap do |err|
    err.target_name = target_name
    err.cmd_name = cmd_name
    err.cmd_params = cmd_params
    err.hazardous_description = hazardous_description
  end
  # Deliberately no Logger.info here: the error is already logged by the
  # Logger.info "Ack Received ..." call
  raise hazardous
end

.send_command(command, scope:) ⇒ Object

Parameters:

  • command (Hash)

    Command hash structure ready to be written to a topic



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/cosmos/topics/command_topic.rb', line 38

# Writes a command to the target's CMD topic and waits for the matching
# acknowledgement on the target's ACKCMD topic.
#
# NOTE: command['cmd_params'] is replaced in place with its JSON string
# before the write; the original params Hash is what gets returned.
#
# @param command [Hash] Command hash structure ready to be written to a topic.
#   Reads 'target_name', 'cmd_name' and 'cmd_params'.
# @param scope [String] Scope name used to build the topic names
# @return [Array] [target_name, cmd_name, cmd_params] once an ack with
#   result "SUCCESS" is received
# @raise [HazardousError] (via raise_hazardous_error) when the ack result
#   mentions "HazardousError"
# @raise [RuntimeError] with the ack result as message for any other result,
#   or "Timeout waiting for cmd ack" after COMMAND_ACK_TIMEOUT_S seconds
def self.send_command(command, scope:)
  ack_topic = "{#{scope}__ACKCMD}TARGET__#{command['target_name']}"
  Store.update_topic_offsets([ack_topic])
  # Save the existing cmd_params Hash and JSON generate before writing to the topic
  cmd_params = command['cmd_params']
  command['cmd_params'] = JSON.generate(cmd_params.as_json)
  cmd_id = Store.write_topic("{#{scope}__CMD}TARGET__#{command['target_name']}", command, '*', 100)
  # TODO: This timeout is fine for most but can we get the write_timeout from the interface here?
  # Measure the wait on the monotonic clock so that wall clock adjustments
  # (NTP steps, manual changes) can neither shorten nor extend the timeout
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  while (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) < COMMAND_ACK_TIMEOUT_S
    Topic.read_topics([ack_topic]) do |_topic, _msg_id, msg_hash, _redis|
      # Ignore acks that belong to other commands
      next unless msg_hash["id"] == cmd_id

      result = msg_hash["result"]
      if result == "SUCCESS"
        return [command['target_name'], command['cmd_name'], cmd_params]
      # Check for HazardousError which is a special case
      elsif result.include?("HazardousError")
        raise_hazardous_error(msg_hash, command['target_name'], command['cmd_name'], cmd_params)
      else
        raise result
      end
    end
  end
  raise "Timeout waiting for cmd ack"
end

.write_packet(packet, scope:) ⇒ Object



26
27
28
29
30
31
32
33
34
35
# File 'lib/cosmos/topics/command_topic.rb', line 26

# Writes a command packet to its COMMAND topic via Store.write_topic.
#
# @param packet [Object] Must respond to target_name, packet_name,
#   received_time (which responds to to_nsec_from_epoch), received_count,
#   stored and buffer
# @param scope [String] Scope name used as the topic prefix
# @return [Object] Whatever Store.write_topic returns
def self.write_packet(packet, scope:)
  target_name = packet.target_name
  packet_name = packet.packet_name

  # Keys are inserted in the order they appear in the written message
  msg_hash = {}
  msg_hash[:time] = packet.received_time.to_nsec_from_epoch
  msg_hash[:target_name] = target_name
  msg_hash[:packet_name] = packet_name
  msg_hash[:received_count] = packet.received_count
  msg_hash[:stored] = packet.stored
  msg_hash[:buffer] = packet.buffer(false)

  Store.write_topic("#{scope}__COMMAND__{#{target_name}}__#{packet_name}", msg_hash)
end