Class: Sqreen::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/sqreen/runner.rb

Overview

Main running job class for the agent

Constant Summary collapse

HEARTBEAT_WARMUP =

Heartbeat warm-up period: one hour, expressed in seconds

60 * 60
HEARTBEAT_MAX_DELAY =

Initial delay is 5 minutes

5 * 60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration, framework, set_at_exit = true, session_class = Sqreen::Session) ⇒ Runner

We may want to do this in a thread to avoid delaying app startup. When set_at_exit is false, no global at_exit hook is registered (used for testing).



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/sqreen/runner.rb', line 107

# Builds the runner: stores the configuration, picks the endpoints, opens a
# session and applies the features and commands found in the session response.
#
# @param configuration [#get] queried here for :proxy_url, :token, :app_name,
#   :weave and :initial_features
# @param framework [Object] framework adapter; it is handed to the session and
#   receives the req_start_cb / req_end_cb callbacks
# @param set_at_exit [Boolean] when truthy, register_exit_cb is called
# @param session_class [Class] class instantiated by create_session
# @raise [Sqreen::TokenNotFoundException] when the configuration has no token
def initialize(configuration, framework, set_at_exit = true, session_class = Sqreen::Session)
  @logged_out_tried = false
  @configuration = configuration
  @framework = framework
  @heartbeat_delay = HEARTBEAT_MAX_DELAY
  @last_heartbeat_request = Time.now
  @next_command_results = {}
  @next_metrics = []
  @running = true

  @proxy_url = @configuration.get(:proxy_url)
  chosen_endpoints = determine_endpoints

  @token = @configuration.get(:token)
  @app_name = @configuration.get(:app_name)
  @url = chosen_endpoints.control.url
  @cert_store = chosen_endpoints.control.ca_store

  # Reset whitelists and performance budget to empty values before the
  # token check, so they are set even when the constructor raises below.
  Sqreen.update_whitelisted_paths([])
  Sqreen.update_whitelisted_ips({})
  Sqreen.update_performance_budget(nil)
  raise(Sqreen::TokenNotFoundException, 'no token found') unless @token

  Sqreen::Kit::Configuration.logger = Sqreen.log
  Sqreen::Kit::Configuration.ingestion_url = chosen_endpoints.ingestion.url
  Sqreen::Kit::Configuration.certificate_store = chosen_endpoints.ingestion.ca_store
  Sqreen::Kit::Configuration.proxy_url = @proxy_url
  Sqreen::Kit::Configuration.default_source = "sqreen:agent:ruby:#{Sqreen::VERSION}"

  register_exit_cb if set_at_exit

  self.metrics_engine = MetricsStore.new

  # True when a scout_apm gem at version >= 2.5.2 is among the gem specifications.
  needs_weave = proc do
    Gem::Specification.select { |s| s.name == 'scout_apm' && Gem::Requirement.new('>= 2.5.2').satisfied_by?(Gem::Version.new(s.version)) }.any?
  end

  # The :weave setting or the scout_apm check above selects the Weave-based instrumenter.
  if @configuration.get(:weave) || needs_weave.call
    @instrumenter = Sqreen::Weave::Legacy::Instrumentation.new(metrics_engine)
  else
    @instrumenter = Sqreen::Legacy::Instrumentation.new(metrics_engine)
  end

  # NOTE(review): this writes the raw token to the debug log — confirm that is acceptable.
  Sqreen.log.debug "Using token #{@token}"
  response = create_session(session_class)
  post_endpoint_testing_msgs(chosen_endpoints)
  wanted_features = response.fetch('features', {})
  # Optional JSON object from the configuration, merged over the features
  # returned by the session. Anything that fails to parse, or that is not a
  # Hash, is ignored with a warning.
  conf_initial_features = configuration.get(:initial_features)
  unless conf_initial_features.nil?
    begin
      conf_features = JSON.parse(conf_initial_features)
      raise 'Invalid Type' unless conf_features.is_a?(Hash)
      Sqreen.log.debug do
        "Override initial features with #{conf_features.inspect}"
      end
      wanted_features = wanted_features.merge(conf_features)
    rescue
      # Bare rescue: JSON parse errors and the 'Invalid Type' raise both land here.
      Sqreen.log.warn do
        "NOT using invalid initial features #{conf_initial_features}"
      end
    end
  end
  self.features = wanted_features

  @ecosystem_integration = EcosystemIntegration.new(framework, Sqreen.queue)
  framework.req_start_cb = @ecosystem_integration.method(:request_start)
  framework.req_end_cb = @ecosystem_integration.method(:request_end)

  # Ensure a deliverer is there unless features have set it first
  self.deliverer ||= Deliveries::Simple.new(session)
  # Forward 'rules' / 'pack_id' from the response, when present, to the commands.
  context_infos = {}
  %w[rules pack_id].each do |p|
    context_infos[p] = response[p] unless response[p].nil?
  end
  process_commands(response.fetch('commands', []), context_infos)
