Class: Cosmos::PacketLogWriter

Inherits:
LogWriter show all
Includes:
PacketLogConstants
Defined in:
lib/cosmos/logs/packet_log_writer.rb

Overview

Creates a packet log. Can automatically cycle the log based on an elasped time period or when the log file reaches a predefined size.

Constant Summary

Constants included from PacketLogConstants

Cosmos::PacketLogConstants::COSMOS2_FILE_HEADER, Cosmos::PacketLogConstants::COSMOS4_FILE_HEADER, Cosmos::PacketLogConstants::COSMOS5_CMD_FLAG_MASK, Cosmos::PacketLogConstants::COSMOS5_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_FILE_HEADER, Cosmos::PacketLogConstants::COSMOS5_HEADER_LENGTH, Cosmos::PacketLogConstants::COSMOS5_ID_FIXED_SIZE, Cosmos::PacketLogConstants::COSMOS5_ID_FLAG_MASK, Cosmos::PacketLogConstants::COSMOS5_INDEX_HEADER, Cosmos::PacketLogConstants::COSMOS5_JSON_PACKET_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_MAX_PACKET_INDEX, Cosmos::PacketLogConstants::COSMOS5_MAX_TARGET_INDEX, Cosmos::PacketLogConstants::COSMOS5_OFFSET_MARKER_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_OFFSET_MARKER_PACK_DIRECTIVE, Cosmos::PacketLogConstants::COSMOS5_OFFSET_MARKER_PACK_ITEMS, Cosmos::PacketLogConstants::COSMOS5_OFFSET_MARKER_SECONDARY_FIXED_SIZE, Cosmos::PacketLogConstants::COSMOS5_PACKET_DECLARATION_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_PACKET_DECLARATION_PACK_DIRECTIVE, Cosmos::PacketLogConstants::COSMOS5_PACKET_DECLARATION_PACK_ITEMS, Cosmos::PacketLogConstants::COSMOS5_PACKET_DECLARATION_SECONDARY_FIXED_SIZE, Cosmos::PacketLogConstants::COSMOS5_PACKET_PACK_DIRECTIVE, Cosmos::PacketLogConstants::COSMOS5_PACKET_PACK_ITEMS, Cosmos::PacketLogConstants::COSMOS5_PACKET_SECONDARY_FIXED_SIZE, Cosmos::PacketLogConstants::COSMOS5_PRIMARY_FIXED_SIZE, Cosmos::PacketLogConstants::COSMOS5_RAW_PACKET_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_STORED_FLAG_MASK, Cosmos::PacketLogConstants::COSMOS5_TARGET_DECLARATION_ENTRY_TYPE_MASK, Cosmos::PacketLogConstants::COSMOS5_TARGET_DECLARATION_PACK_DIRECTIVE, Cosmos::PacketLogConstants::COSMOS5_TARGET_DECLARATION_PACK_ITEMS, Cosmos::PacketLogConstants::COSMOS5_TARGET_DECLARATION_SECONDARY_FIXED_SIZE

Constants inherited from LogWriter

LogWriter::CYCLE_TIME_INTERVAL

Instance Attribute Summary

Attributes inherited from LogWriter

#filename, #logging_enabled

Instance Method Summary collapse

Methods inherited from LogWriter

#create_unique_filename, #cycle_thread_body, #first_time, #first_timestamp, #graceful_kill, #last_time, #last_timestamp, #prepare_write, #shutdown, #start, #stop

Constructor Details

#initialize(remote_log_directory, label, logging_enabled = true, cycle_time = nil, cycle_size = 1_000_000_000, cycle_hour = nil, cycle_minute = nil, redis_topic: nil) ⇒ PacketLogWriter

Returns a new instance of PacketLogWriter.

Parameters:

  • remote_log_directory (String)

    The s3 path to store the log files

  • label (String)

    Label to apply to the log filename

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size but is better used independently.

  • cycle_size (Integer) (defaults to: 1_000_000_000)

    The size in bytes before creating a new log file. This can be combined with cycle_time but is better used independently.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.

  • redis_topic (String) (defaults to: nil)

    The key of the Redis stream to trim when files are moved to S3



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/cosmos/logs/packet_log_writer.rb', line 45

def initialize(
  remote_log_directory,
  label,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1_000_000_000,
  cycle_hour = nil,
  cycle_minute = nil,
  redis_topic: nil
)
  super(
    remote_log_directory,
    logging_enabled,
    cycle_time,
    cycle_size,
    cycle_hour,
    cycle_minute,
    redis_topic: redis_topic
  )
  @label = label
  @index_file = nil
  @index_filename = nil
  @cmd_packet_table = {}
  @tlm_packet_table = {}
  @target_dec_entries = []
  @packet_dec_entries = []
  @next_packet_index = 0
  @target_indexes = {}
  @next_target_index = 0

  # This is an optimization to avoid creating a new entry object
  # each time we create an entry which we do a LOT!
  @index_entry = String.new
