Class: Cosmos::ReactionWorker
- Defined in:
- lib/cosmos/microservices/reaction_microservice.rb
Overview
The Reaction worker is a very simple thread pool worker. Once the manager queues a trigger to evaluate against the reactions. The worker will check the reactions to see if it needs to fire any reactions.
Constant Summary collapse
- REACTION_METRIC_NAME =
'reaction_duration_seconds'.freeze
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#scope ⇒ Object
readonly
Returns the value of attribute scope.
-
#share ⇒ Object
readonly
Returns the value of attribute share.
Instance Method Summary collapse
-
#generate_auth ⇒ Object
generate the auth object.
-
#initialize(name:, scope:, share:, ident:) ⇒ ReactionWorker
constructor
A new instance of ReactionWorker.
- #process_enabled_trigger(data:) ⇒ Object
- #reaction(data:) ⇒ Object
- #run ⇒ Object
- #run_action(reaction:, action:) ⇒ Object
- #run_command(reaction:, action:) ⇒ Object
- #run_reaction(reaction:) ⇒ Object
- #run_script(reaction:, action:) ⇒ Object
Constructor Details
#initialize(name:, scope:, share:, ident:) ⇒ ReactionWorker
Returns a new instance of ReactionWorker.
226 227 228 229 230 231 232 233 234 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 226 def initialize(name:, scope:, share:, ident:) @name = name @scope = scope @share = share @ident = ident @metric_output_time = 0 @metric = Metric.new(microservice: @name, scope: @scope) @authentication = generate_auth() end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
224 225 226 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224 def name @name end |
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
224 225 226 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224 def scope @scope end |
#share ⇒ Object (readonly)
Returns the value of attribute share.
224 225 226 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 224 def share @share end |
Instance Method Details
#generate_auth ⇒ Object
generate the auth object
237 238 239 240 241 242 243 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 237 def generate_auth if ENV['COSMOS_API_USER'].nil? || ENV['COSMOS_API_CLIENT'].nil? return CosmosAuthentication.new() else return CosmosKeycloakAuthentication.new(ENV['COSMOS_KEYCLOAK_URL']) end end |
#process_enabled_trigger(data:) ⇒ Object
273 274 275 276 277 278 279 280 281 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 273 def process_enabled_trigger(data:) start = Process.clock_gettime(Process::CLOCK_MONOTONIC) @share.reaction_base.get_reactions(trigger_name: data['name']).each do | reaction | run_reaction(reaction: reaction) end diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float metric_labels = { 'type' => 'trigger', 'thread' => "worker-#{@ident}" } @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels) end |
#reaction(data:) ⇒ Object
245 246 247 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 245 def reaction(data:) return ReactionModel.from_json(data, name: data['name'], scope: data['scope']) end |
#run ⇒ Object
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 249 def run Logger.info "ReactionWorker-#{@ident} running" loop do begin kind, data = @share.queue_base.queue.pop break if kind.nil? || data.nil? case kind when 'reaction' run_reaction(reaction: reaction(data: data)) when 'trigger' process_enabled_trigger(data: data) end current_time = Time.now.to_i if @metric_output_time < current_time @metric.output @metric_output_time = current_time + 120 end rescue StandardError => e Logger.error "ReactionWorker-#{@ident} failed to evaluate kind: #{kind} data: #{data}\n#{e.formatted}" end end Logger.info "ReactionWorker-#{@ident} exiting" end |
#run_action(reaction:, action:) ⇒ Object
294 295 296 297 298 299 300 301 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 294 def run_action(reaction:, action:) case action['type'] when 'command' run_command(reaction: reaction, action: action) when 'script' run_script(reaction: reaction, action: action) end end |
#run_command(reaction:, action:) ⇒ Object
303 304 305 306 307 308 309 310 311 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 303 def run_command(reaction:, action:) Logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, command: '#{action['value']}' " begin cmd_no_hazardous_check(action['value'], scope: @scope) Logger.info "ReactionWorker-#{@ident} #{reaction.name} command action complete, #{action['value']}" rescue StandardError => e Logger.error "ReactionWorker-#{@ident} #{reaction.name} command action failed, #{action}\n#{e.}" end end |
#run_reaction(reaction:) ⇒ Object
283 284 285 286 287 288 289 290 291 292 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 283 def run_reaction(reaction:) start = Process.clock_gettime(Process::CLOCK_MONOTONIC) reaction.actions.each do |action| run_action(reaction: reaction, action: action) end @share.reaction_base.sleep(name: reaction.name) diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float metric_labels = { 'type' => 'reaction', 'thread' => "worker-#{@ident}" } @metric.add_sample(name: REACTION_METRIC_NAME, value: diff, labels: metric_labels) end |
#run_script(reaction:, action:) ⇒ Object
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/cosmos/microservices/reaction_microservice.rb', line 313 def run_script(reaction:, action:) Logger.debug "ReactionWorker-#{@ident} running reaction #{reaction.name}, script: '#{action['value']}'" begin request = Net::HTTP::Post.new( "/script-api/scripts/#{action['value']}/run?scope=#{@scope}", 'Content-Type' => 'application/json', 'Authorization' => @authentication.token() ) request.body = JSON.generate({ 'scope' => @scope, 'environment' => action['environment'], 'reaction' => reaction.name, 'id' => Time.now.to_i }) hostname = ENV['COSMOS_SCRIPT_HOSTNAME'] || 'cosmos-script-runner-api' response = Net::HTTP.new(hostname, 2902).request(request) raise "failed to call #{hostname}, for script: #{action['value']}, response code: #{response.code}" if response.code != '200' Logger.info "ReactionWorker-#{@ident} #{reaction.name} script action complete, #{action['value']} => #{response.body}" rescue StandardError => e Logger.error "ReactionWorker-#{@ident} #{reaction.name} script action failed, #{action}\n#{e.}" end end |