end

Instance Attribute Details

#deliverer ⇒ Sqreen::Deliveries::Simple



95
96
97
# File 'lib/sqreen/runner.rb', line 95

# Reader for the delivery strategy that events are posted through.
#
# @return [Sqreen::Deliveries::Simple] the value of @deliverer
def deliverer
  @deliverer
end

#heartbeat_delay ⇒ Object

Returns the value of attribute heartbeat_delay.



92
93
94
# File 'lib/sqreen/runner.rb', line 92

# Reader for the heartbeat delay. It is added to a Time in heartbeat_needed?
# and passed to Timeout.timeout in run_watcher_once, i.e. used as seconds.
#
# @return [Object] the value of @heartbeat_delay
def heartbeat_delay
  @heartbeat_delay
end

#instrumenter ⇒ Object (readonly)

Returns the value of attribute instrumenter.



98
99
100
# File 'lib/sqreen/runner.rb', line 98

# Read-only reader for the instrumentation object chosen in initialize.
#
# @return [Object] the value of @instrumenter
def instrumenter
  @instrumenter
end

#metrics_engine ⇒ Object

Returns the value of attribute metrics_engine.



93
94
95
# File 'lib/sqreen/runner.rb', line 93

# Reader for the metrics store that observations are aggregated into.
#
# @return [Object] the value of @metrics_engine
def metrics_engine
  @metrics_engine
end

#next_command_results ⇒ Object

Returns the value of attribute next_command_results.



100
101
102
# File 'lib/sqreen/runner.rb', line 100

# Reader for the command results that do_heartbeat sends with the next
# heartbeat (set by process_commands, cleared after each heartbeat).
#
# @return [Object] the value of @next_command_results
def next_command_results
  @next_command_results
end

#next_metrics ⇒ Object

Returns the value of attribute next_metrics.



101
102
103
# File 'lib/sqreen/runner.rb', line 101

# Reader for the metrics accumulated for the next heartbeat
# (filled and cleared by do_heartbeat).
#
# @return [Object] the value of @next_metrics
def next_metrics
  @next_metrics
end

#running ⇒ Object

Returns the value of attribute running.



99
100
101
# File 'lib/sqreen/runner.rb', line 99

# Reader for the flag polled by run_watcher; logout sets it to false.
#
# @return [Object] the value of @running
def running
  @running
end

#session ⇒ Sqreen::Session (readonly)

Returns:

  • (Sqreen::Session)



97
98
99
# File 'lib/sqreen/runner.rb', line 97

# Read-only reader for the session stored by create_session.
#
# @return [Sqreen::Session] the value of @session
def session
  @session
end

Instance Method Details

#aggregate_observations ⇒ Object



458
459
460
461
462
463
464
465
# File 'lib/sqreen/runner.rb', line 458

# Drains the observations currently queued in Sqreen.observations_queue and
# feeds each of them to the metrics engine.
#
# Only the entries present when the method starts are popped. Each timestamp
# is shifted by the difference between Sqreen.time and the local UTC clock.
#
# @return [Integer] the number of observations that were popped
def aggregate_observations
  queue = Sqreen.observations_queue
  clock_offset = Sqreen.time - Time.now.utc.to_f
  pending = queue.size
  pending.times do
    category, key, observation, observed_at = queue.pop
    metrics_engine.update(category, clock_offset + observed_at.utc.to_f, key, observation)
  end
