Class: Cosmos::ReactionSnoozeManager

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmos/microservices/reaction_microservice.rb

Overview

The reaction snooze manager starts a thread pool, keeps track of when a reaction is activated, and evaluates triggers when the snooze is complete.

Constant Summary collapse

SNOOZE_METRIC_NAME =
'snooze_manager_duration_seconds'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, scope:, share:) ⇒ ReactionSnoozeManager

Returns a new instance of ReactionSnoozeManager.



345
346
347
348
349
350
351
352
353
354
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 345

# Stores the constructor arguments and prepares the manager's bookkeeping.
# No threads are started here: @thread_pool stays nil and @cancel_thread
# starts out false.
#
# @param name passed to Metric as microservice: and kept in @name
# @param scope passed to Metric as scope: and kept in @scope
# @param share kept in @share
def initialize(name:, scope:, share:)
  @name, @scope, @share = name, scope, share
  @worker_count = 3
  @thread_pool = nil
  @cancel_thread = false
  @metric = Metric.new(microservice: name, scope: scope)
  # Starting at 0 (rather than a current timestamp).
  @metric_output_time = 0
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



343
344
345
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 343

# Reader for @name, which the constructor sets from its name: argument.
def name
  return @name
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



343
344
345
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 343

# Reader for @scope, which the constructor sets from its scope: argument.
def scope
  return @scope
end

#share ⇒ Object (readonly)

Returns the value of attribute share.



343
344
345
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 343

# Reader for @share, which the constructor sets from its share: argument.
def share
  return @share
end

#thread_pool ⇒ Object (readonly)

Returns the value of attribute thread_pool.



343
344
345
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 343

# Reader for @thread_pool. The constructor leaves it nil; #run assigns it the
# result of generate_thread_pool.
def thread_pool
  return @thread_pool
end

Instance Method Details

#active_triggers(reaction:) ⇒ Object



390
391
392
393
394
395
396
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 390

# Reports whether at least one of the reaction's triggers is currently active.
#
# Each entry of reaction.triggers is looked up through TriggerModel.get using
# its 'name' and 'group' keys and this manager's scope. A trigger counts when
# the lookup returns a model whose state is truthy; lookups stop at the first
# such trigger.
#
# @param reaction object responding to #triggers
# @return [Boolean] true if any trigger model exists with a truthy state
def active_triggers(reaction:)
  reaction.triggers.any? do |trigger|
    model = TriggerModel.get(name: trigger['name'], group: trigger['group'], scope: @scope)
    model && model.state
  end
end

#generate_thread_pool ⇒ Object



356
357
358
359
360
361
362
363
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 356

# Starts @worker_count threads, each running its own ReactionWorker.
#
# Workers are built with this manager's name, scope and share, plus an ident
# equal to their zero-based position in the pool.
#
# @return [Array<Thread>] the started threads, in ident order
def generate_thread_pool
  @worker_count.times.map do |ident|
    worker = ReactionWorker.new(name: @name, scope: @scope, share: @share, ident: ident)
    Thread.new { worker.run }
  end
end

#manage_snoozed_reactions(current_time:) ⇒ Object



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 398

# Processes every snoozed reaction whose snooze has run out.
#
# A reaction is handled only when snoozed_until is at or before current_time
# and the snooze base reports it as not queued. For such a reaction:
# - if its review flag is falsy, it is woken without checking triggers;
# - otherwise, if any of its triggers is active, it is enqueued as a
#   'reaction' item;
# - otherwise it is woken.
#
# @param current_time value subtracted from each reaction's snoozed_until
#   (#run passes Time.now.to_i)
def manage_snoozed_reactions(current_time:)
  @share.reaction_base.get_snoozed.each do |reaction|
    remaining = reaction.snoozed_until - current_time
    next unless remaining <= 0
    next unless @share.snooze_base.not_queued?(reaction: reaction)

    Logger.info "#{reaction.name} current: #{current_time}, vs #{reaction.snoozed_until}, #{remaining}"
    if !reaction.review
      Logger.debug "#{reaction.name} review set to false, setting snoozed_until back to nil"
      @share.reaction_base.wake(name: reaction.name)
    elsif active_triggers(reaction: reaction)
      @share.queue_base.enqueue(kind: 'reaction', data: reaction.as_json)
    else
      @share.reaction_base.wake(name: reaction.name)
    end
  end
end

#run ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 365

# Main loop of the snooze manager.
#
# Builds the worker pool, then repeatedly handles snoozed reactions, records
# how long each pass took as a SNOOZE_METRIC_NAME sample, and sleeps one
# second between passes. Metrics are output when the current time has passed
# @metric_output_time, which is then pushed 120 seconds ahead. A StandardError
# raised during a pass is logged and the loop carries on. The loop ends once
# @cancel_thread is set (checked both before and after the sleep), so at
# least one pass always runs.
def run
  Logger.info "ReactionSnoozeManager running"
  @thread_pool = generate_thread_pool
  loop do
    begin
      now = Time.now.to_i
      # Monotonic clock for the duration so wall-clock adjustments cannot skew it.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      manage_snoozed_reactions(current_time: now)
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at # seconds as a float
      @metric.add_sample(
        name: SNOOZE_METRIC_NAME,
        value: elapsed,
        labels: { 'type' => 'snooze', 'thread' => 'manager' }
      )
      if @metric_output_time < now
        @metric.output
        @metric_output_time = now + 120
      end
    rescue StandardError => error
      Logger.error "ReactionSnoozeManager failed to snooze reactions.\n#{error.formatted}"
    end
    break if @cancel_thread
    sleep(1)
    break if @cancel_thread
  end
  Logger.info "ReactionSnoozeManager exiting"
end

#shutdown ⇒ Object



417
418
419
420
421
422
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 417

# Requests that the manager stop: sets the cancel flag read by #run and
# enqueues one item with nil kind and nil data per worker.
# NOTE(review): the nil items presumably act as wake-up sentinels for the
# workers waiting on the queue; confirm against ReactionWorker#run.
def shutdown
  @cancel_thread = true
  @worker_count.times { @share.queue_base.enqueue(kind: nil, data: nil) }
end