end

Instance Method Details

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn’t critical so we just log an error



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/cosmos/logs/packet_log_writer.rb', line 134

def close_file(take_mutex = true)
  write_entry(:OFFSET_MARKER, nil, nil, nil, nil, nil, nil, nil) if @file
  super

  @mutex.lock if take_mutex
  begin
    if @index_file
      begin
        write_index_file_footer()
        @index_file.close unless @index_file.closed?
        Logger.debug "Index Log File Closed : #{@index_filename}"
        date = first_timestamp[0..7] # YYYYMMDD
        s3_key = File.join(@remote_log_directory, date, "#{first_timestamp}__#{last_timestamp}__#{@label}.idx")
        S3Utilities.move_log_file_to_s3(@index_filename, s3_key)
      rescue Exception => err
        Logger.instance.error "Error closing #{@index_filename} : #{err.formatted}"
      end

      @index_file = nil
      @index_filename = nil
    end
  ensure
    @mutex.unlock if take_mutex
  end
end

#extensionObject



295
296
297
# File 'lib/cosmos/logs/packet_log_writer.rb', line 295

def extension
  '.bin'.freeze
end

#get_packet_index(cmd_or_tlm, target_name, packet_name) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/cosmos/logs/packet_log_writer.rb', line 160

def get_packet_index(cmd_or_tlm, target_name, packet_name)
  if cmd_or_tlm == :CMD
    target_table = @cmd_packet_table[target_name]
  else
    target_table = @tlm_packet_table[target_name]
  end
  if target_table
    packet_index = target_table[packet_name]
    return packet_index if packet_index
  else
    # New packet_table entry needed
    target_table = {}
    if cmd_or_tlm == :CMD
      @cmd_packet_table[target_name] = target_table
    else
      @tlm_packet_table[target_name] = target_table
    end
    id = nil
    target = System.targets[target_name]
    id = target.id if target
    write_entry(:TARGET_DECLARATION, cmd_or_tlm, target_name, packet_name, nil, nil, nil, id)
  end

  # New target_table entry needed
  packet_index = @next_packet_index
  if packet_index > COSMOS5_MAX_PACKET_INDEX
    raise "Packet Index Overflow"
  end

  target_table[packet_name] = packet_index
  @next_packet_index += 1

  id = nil
  begin
    if cmd_or_tlm == :CMD
      id = System.commands.packet(target_nam, packet_name).config_name
    else
      id = System.telemetry.packet(target_name, packet_name).config_name
    end
  rescue
    # No packet def
  end
  write_entry(:PACKET_DECLARATION, cmd_or_tlm, target_name, packet_name, nil, nil, nil, id)
  return packet_index
end

#s3_filenameObject



291
292
293
# File 'lib/cosmos/logs/packet_log_writer.rb', line 291

def s3_filename
  "#{first_timestamp}__#{last_timestamp}__#{@label}" + extension
end

#start_new_fileObject

Starting a new index file is a critical operation so the entire method is wrapped with a rescue and handled with handle_critical_exception Assumes mutex has already been taken



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/cosmos/logs/packet_log_writer.rb', line 110

def start_new_file
  super
  @file.write(COSMOS5_FILE_HEADER)
  @file_size += COSMOS5_FILE_HEADER.length

  # Start index log file
  @index_filename = create_unique_filename('.idx'.freeze)
  @index_file = File.new(@index_filename, 'wb')
  @index_file.write(COSMOS5_INDEX_HEADER)

  @cmd_packet_table = {}
  @tlm_packet_table = {}
  @next_packet_index = 0
  @target_indexes = {}
  @target_dec_entries = []
  @packet_dec_entries = []
  Logger.debug "Index Log File Opened : #{@index_filename}"
rescue => err
  Logger.error "Error starting new log file: #{err.formatted}"
  @logging_enabled = false
  Cosmos.handle_critical_exception(err)
end

#write(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id = nil, redis_offset = '0-0') ⇒ Object

Write a packet to the log file.

If no log file currently exists in the filesystem, a new file will be created.

Parameters:

  • entry_type (Symbol)

    Type of entry to write. Must be one of :TARGET_DECLARATION, :PACKET_DECLARATION, :RAW_PACKET, :JSON_PACKET

  • cmd_or_tlm (Symbol)

    One of :CMD or :TLM

  • target_name (String)

    Name of the target

  • packet_name (String)

    Name of the packet

  • time_nsec_since_epoch (Integer)

    64 bit integer nsecs since EPOCH

  • stored (Boolean)

    Whether this data is stored telemetry

  • data (String)

    Binary string of data

  • id (Integer) (defaults to: nil)

    Target ID

  • redis_offset (Integer) (defaults to: '0-0')

    The offset of this packet in its Redis stream



95
96
97
98
99
100
101
102
103
104
105
# File 'lib/cosmos/logs/packet_log_writer.rb', line 95