end

#batch_events(batch_size, max_staleness = nil, use_signals = false) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/sqreen/runner.rb', line 194

# Selects the delivery strategy from the batching parameters and installs it
# as the deliverer.
#
# A size below 1 selects Deliveries::Simple; anything else selects
# Deliveries::Batch. With use_signals and a size of 1 or less, batching is
# forced to size 30 / staleness 60 and a warning is logged.
#
# @param batch_size [#to_i] requested batch size
# @param max_staleness [#to_i, nil] staleness handed to Deliveries::Batch
# @param use_signals [Boolean] whether signals are in use
# @return [Object] the deliverer that was installed
def batch_events(batch_size, max_staleness = nil, use_signals = false)
  count = batch_size.to_i

  if use_signals && count <= 1
    Sqreen.log.warn do
      "Using signals with no delivery batching is unsupported. " \
      "Using instead batching with batch size = 30, max_staleness = 60"
    end
    count = 30
    max_staleness = 60
  end

  strategy =
    if count >= 1
      staleness = max_staleness.to_i
      Deliveries::Batch.new(session, count, staleness)
    else
      Deliveries::Simple.new(session)
    end
  self.deliverer = strategy
end

#call_counts_metrics_period=(value) ⇒ Object



248
249
250
251
252
253
254
# File 'lib/sqreen/runner.rb', line 248

# Creates the call-count metric ('Sum' kind) with the given period.
# Values that are not strictly positive after #to_i are ignored.
#
# @param value [#to_i] aggregation period
# @return [Object, nil] result of metrics_engine.create_metric, or nil when skipped
def call_counts_metrics_period=(value)
  period = value.to_i
  if period > 0 # else disable collection?
    metrics_engine.create_metric('name' => CallCountable::COUNT_CALLS,
                                 'kind' => 'Sum',
                                 'period' => period)
  end
end

#change_features(new_features, _context_infos = {}) ⇒ Object



449
450
451
452
453
454
455
456
# File 'lib/sqreen/runner.rb', line 449

# Replaces the current features and reports both the old and the new set.
#
# @param new_features [Hash] features to apply through features=
# @param _context_infos [Hash] ignored
# @return [Hash] 'was' => previous features, 'now' => new_features
def change_features(new_features, _context_infos = {})
  previous = features
  self.features = new_features
  { 'was' => previous, 'now' => new_features }
end

#change_performance_budget(budget, _context_infos = {}) ⇒ Object



406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# File 'lib/sqreen/runner.rb', line 406

# Updates the performance budget and returns the previous one.
#
# With the :weave setting, Sqreen::Weave::Budget is updated: the threshold is
# budget / 1000, and the 'performance_budget' feature may override the
# threshold and supply a ratio. Without it, Sqreen.update_performance_budget
# receives the budget as given.
#
# @param budget [#to_f, nil] new budget; nil is accepted, other values must be > 0
# @param _context_infos [Hash] ignored
# @return [Hash, false] { :was => previous budget }, or false for a rejected budget
def change_performance_budget(budget, _context_infos = {})
  acceptable = budget.nil? || budget.to_f > 0
  return false unless acceptable

  unless @configuration.get(:weave)
    previous = Sqreen.performance_budget
    Sqreen.update_performance_budget(budget)
    return { :was => previous }
  end

  current = Sqreen::Weave::Budget.current
  previous = current && current.to_h

  threshold = budget ? budget.to_f / 1000 : nil
  ratio = nil

  override = features['performance_budget']
  if override
    threshold = override['threshold'] if override.key?('threshold')
    ratio = override['ratio'] if override.key?('ratio')
  end

  Sqreen::Weave::Budget.update(threshold: threshold, ratio: ratio)
  { :was => previous }
end

#change_whitelisted_ips(ips, _context_infos = {}) ⇒ Object



443
444
445
446
447
# File 'lib/sqreen/runner.rb', line 443

