Class: Cosmos::TimelineMicroservice

Inherits:
Microservice show all
Defined in:
lib/cosmos/microservices/timeline_microservice.rb

Overview

The timeline microservice starts a manager, then gets the activities from the sorted set in Redis and updates the manager's schedule. It then waits for an update on the timeline stream; when one arrives, the schedule is updated again.

Constant Summary collapse

TIMELINE_METRIC_NAME =
'timeline_activities_duration_seconds'.freeze

Instance Attribute Summary

Attributes inherited from Microservice

#count, #custom, #error, #microservice_status_thread, #name, #scope, #state

Instance Method Summary collapse

Methods inherited from Microservice

#as_json, run

Constructor Details

#initialize(name) ⇒ TimelineMicroservice

Returns a new instance of TimelineMicroservice.



266
267
268
269
270
271
272
273
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 266

# Builds the microservice state for a single timeline.
#
# The timeline name is taken from the third '__'-separated segment of
# +name+ (nil when +name+ has fewer than three segments).
#
# @param name [String] microservice name; passed to the parent constructor
def initialize(name)
  super(name)
  # Only the third segment is needed; the first two are discarded.
  _first, _second, @timeline_name = name.split('__')
  @manager_thread = nil # assigned in #run when the manager thread is started
  @read_topic = true
  @schedule = Schedule.new(@timeline_name)
  @manager = TimelineManager.new(
    name: @timeline_name,
    scope: scope,
    schedule: @schedule
  )
end

Instance Method Details

#block_for_updatesObject



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 310

# Reads the timeline topics and dispatches each message addressed to this
# timeline to its handler, looping until @read_topic is cleared (done by
# #schedule_refresh and #shutdown).
#
# Messages whose 'type'/'kind' pair has no entry in #topic_lookup_functions
# are logged and skipped. Previously such a message raised inside the read
# block, which abandoned the rest of that read_topics call and was reported
# as a generic "failed to read topics" error.
#
# Any StandardError raised while reading or dispatching is logged and the
# read is attempted again.
def block_for_updates
  @read_topic = true
  while @read_topic
    begin
      TimelineTopic.read_topics(@topics) do |_topic, _msg_id, msg_hash, _redis|
        next unless msg_hash['timeline'] == @timeline_name

        # dig returns nil (instead of raising) when either level is missing.
        handler = topic_lookup_functions.dig(msg_hash['type'], msg_hash['kind'])
        if handler.nil?
          Logger.error "#{@timeline_name} ignoring timeline message with unknown " \
                       "type #{msg_hash['type'].inspect} / kind #{msg_hash['kind'].inspect}"
          next
        end

        public_send(handler, JSON.parse(msg_hash['data']))
      end
    rescue StandardError => e
      Logger.error "#{@timeline_name} failed to read topics #{@topics}\n#{e.formatted}"
    end
  end
end

#create_activity_from_event(data) ⇒ Object

Add the activity to the schedule. We don’t need to hold the job in memory if it starts more than an hour from now; a later refresh task will add it to the schedule.



337
338
339
340
341
342
343
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 337

# Adds the activity described by +data+ to the in-memory schedule when it
# starts between 2 and 3600 seconds from now (inclusive); otherwise does
# nothing.
#
# @param data [Hash] parsed event payload; data['start'] is compared
#   against Time.now.to_i, so it is presumably epoch seconds
# @return [Object, nil] result of @schedule.add_activity, or nil when the
#   activity is outside the window
def create_activity_from_event(data)
  seconds_until_start = data['start'] - Time.now.to_i
  if (2..3600).cover?(seconds_until_start)
    @schedule.add_activity(
      ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
    )
  end
end

#remove_activity_from_event(data) ⇒ Object

Remove the activity from the schedule. We don’t need to remove the activity if it starts more than an hour from now, because it is not held in the in-memory schedule; it will be removed from the stored data.



347
348
349
350
351
352
353
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 347

# Removes the activity described by +data+ from the in-memory schedule when
# it starts between 2 and 3600 seconds from now (inclusive); otherwise does
# nothing.
#
# @param data [Hash] parsed event payload; data['start'] is compared
#   against Time.now.to_i, so it is presumably epoch seconds
# @return [Object, nil] result of @schedule.remove_activity, or nil when
#   the activity is outside the window
def remove_activity_from_event(data)
  seconds_until_start = data['start'] - Time.now.to_i
  if (2..3600).cover?(seconds_until_start)
    @schedule.remove_activity(
      ActivityModel.from_json(data, name: @timeline_name, scope: @scope)
    )
  end
end

#runObject



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 275

# Main loop of the microservice.
#
# Starts the manager in its own thread, then repeatedly:
# 1. loads the timeline's activities and pushes them into the schedule,
# 2. records how long that took under TIMELINE_METRIC_NAME,
# 3. blocks in #block_for_updates until a refresh is requested.
#
# The loop exits as soon as @cancel_thread is set, which is checked both
# before and after blocking.
def run
  Logger.info "#{@name} timeline running"
  @manager_thread = Thread.new { @manager.run }
  loop do
    # Monotonic clock so the measured duration is immune to wall-clock changes.
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    current_activities = ActivityModel.activities(name: @timeline_name, scope: @scope)
    @schedule.update(current_activities)
    diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
    metric_labels = { 'timeline' => @timeline_name, 'thread' => 'microservice' }
    @metric.add_sample(name: TIMELINE_METRIC_NAME, value: diff, labels: metric_labels)
    break if @cancel_thread

    block_for_updates
    break if @cancel_thread
  end
  Logger.info "#{@name} timeline exiting"
end

#schedule_refresh(data) ⇒ Object



330
331
332
333
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 330

# Handler for events that require the schedule to be rebuilt.
#
# Logs the event at debug level and clears @read_topic, which ends the
# while loop in #block_for_updates.
#
# @param data [Object] event payload; only interpolated into the log line
# @return [false]
def schedule_refresh(data)
  Logger.debug("#{@name} timeline web socket schedule refresh: #{data}")
  @read_topic = false
end

#shutdownObject



355
356
357
358
359
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 355

# Stops the microservice.
#
# Clears @read_topic so the while loop in #block_for_updates ends, asks the
# manager to shut down, then hands off to the parent class's shutdown.
def shutdown
  @read_topic = false
  @manager.shutdown
  super
end

#timeline_nop(data) ⇒ Object



326
327
328
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 326

# Handler for events that need no action beyond a debug log entry.
#
# @param data [Object] event payload; only interpolated into the log line
# @return [Object] whatever Logger.debug returns
def timeline_nop(data)
  Logger.debug("#{@name} timeline web socket event: #{data}")
end

#topic_lookup_functionsObject



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/cosmos/microservices/timeline_microservice.rb', line 293

# Dispatch table used by #block_for_updates.
#
# The outer key is the message 'type', the inner key is the message 'kind',
# and the value is the name of the handler method to invoke.
#
# @return [Hash{String => Hash{String => Symbol}}] a newly built table
def topic_lookup_functions
  timeline_handlers = {
    'created' => :timeline_nop,
    'refresh' => :schedule_refresh,
    'updated' => :timeline_nop,
    'deleted' => :timeline_nop
  }
  activity_handlers = {
    'event' => :timeline_nop,
    'created' => :create_activity_from_event,
    'updated' => :schedule_refresh,
    'deleted' => :remove_activity_from_event
  }
  { 'timeline' => timeline_handlers, 'activity' => activity_handlers }
end