Class: Cosmos::ReactionWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmos/microservices/reaction_microservice.rb

Overview

The Reaction worker is a very simple thread pool worker. Once the manager queues a trigger to evaluate against the reactions. The worker will check the reactions to see if it needs to fire any reactions.

Constant Summary collapse

REACTION_METRIC_NAME =
'reaction_duration_seconds'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, scope:, share:, ident:) ⇒ ReactionWorker

Returns a new instance of ReactionWorker.



226
227
228
229
230
231
232
233
234
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 226

def initialize(name:, scope:, share:, ident:)
  @name = name
  @scope = scope
  @share = share
  @ident = ident
  @metric_output_time = 0
  @metric = Metric.new(microservice: @name, scope: @scope)
  @authentication = generate_auth()
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



224
225
226
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224

def name
  @name
end

#scopeObject (readonly)

Returns the value of attribute scope.



224
225
226
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224

def scope
  @scope
end

#shareObject (readonly)

Returns the value of attribute share.



224
225
226
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224

def share
  @share
end

Instance Method Details

#generate_authObject

generate the auth object



237
238
239
240
241
242
243
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 237

def generate_auth
  if ENV['COSMOS_API_USER'].nil? || ENV['COSMOS_API_CLIENT'].nil?
    return CosmosAuthentication.new()
  else
    return CosmosKeycloakAuthentication.new(ENV['COSMOS_KEYCLOAK_URL'])
  end
end

#process_enabled_trigger(data:) ⇒ Object



273
274
275
276
277
278
279
280
281
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 273

def process_enabled_trigger(data:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @share.reaction_base.get_reactions(trigger_name: data['name']).each do | reaction |
    run_reaction(reaction: reaction)
  end
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'type' => 'trigger', 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels)
end

#reaction(data:) ⇒ Object



245
246
247
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 245

def reaction(data:)
  return ReactionModel.from_json(data, name: data['name'], scope: data['scope'])
end

#runObject



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 249

def run
  Logger.info "ReactionWorker-#{@ident} running"
  loop do
    begin
      kind, data = @share.queue_base.queue.pop
      break if kind.nil? || data.nil?
      case kind
      when 'reaction'
        run_reaction(reaction: reaction(data: data))
      when 'trigger'
        process_enabled_trigger(data: data)
      end
      current_time = Time.now.to_i
      if @metric_output_time < current_time
        @metric.output
        @metric_output_time = current_time + 120
      end
    rescue StandardError => e
      Logger.error "ReactionWorker-#{@ident} failed to evaluate kind: #{kind} data: #{data}\n#{e.formatted}"
    end
  end
  Logger.info "ReactionWorker-#{@ident} exiting"
end

#run_action(reaction:, action:) ⇒ Object



294
295
296
297
298
299
300
301
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 294

def run_action(reaction:, action:)
  case action['type']
  when 'command'
    run_command(reaction: reaction, action: action)
  when 'script'
    run_script(reaction: reaction, action: action)
  end
end

#run_command(reaction:, action:) ⇒ Object



303
304
305
306
307
308
309
310
311
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 303

def run_command(reaction:, action:)
  Logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, command: '#{action['value']}' "
  begin
    cmd_no_hazardous_check(action['value'], scope: @scope)
    Logger.info "ReactionWorker-#{@ident} #{reaction.name} command action complete, #{action['value']}"
  rescue StandardError => e
    Logger.error "ReactionWorker-#{@ident} #{reaction.name} command action failed, #{action}\n#{e.message}"
  end
end

#run_reaction(reaction:) ⇒ Object



283
284
285
286
287
288
289
290
291
292
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 283

def run_reaction(reaction:)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  reaction.actions.each do |action|
    run_action(reaction: reaction, action: action)
  end
  @share.reaction_base.sleep(name: reaction.name)
  diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
  metric_labels = { 'type' => 'reaction', 'thread' => "worker-#{@ident}" }
  @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels)
end

#run_script(reaction:, action:) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 313

def run_script(reaction:, action:)
  Logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, script: '#{action['value']}'"
  begin
    request = Net::HTTP::Post.new(
      "/script-api/scripts/#{action['value']}/run?scope=#{@scope}",
      'Content-Type' => 'application/json',
      'Authorization' => @authentication.token()
    )
    request.body = JSON.generate({
      'scope' => @scope,
      'environment' => action['environment'],
      'reaction' => reaction.name,
      'id' => Time.now.to_i
    })
    hostname = ENV['COSMOS_SCRIPT_HOSTNAME'] || 'cosmos-script-runner-api'
    response = Net::HTTP.new(hostname, 2902).request(request)
    raise "failed to call #{hostname}, for script: #{action['value']}, response code: #{response.code}" if response.code != '200'

    Logger.info "ReactionWorker-#{@ident} #{reaction.name} script action complete, #{action['value']} => #{response.body}"
  rescue StandardError => e
    Logger.error "ReactionWorker-#{@ident} #{reaction.name} script action failed, #{action}\n#{e.message}"
  end
end