# Replaces the whitelisted IPs when given something enumerable.
#
# @param ips [#each] new whitelist
# @param _context_infos [Hash] ignored
# @return [Boolean] true when applied, false when ips does not respond to #each
def change_whitelisted_ips(ips, _context_infos = {})
  if ips.respond_to?(:each)
    Sqreen.update_whitelisted_ips(ips)
    true
  else
    false
  end
end

#change_whitelisted_paths(paths, _context_infos = {}) ⇒ Object



400
401
402
403
404
# File 'lib/sqreen/runner.rb', line 400

# Replaces the whitelisted paths when given something enumerable.
#
# @param paths [#each] new whitelist
# @param _context_infos [Hash] ignored
# @return [Boolean] true when applied, false when paths does not respond to #each
def change_whitelisted_paths(paths, _context_infos = {})
  if paths.respond_to?(:each)
    Sqreen.update_whitelisted_paths(paths)
    true
  else
    false
  end
end

#config_binned_metrics(level, base, factor, base_pct, factor_pct) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/sqreen/runner.rb', line 265

# Enables or disables binned performance metrics.
#
# A level of 0 or less disables them. Any positive level enables them; levels
# other than 1 are accepted with a warning and treated like 1.
#
# @param level [#to_i] performance level
# @param base [#to_f] forwarded to BinnedMetrics.enable
# @param factor [#to_f] forwarded to BinnedMetrics.enable
# @param base_pct [#to_f] forwarded to BinnedMetrics.enable
# @param factor_pct [#to_f] forwarded to BinnedMetrics.enable
# @return [Object] result of BinnedMetrics.disable or BinnedMetrics.enable
def config_binned_metrics(level, base, factor, base_pct, factor_pct)
  perf_level = level.to_i

  unless perf_level > 0
    Sqreen.log.info('Disabling binned metrics')
    return PerformanceNotifications::BinnedMetrics.disable
  end

  Sqreen.log.info('Enabling binned metrics')
  if perf_level != 1
    Sqreen.log.warn("Unknown value for perf_level: #{perf_level}. Treating as 1")
  end
  PerformanceNotifications::BinnedMetrics.enable(
    metrics_engine,
    PERF_METRICS_PERIOD,
    base.to_f,
    factor.to_f,
    base_pct.to_f,
    factor_pct.to_f
  )
end

#create_session(session_class) ⇒ Object



184
185
186
187
# File 'lib/sqreen/runner.rb', line 184

# Instantiates the session, stores it in @session, then invokes it with the
# framework and returns the result. initialize reads 'features', 'commands',
# 'rules' and 'pack_id' from that result.
#
# @param session_class [Class] class built with (url, cert store, token, app name, proxy url)
# @return [Object] whatever the session invocation returns
def create_session(session_class)
  @session = session_class.new(@url, @cert_store, @token, @app_name, @proxy_url)
  # NOTE(review): `session.(...)` is Ruby shorthand for `session.call(...)`.
  # This looks like a method name lost in extraction (presumably `login`,
  # mirroring `session.logout` in #logout) — confirm against the real source.
  session.(@framework)
end

#deliver_metrics_as_event ⇒ Object



360
361
362
363
364
365
366
# File 'lib/sqreen/runner.rb', line 360

# Posts every pending metric through the deliverer, one event per metric.
#
# This is disastrous with the simple delivery strategy, as each aggregated
# metric would trigger an HTTP request. Sending of metrics is therefore not
# supported with the simple delivery strategy.
# TODO: Confirm that only batch is used in production
#
# @return [Object] next_metrics
def deliver_metrics_as_event
  next_metrics.each do |metric|
    deliverer.post_event(metric)
  end
end

#do_heartbeat ⇒ Object



346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/sqreen/runner.rb', line 346

# Sends one heartbeat to the session and processes the commands it returns.
#
# Freshly published metrics are appended to next_metrics. Without signals they
# travel inside the heartbeat; with signals they are delivered as events after
# it. Pending command results and metrics are cleared afterwards.
#
# @return [Object] result of process_commands
def do_heartbeat
  @last_heartbeat_request = Time.now

  if metrics_engine
    published = metrics_engine.publish(false)
    @next_metrics.concat(published)
  end

  piggybacked_metrics = nil
  piggybacked_metrics = next_metrics unless use_signals?

  response = session.heartbeat(next_command_results, piggybacked_metrics)
  next_command_results.clear

  deliver_metrics_as_event if use_signals?
  next_metrics.clear

  process_commands(response['commands'])
