Class: Cosmos::LogWriter

Inherits:
Object show all
Defined in:
lib/cosmos/logs/log_writer.rb

Overview

Creates a log. Can automatically cycle the log based on an elapsed time period or when the log file reaches a predefined size.

Direct Known Subclasses

PacketLogWriter, TextLogWriter

Constant Summary collapse

CYCLE_TIME_INTERVAL =

The cycle time interval. Cycle times are only checked at this level of granularity.

2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(remote_log_directory, logging_enabled = true, cycle_time = nil, cycle_size = 1000000000, cycle_hour = nil, cycle_minute = nil, redis_topic: nil) ⇒ LogWriter

Returns a new instance of LogWriter.

Parameters:

  • remote_log_directory (String)

    The s3 path to store the log files

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size but is better used independently.

  • cycle_size (Integer) (defaults to: 1000000000)

    The size in bytes before creating a new log file. This can be combined with cycle_time but is better used independently.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.

  • redis_topic (String) (defaults to: nil)

    The key of the Redis stream to trim when files are moved to S3



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/cosmos/logs/log_writer.rb', line 54

# Sets up the writer state and, when any time-based cycle option is given,
# starts the "Log cycle" background thread running #cycle_thread_body.
#
# @param remote_log_directory [String] The s3 path to store the log files
# @param logging_enabled [Boolean] Whether to start with logging enabled
# @param cycle_time [Integer] The amount of time in seconds before creating
#   a new log file. Must be >= CYCLE_TIME_INTERVAL when given.
# @param cycle_size [Integer] The size in bytes before creating a new log file
# @param cycle_hour [Integer] The hour at which to cycle the log daily
#   (combined with cycle_minute). If nil, the log is cycled hourly at cycle_minute.
# @param cycle_minute [Integer] The minute at which to cycle the log
# @param redis_topic [String] The key of the Redis stream to trim when files
#   are moved to S3
# @raise [RuntimeError] if cycle_time is below CYCLE_TIME_INTERVAL
def initialize(
  remote_log_directory,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1000000000,
  cycle_hour = nil,
  cycle_minute = nil,
  redis_topic: nil
)
  @remote_log_directory = remote_log_directory
  @redis_topic = redis_topic
  @logging_enabled = ConfigParser.handle_true_false(logging_enabled)

  # Every cycle option goes through ConfigParser.handle_nil first and is only
  # coerced with Integer() when the result is truthy.
  interval = ConfigParser.handle_nil(cycle_time)
  if interval
    interval = Integer(interval)
    raise "cycle_time must be >= #{CYCLE_TIME_INTERVAL}" if interval < CYCLE_TIME_INTERVAL
  end
  @cycle_time = interval

  size_limit = ConfigParser.handle_nil(cycle_size)
  @cycle_size = size_limit ? Integer(size_limit) : size_limit

  hour = ConfigParser.handle_nil(cycle_hour)
  @cycle_hour = hour ? Integer(hour) : hour

  minute = ConfigParser.handle_nil(cycle_minute)
  @cycle_minute = minute ? Integer(minute) : minute

  # Current file state: nothing is open until the first write needs a file
  @mutex = Mutex.new
  @file = nil
  @filename = nil
  @file_size = 0
  @start_time = Time.now.utc
  @first_time = nil
  @last_time = nil
  @cancel_threads = false

  # Redis bookkeeping used when a closed file is moved to S3
  @last_offset = nil
  @previous_file_redis_offset = nil

  # This is an optimization to avoid creating a new entry object
  # each time we create an entry which we do a LOT!
  @entry = String.new

  # The cycle thread is only needed for the time-based cycle options
  @cycle_thread = nil
  if @cycle_time || @cycle_hour || @cycle_minute
    @cycle_sleeper = Sleeper.new
    @cycle_thread = Cosmos.safe_thread("Log cycle") { cycle_thread_body }
  end
end

Instance Attribute Details

#filename ⇒ String (readonly)

Returns the filename of the packet log.

Returns:

  • (String)

    The filename of the packet log



30
31
32
# File 'lib/cosmos/logs/log_writer.rb', line 30

# Reader for the path of the log file currently being written.
# In the visible code @filename is assigned when a new file is started and
# reset to nil when the file is closed.
#
# @return [String, nil] The filename of the packet log, or nil when no file is open
def filename
  @filename
end

#logging_enabled ⇒ true/false (readonly)

Returns whether logging is enabled.

Returns:

  • (true/false)

    Whether logging is enabled



33
34
35
# File 'lib/cosmos/logs/log_writer.rb', line 33

# Reader for the logging flag. The flag is set from the constructor argument,
# turned on by #start, and turned off by #stop or when starting a new log
# file fails.
#
# @return [true/false] Whether logging is enabled
def logging_enabled
  @logging_enabled
end

Instance Method Details

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn’t critical so we just log an error. NOTE: This also trims the Redis stream to keep a full file’s worth of data in the stream. This is what prevents continuous stream growth.



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/cosmos/logs/log_writer.rb', line 202

# Closes the current log file (if any), moves it to S3 and trims the Redis
# stream. Closing a log file isn't critical, so any failure is logged rather
# than raised. The file state is reset even when the close/move fails.
#
# @param take_mutex [Boolean] Whether this method should lock @mutex itself.
#   Pass false when the caller already holds the mutex.
# @return [nil]
def close_file(take_mutex = true)
  @mutex.lock if take_mutex
  begin
    # Nothing to do when no file is open
    return unless @file

    begin
      @file.close unless @file.closed?
      Logger.debug "Log File Closed : #{@filename}"
      # The first eight timestamp characters (YYYYMMDD) name the per-day folder
      s3_key = File.join(@remote_log_directory, first_timestamp[0..7], s3_filename)
      S3Utilities.move_log_file_to_s3(@filename, s3_key)
      # Now that the file is in S3, trim the Redis stream up until the previous file.
      # This keeps one file worth of data in Redis as a safety buffer
      if @redis_topic && @previous_file_redis_offset
        Cosmos::Store.trim_topic(@redis_topic, @previous_file_redis_offset)
      end
      @previous_file_redis_offset = @last_offset
    rescue Exception => err
      Logger.instance.error "Error closing #{@filename} : #{err.formatted}"
    end

    # Forget the file whether or not the close/move succeeded
    @file = nil
    @filename = nil
    @file_size = 0
    nil
  ensure
    @mutex.unlock if take_mutex
  end
end

#create_unique_filename(ext = extension) ⇒ Object

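Implementation detail: returns a path in the system temp directory (Dir.tmpdir) for a timestamped filename that does not already exist, adding an incrementing attempt number until an unused name is found.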



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/cosmos/logs/log_writer.rb', line 129

# Builds a path in the system temp directory (Dir.tmpdir) that no existing
# file occupies. The name comes from File.build_timestamped_filename using
# @label and an attempt counter; the counter starts at nil and becomes
# 1, 2, 3, ... each time the candidate path already exists.
#
# @param ext [String] File extension handed to File.build_timestamped_filename
#   (defaults to #extension)
# @return [String] Path of a file that did not exist when it was checked
def create_unique_filename(ext = extension)
  attempt = nil
  loop do
    # A fresh tag array is built on every pass.
    # NOTE(review): File.build_timestamped_filename is not defined in this file;
    # @label and attempt can both be nil here, so it presumably ignores nil
    # tags - confirm.
    filename = File.join(Dir.tmpdir, File.build_timestamped_filename([@label, attempt], ext))
    return filename unless File.exist?(filename)

    # nil.to_i is 0, so the first retry uses attempt number 1
    attempt = attempt.to_i + 1
  end
end

#cycle_thread_body ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/cosmos/logs/log_writer.rb', line 145

# Loop run by the "Log cycle" thread. On every pass it decides, under the
# mutex, whether the current log file is due to be cycled and closes it if so;
# the next file is opened elsewhere. The loop exits once the sleeper's #sleep
# returns a truthy value.
#
# @return [nil]
def cycle_thread_body
  loop do
    # The check against start_time needs to be mutex protected to prevent a
    # packet coming in between the check and closing the file
    @mutex.synchronize do
      now = Time.now.utc
      next unless @logging_enabled

      # Three independent triggers, in order:
      #   1. total time logging exceeds cycle_time
      #   2. daily at cycle_hour:cycle_minute, when the file was started on another day
      #   3. hourly at cycle_minute (no cycle_hour), when the file was started in another hour
      cycle_due =
        (@cycle_time && (now - @start_time) > @cycle_time) ||
        (@cycle_hour && @cycle_minute && now.hour == @cycle_hour && now.min == @cycle_minute && @start_time.yday != now.yday) ||
        (@cycle_minute && !@cycle_hour && now.min == @cycle_minute && @start_time.hour != now.hour)

      close_file(false) if cycle_due
    end

    # Only check whether to cycle at a set interval
    break if @cycle_sleeper.sleep(CYCLE_TIME_INTERVAL)
  end
end

#extension ⇒ Object



233
234
235
# File 'lib/cosmos/logs/log_writer.rb', line 233

# File extension used for log files; also the default extension for
# #create_unique_filename and the suffix of #s3_filename.
#
# @return [String] The frozen string '.log'
def extension
  -'.log'
end

#first_time ⇒ Object



237
238
239
# File 'lib/cosmos/logs/log_writer.rb', line 237

# Converts @first_time with Time.from_nsec_from_epoch.
# NOTE(review): Time.from_nsec_from_epoch is not defined in this file, and the
# visible code only ever assigns nil to @first_time - presumably a subclass
# stores nanoseconds since the epoch there while writing; confirm.
#
# @return [Object] Whatever Time.from_nsec_from_epoch returns (presumably a Time)
def first_time
  Time.from_nsec_from_epoch(@first_time)
end

#first_timestamp ⇒ Object



245
246
247
# File 'lib/cosmos/logs/log_writer.rb', line 245

# Timestamp string for #first_time. #close_file uses its first eight
# characters as the date folder and #s3_filename uses it as the name prefix.
#
# @return [String] Result of calling to_timestamp on #first_time
def first_timestamp
  earliest = first_time
  earliest.to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#graceful_kill ⇒ Object



123
124
125
# File 'lib/cosmos/logs/log_writer.rb', line 123

# Sets the @cancel_threads flag.
# NOTE(review): nothing in the visible code reads @cancel_threads, and this
# method is presumably invoked by Cosmos.kill_thread (called from #shutdown
# with self as the owner) - confirm against that helper and the subclasses.
#
# @return [true]
def graceful_kill
  @cancel_threads = true
end

#last_time ⇒ Object



241
242
243
# File 'lib/cosmos/logs/log_writer.rb', line 241

# Converts @last_time with Time.from_nsec_from_epoch.
# NOTE(review): Time.from_nsec_from_epoch is not defined in this file, and the
# visible code only ever assigns nil to @last_time - presumably a subclass
# stores nanoseconds since the epoch there while writing; confirm.
#
# @return [Object] Whatever Time.from_nsec_from_epoch returns (presumably a Time)
def last_time
  Time.from_nsec_from_epoch(@last_time)
end

#last_timestamp ⇒ Object



249
250
251
# File 'lib/cosmos/logs/log_writer.rb', line 249

# Timestamp string for #last_time; #s3_filename uses it as the second half
# of the stored file's name.
#
# @return [String] Result of calling to_timestamp on #last_time
def last_timestamp
  latest = last_time
  latest.to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#prepare_write(time_nsec_since_epoch, data_length, redis_offset) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/cosmos/logs/log_writer.rb', line 192

# Makes sure a log file is ready for the next entry and records the entry's
# Redis offset. A new file is started when logging is enabled and either no
# file is open or writing data_length more bytes would push the file past
# @cycle_size.
#
# @param time_nsec_since_epoch [Integer] Not used by this implementation
# @param data_length [Integer] Number of bytes about to be written
# @param redis_offset [Object] Redis offset of the entry about to be written
# @return [Object] The redis_offset that was stored
def prepare_write(time_nsec_since_epoch, data_length, redis_offset)
  # logging_enabled is checked again here because it might have changed since
  # we acquired the mutex
  if @logging_enabled
    size_exceeded = @file && @cycle_size && (@file_size + data_length) > @cycle_size
    start_new_file if !@file || size_exceeded
  end

  # This is needed for the redis offset marker entry at the end of the log file
  @last_offset = redis_offset
end

#s3_filename ⇒ Object



229
230
231
# File 'lib/cosmos/logs/log_writer.rb', line 229

# Name given to the log file when #close_file moves it to S3: the first and
# last timestamps joined by a double underscore, followed by #extension.
#
# @return [String]
def s3_filename
  start_stamp = first_timestamp
  end_stamp = last_timestamp
  "#{start_stamp}__#{end_stamp}" + extension
end

#shutdown ⇒ Object

Stop all logging, close the current log file, and kill the logging threads.



114
115
116
117
118
119
120
121
# File 'lib/cosmos/logs/log_writer.rb', line 114

# Stop all logging, close the current log file, and kill the logging threads.
# When a cycle thread exists its sleeper is cancelled before the thread is
# handed to Cosmos.kill_thread.
#
# @return [nil]
def shutdown
  stop
  return unless @cycle_thread

  @cycle_sleeper.cancel
  Cosmos.kill_thread(self, @cycle_thread)
  @cycle_thread = nil
end

#start ⇒ Object

Starts a new log file by closing the existing log file. New log files are not created until packets are written by #write so this does not immediately create a log file on the filesystem.



104
105
106
# File 'lib/cosmos/logs/log_writer.rb', line 104

# Closes the existing log file and enables logging, both under the mutex.
# No file is opened here.
#
# @return [true]
def start
  @mutex.synchronize do
    close_file(false)
    @logging_enabled = true
  end
end

#start_new_file ⇒ Object

Starting a new log file is a critical operation, so the entire method is wrapped with a rescue and errors are handled with handle_critical_exception. Assumes the mutex has already been taken.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/cosmos/logs/log_writer.rb', line 174

# Closes the current log file and opens a new one at a freshly generated
# unique path. Assumes the mutex has already been taken (close_file is called
# with take_mutex = false).
#
# Any StandardError raised along the way is logged, logging is disabled, and
# the error is handed to Cosmos.handle_critical_exception.
#
# @return [Object] Result of the final Logger.debug call, or of
#   Cosmos.handle_critical_exception when an error was rescued
def start_new_file
  close_file(false)

  # Start log file
  @filename = create_unique_filename()
  @file = File.new(@filename, 'wb')
  @file_size = 0

  # Reset the per-file timing state; @start_time is what the cycle thread
  # compares against when deciding whether to cycle
  @start_time = Time.now.utc
  @first_time = nil
  @last_time = nil
  Logger.debug "Log File Opened : #{@filename}"
rescue => err
  Logger.error "Error starting new log file: #{err.formatted}"
  # Disabling logging keeps prepare_write from attempting a new file on every write
  @logging_enabled = false
  Cosmos.handle_critical_exception(err)
end

#stop ⇒ Object

Stops all logging and closes the current log file.



109
110
111
# File 'lib/cosmos/logs/log_writer.rb', line 109

# Disables logging and then closes the current log file, both under the mutex.
#
# @return [Object] Whatever close_file returns
def stop
  @mutex.synchronize do
    @logging_enabled = false
    close_file(false)
  end
end