def write(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id = nil, redis_offset = '0-0')
  return if !@logging_enabled

  @mutex.synchronize do
    prepare_write(time_nsec_since_epoch, data.length, redis_offset)
    write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id) if @file
  end
rescue => err
  Logger.instance.error "Error writing #{@filename} : #{err.formatted}"
  Cosmos.handle_critical_exception(err)
end

#write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id) ⇒ Object

Raises:

  • (ArgumentError)


206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/cosmos/logs/packet_log_writer.rb', line 206

def write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id)
  raise ArgumentError.new("Length of id must be 64, got #{id.length}") if id and id.length != 64 # 64 hex digits, gets packed to 32 bytes with .pack('H*')

  length = COSMOS5_PRIMARY_FIXED_SIZE
  flags = 0
  flags |= COSMOS5_STORED_FLAG_MASK if stored
  flags |= COSMOS5_ID_FLAG_MASK if id
  case entry_type
  when :TARGET_DECLARATION
    target_index = @next_target_index
    @target_indexes[target_name] = target_index
    @next_target_index += 1
    if target_index > COSMOS5_MAX_TARGET_INDEX
      raise "Target Index Overflow"
    end

    flags |= COSMOS5_TARGET_DECLARATION_ENTRY_TYPE_MASK
    length += COSMOS5_TARGET_DECLARATION_SECONDARY_FIXED_SIZE + target_name.length
    length += COSMOS5_ID_FIXED_SIZE if id
    @entry.clear
    @entry << [length, flags].pack(COSMOS5_TARGET_DECLARATION_PACK_DIRECTIVE) << target_name
    @entry << [id].pack('H*') if id
    @target_dec_entries << @entry.dup
  when :PACKET_DECLARATION
    target_index = @target_indexes[target_name]
    flags |= COSMOS5_PACKET_DECLARATION_ENTRY_TYPE_MASK
    if cmd_or_tlm == :CMD
      flags |= COSMOS5_CMD_FLAG_MASK
    end
    length += COSMOS5_PACKET_DECLARATION_SECONDARY_FIXED_SIZE + packet_name.length
    length += COSMOS5_ID_FIXED_SIZE if id
    @entry.clear
    @entry << [length, flags, target_index].pack(COSMOS5_PACKET_DECLARATION_PACK_DIRECTIVE) << packet_name
    @entry << [id].pack('H*') if id
    @packet_dec_entries << @entry.dup
  when :OFFSET_MARKER
    flags |= COSMOS5_OFFSET_MARKER_ENTRY_TYPE_MASK
    length += COSMOS5_OFFSET_MARKER_SECONDARY_FIXED_SIZE + @last_offset.length
    @entry.clear
    @entry << [length, flags].pack(COSMOS5_OFFSET_MARKER_PACK_DIRECTIVE) << @last_offset
  when :RAW_PACKET, :JSON_PACKET
    target_name = 'UNKNOWN'.freeze unless target_name
    packet_name = 'UNKNOWN'.freeze unless packet_name
    packet_index = get_packet_index(cmd_or_tlm, target_name, packet_name)
    if entry_type == :RAW_PACKET
      flags |= COSMOS5_RAW_PACKET_ENTRY_TYPE_MASK
    else
      flags |= COSMOS5_JSON_PACKET_ENTRY_TYPE_MASK
    end
    if cmd_or_tlm == :CMD
      flags |= COSMOS5_CMD_FLAG_MASK
    end
    length += COSMOS5_PACKET_SECONDARY_FIXED_SIZE + data.length
    @entry.clear
    @index_entry.clear
    @index_entry << [length, flags, packet_index, time_nsec_since_epoch].pack(COSMOS5_PACKET_PACK_DIRECTIVE)
    @entry << @index_entry << data
    @index_entry << [@file_size].pack('Q>')
    @index_file.write(@index_entry)
    @first_time = time_nsec_since_epoch if !@first_time or time_nsec_since_epoch < @first_time
    @last_time = time_nsec_since_epoch if !@last_time or time_nsec_since_epoch > @last_time
  else
    raise "Unknown entry_type: #{entry_type}"
  end
  @file.write(@entry)
  @file_size += @entry.length
end


274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/cosmos/logs/packet_log_writer.rb', line 274

def write_index_file_footer
  footer_length = 4 # Includes length of length field at end
  @index_file.write([@target_dec_entries.length].pack('n'))
  footer_length += 2
  @target_dec_entries.each do |target_dec_entry|
    @index_file.write(target_dec_entry)
    footer_length += target_dec_entry.length
  end
  @index_file.write([@packet_dec_entries.length].pack('n'))
  footer_length += 2
  @packet_dec_entries.each do |packet_dec_entry|
    @index_file.write(packet_dec_entry)
    footer_length += packet_dec_entry.length
  end
  @index_file.write([footer_length].pack('N'))
end