end

#exit_from_sinatra_startup? ⇒ Boolean

Returns:

  • (Boolean)


512
513
514
515
516
# File 'lib/sqreen/runner.rb', line 512

# Tells whether Sinatra::Application is loaded, exposes `run?`, and reports
# that it is not set to run.
#
# @return [Boolean, nil] nil when Sinatra::Application is not defined, false
#   when it lacks `run?`, otherwise the negation of `Sinatra::Application.run?`
def exit_from_sinatra_startup?
  return unless defined?(Sinatra::Application)

  app = Sinatra::Application
  return false unless app.respond_to?(:run?)

  !app.run?
end

#features(_context_infos = {}) ⇒ Object



368
369
370
# File 'lib/sqreen/runner.rb', line 368

# Returns the current feature set held by Sqreen.features.
#
# @param _context_infos [Hash] ignored
# @return [Object] whatever Sqreen.features returns; this class indexes it like a Hash
def features(_context_infos = {})
  Sqreen.features
end

#features=(features) ⇒ Object



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/sqreen/runner.rb', line 376

# Applies a feature hash: stores it through Sqreen.update_features, then
# propagates the individual settings (session options, metrics periods,
# binned metrics, heartbeat delay and event batching).
#
# @param features [Hash] feature values keyed by String name
# @return [Object, nil] result of batch_events, or nil when 'batch_size' is absent
def features=(features)
  Sqreen.update_features(features)

  # The session may not exist yet. Both session-level settings are guarded,
  # where previously only request_compression was.
  if session
    session.request_compression = features['request_compression']
    session.use_signals = use_signals?
  end

  self.performance_metrics_period = features['performance_metrics_period']

  # Binned metrics are only configured here when :weave is not set.
  unless @configuration.get(:weave)
    config_binned_metrics(
      features['perf_level'] || DEFAULT_PERF_LEVEL,
      features['perf_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_BASE,
      features['perf_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_UNIT,
      features['perf_pct_base'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_BASE,
      features['perf_pct_unit'] || PerformanceNotifications::BinnedMetrics::DEFAULT_PERF_PCT_UNIT
    )
  end

  self.call_counts_metrics_period = features['call_counts_metrics_period']

  # Only a strictly positive delay replaces the current one.
  hd = features['heartbeat_delay'].to_i
  self.heartbeat_delay = hd if hd > 0

  return if features['batch_size'].nil?
  batch_events(features['batch_size'], features['max_staleness'], use_signals?)
end

#handle_event(event) ⇒ Object



498
499
500
501
502
503
504
# File 'lib/sqreen/runner.rb', line 498

# Dispatches one event taken from the queue: the METRICS_EVENT marker triggers
# aggregation of observations, anything else is posted through the deliverer.
#
# @param event [Object] queued event
# @return [Object] result of aggregate_observations or of post_event
def handle_event(event)
  return aggregate_observations if event == METRICS_EVENT

  @deliverer.post_event(event)
end

#heartbeat_needed? ⇒ Boolean

Returns:

  • (Boolean)


467
468
469
# File 'lib/sqreen/runner.rb', line 467

# Tells whether more than heartbeat_delay has elapsed since the last
# heartbeat request.
#
# @return [Boolean]
def heartbeat_needed?
  next_due_at = @last_heartbeat_request + heartbeat_delay
  next_due_at < Time.now
end

#load_rules(context_infos = {}) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/sqreen/runner.rb', line 214

# Resolves the rules to instrument with.
#
# The rules pack comes from context_infos when both 'rules' and 'pack_id' are
# present, otherwise from session.rules. A pack taken from context_infos is
# also dumped as JSON under tmp/sqreen/rules when the :rules_dump setting is
# on. Every pack rule is tagged in place with its rulespack id, then enabled
# local rules (tagged 'local') are appended.
#
# @param context_infos [Hash] may carry 'rules' and 'pack_id'
# @return [Array] two elements: the rulespack id and the combined rules
def load_rules(context_infos = {})
  pack = context_infos['rules']
  pack_id = context_infos['pack_id']

  if pack.nil? || pack_id.nil?
    fetched = session.rules
    pack = fetched['rules']
    pack_id = fetched['pack_id']
  elsif @configuration.get(:rules_dump)
    root = defined?(Rails) ? Rails.root : Pathname.pwd
    dump_dir = root + 'tmp/sqreen/rules'
    FileUtils.mkdir_p(dump_dir.to_s)
    # Whole pack first, then one file per rule in a directory named after the pack
    File.open("#{dump_dir}/#{pack_id}.json", "wb") do |io|
      io.write(JSON.pretty_generate(pack))
    end
    FileUtils.mkdir_p("#{dump_dir}/#{pack_id}")
    pack.each do |rule|
      # Work on a copy so the dump does not tag the live rule
      tagged = rule.dup
      tagged['rulespack_id'] = pack_id
      File.open("#{dump_dir}/#{pack_id}/#{tagged['name']}.json", "wb") do |io|
        io.write(JSON.pretty_generate(tagged))
      end
    end
  end

  rules = pack.each { |rule| rule['rulespack_id'] = pack_id }
  Sqreen.log.info { format('retrieved rulespack id: %s', pack_id) }
  Sqreen.log.debug { format('retrieved %d rules', rules.size) }

  enabled_local = (Sqreen::Rules.local(@configuration) || []).select { |rule| rule['enabled'] }
  enabled_local.each { |rule| rule['rulespack_id'] = 'local' }
  rules += enabled_local

  Sqreen.log.debug do
    summaries = rules.sort_by { |rule| rule['name'] }.map do |rule|
      format('(%s, %s, %s)', rule[Rules::Attrs::NAME], rule.to_json.size, rule[Rules::Attrs::BLOCK])
    end
    format('rules: %s', summaries.join(', '))
  end

  [pack_id, rules]
end

#logout(retrying = true) ⇒ Object



532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
# File 'lib/sqreen/runner.rb', line 532

# Logs the session out, at most once.
#
# Does nothing without a session. In development mode it only stops the
# runner. Otherwise it drains the deliverer, aggregates observations, posts
# the remaining metrics and logs the session out before stopping the runner.
#
# @param retrying [Boolean] forwarded to session.logout
# @return [false, nil] false after a full logout, nil on every early exit
def logout(retrying = true)
  return unless session

  Sqreen.log.debug("Logging out")

  if @framework.development?
    @running = false
    nil
  elsif @logged_out_tried
    Sqreen.log.debug('Not running logout twice')
    nil
  else
    @logged_out_tried = true
    @deliverer.drain if @deliverer
    aggregate_observations
    session.post_metrics(metrics_engine.publish) if metrics_engine
    session.logout(retrying)
    @running = false
  end
end

#performance_metrics_period=(value) ⇒ Object



256
257
258
259
260
261
262
263
# File 'lib/sqreen/runner.rb', line 256

# Enables performance metrics with the given period, or disables them when
# the period is not strictly positive after #to_i.
#
# @param value [#to_i] aggregation period
# @return [Object] result of Metrics.enable or Metrics.disable
def performance_metrics_period=(value)
  period = value.to_i
  return PerformanceNotifications::Metrics.disable unless period > 0

  PerformanceNotifications::Metrics.enable(metrics_engine, period)
end

#periodic_cleanup ⇒ Object



489
490
491
492
493
494
495
496
# File 'lib/sqreen/runner.rb', line 489

# Housekeeping run when nothing occurred: ticks the deliverer, aggregates
# pending observations and issues a heartbeat if one is due (which may return
# commands).
#
# @return [Object, nil] result of do_heartbeat, or nil when no heartbeat was due
def periodic_cleanup
  @deliverer.tick
  aggregate_observations
  return unless heartbeat_needed?

  do_heartbeat
end

#process_commands(commands, context_infos = {}) ⇒ Object



340
341
342
343
344
# File 'lib/sqreen/runner.rb', line 340

# Runs a list of remote commands against this runner and keeps the results
# for the next heartbeat. A nil or empty list is a no-op.
#
# @param commands [Array, nil] commands to process
# @param context_infos [Hash] forwarded to RemoteCommand.process_list
# @return [Object, nil] the stored results, or nil when there was nothing to run
def process_commands(commands, context_infos = {})
  return if commands.nil? || commands.empty?

  @next_command_results = RemoteCommand.process_list(self, commands, context_infos)
end

#register_exit_cb(try_again = true) ⇒ Object



551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
# File 'lib/sqreen/runner.rb', line 551

# Registers an at_exit hook that logs the runner out.
#
# When the hook fires while exit_from_sinatra_startup? is truthy and try_again
# is set, it registers a second hook (with try_again off) instead of logging
# out. Presumably this waits for Sinatra's own at_exit startup — TODO confirm.
# Errors raised by logout are logged at debug level and swallowed.
#
# @param try_again [Boolean] whether the hook may re-register itself once
# @return [Object] whatever at_exit returns
def register_exit_cb(try_again = true)
  at_exit do
    if exit_from_sinatra_startup? && try_again
      register_exit_cb(false)
      next
    end

    begin
      logout
    rescue StandardError => error
      Sqreen.log.debug(error.inspect)
      Sqreen.log.debug(error.backtrace)
      nil
    end
  end
end

#reload_actions(_context_infos = {}) ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/sqreen/runner.rb', line 310

def reload_actions(_context_infos = {})
  Sqreen.log.debug 'Reloading actions'

  data = session.get_actionspack

  unless data.respond_to?(:[]) && data['status']
    Sqreen.log.warn('Could not load actions')
    return RemoteCommand::FailureOutput.new(
      :error => 'Could not load actions from /actionspack'
    )
  end

  action_hashes = data['actions']
  unless action_hashes.respond_to? :each
    Sqreen.log.warn('No action definitions in response')
    return RemoteCommand::FailureOutput.new(
      :error => 'No action definitions in response'
    )
  end
  Sqreen.log.debug("Loading actions from hashes #{action_hashes}")

  unsupported = load_actions(action_hashes)

  if unsupported.empty?
    true
  else
    RemoteCommand::FailureOutput.new(:unsupported_actions => unsupported.to_a)
  end
end

#reload_rules(_context_infos = {}) ⇒ Object



300
301
302
303
304
305
306
307
308
# File 'lib/sqreen/runner.rb', line 300

# Reloads the rules: fetches them again, drops every existing callback and
# asks the framework to instrument with the fresh set.
#
# @param _context_infos [Hash] ignored (load_rules is called without context)
# @return [String] the rulespack id
def reload_rules(_context_infos = {})
  Sqreen.log.debug('Reloading rules')

  pack_id, fresh_rules = load_rules
  instrumenter.remove_all_callbacks
  @framework.instrument_when_ready!(instrumenter, fresh_rules)

  Sqreen.log.debug('Rules reloaded')
  pack_id.to_s
end

#remove_instrumentation(_context_infos = {}) ⇒ Object



292
293
294
295
296
297
298
# File 'lib/sqreen/runner.rb', line 292

# Removes every instrumentation callback and clears the actions repository.
#
# @param _context_infos [Hash] ignored
# @return [true]
def remove_instrumentation(_context_infos = {})
  Sqreen.log.debug 'Removing instrumentation'
  instrumenter.remove_all_callbacks
  Sqreen::Actions::Repository.clear
  Sqreen.log.debug 'Instrumentation removed'
  true
end

#restart(_context_infos = {}) ⇒ Object



523
524
525
526
527
528
529
530
# File 'lib/sqreen/runner.rb', line 523

# Shuts the runner down, then starts a new worker from a background thread
# after sleeping twice the current heartbeat delay.
#
# @param _context_infos [Hash] ignored
# @return [Thread] the thread that performs the delayed start
def restart(_context_infos = {})
  shutdown
  # Read the delay now so the thread uses the value in effect at restart time
  delay = @heartbeat_delay
  Thread.new do
    sleep(2 * delay)
    Sqreen::Worker.start(Sqreen.framework)
  end
end

#run_watcher ⇒ Object



506
507
508
# File 'lib/sqreen/runner.rb', line 506

# Main loop: keeps running watcher iterations for as long as `running` is truthy.
#
# @return [nil]
def run_watcher
  while running
    run_watcher_once
  end
end

#run_watcher_once ⇒ Object



471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/sqreen/runner.rb', line 471

# Runs one iteration of the watcher loop.
#
# Waits up to heartbeat_delay for an event on Sqreen.queue. On timeout it runs
# periodic_cleanup. When an event arrives it is handed to handle_event and, if
# a heartbeat is overdue, periodic_cleanup runs as well.
# BinnedMetrics.finish_watcher_run is called on the way out in every case.
def run_watcher_once
  event = Timeout.timeout(heartbeat_delay) do
    Sqreen.queue.pop
  end
rescue Timeout::Error
  # Nothing arrived within heartbeat_delay
  periodic_cleanup
else
  # Only reached when the pop above completed without raising
  handle_event(event)
  if heartbeat_needed?
    # Also aggregate/post metrics when cleanup has
    # not been done for a long time
    Sqreen.log.debug 'Forced an heartbeat'
    periodic_cleanup # will trigger do_heartbeat since it's time
  end
ensure
  PerformanceNotifications::BinnedMetrics.finish_watcher_run
end

#setup_instrumentation(context_infos = {}) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
# File 'lib/sqreen/runner.rb', line 280

# Loads the rules, asks the framework to instrument with them and initializes
# the ecosystem integration.
#
# @param context_infos [Hash] forwarded to load_rules
# @return [String] the rulespack id
def setup_instrumentation(context_infos = {})
  Sqreen.log.info('Setting up instrumentation')

  pack_id, active_rules = load_rules(context_infos)
  @framework.instrument_when_ready!(instrumenter, active_rules)

  Sqreen.log.info('Instrumentation set up')

  # XXX: ecosystem instrumentation should likely be deferred
  #      the same way the rest might be
  @ecosystem_integration.init

  pack_id.to_s
end

#shutdown(_context_infos = {}) ⇒ Object



518
519
520
521
# File 'lib/sqreen/runner.rb', line 518

# Stops the agent: removes the instrumentation, then logs out.
#
# @param _context_infos [Hash] ignored
# @return [Object] result of logout
def shutdown(_context_infos = {})
  remove_instrumentation
  logout
end

#tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {}) ⇒ Object

Parameters:

  • tracing_id_prefix (String)
  • sampling_config (Array<Hash{String=>Object}>)


432
433
434
435
# File 'lib/sqreen/runner.rb', line 432

# Forwards a tracing command to the ecosystem integration.
#
# @param tracing_id_prefix [String]
# @param sampling_config [Array<Hash{String=>Object}>]
# @param _context_infos [Hash] ignored
# @return [Hash] always { status: true }
def tracing_enable(tracing_id_prefix, sampling_config, _context_infos = {})
  @ecosystem_integration.handle_tracing_command(tracing_id_prefix, sampling_config)
  { status: true }
end

#upload_bundle(_context_infos = {}) ⇒ Object



437
438
439
440
441
# File 'lib/sqreen/runner.rb', line 437

# Posts the dependency bundle (signature and dependency list) to the session
# and reports how long the call took.
#
# @param _context_infos [Hash] ignored
# @return [Float] elapsed wall-clock time, as the difference of two Time.now values
def upload_bundle(_context_infos = {})
  started_at = Time.now
  signature = RuntimeInfos.dependencies_signature
  dependencies = RuntimeInfos.dependencies
  session.post_bundle(signature, dependencies)
  Time.now - started_at
end

#use_signals? ⇒ Boolean

Returns:

  • (Boolean)


372
373
374
# File 'lib/sqreen/runner.rb', line 372

# Tells whether the 'use_signals' feature is on.
#
# @return [Object] the 'use_signals' entry of features, or DEFAULT_USE_SIGNALS
#   when the key is absent
def use_signals?
  features.fetch('use_signals', DEFAULT_USE_SIGNALS)
end