Class: Journaled::Outbox::Worker

Inherits:
Object
Defined in:
lib/journaled/outbox/worker.rb

Overview

Worker daemon for processing Outbox-style events.

This worker polls the database for pending events and sends them to Kinesis in batches. Multiple workers can run concurrently and will coordinate using row-level locking.

The Worker handles the daemon lifecycle (start/stop, signal handling, run loop) and delegates actual batch processing to BatchProcessor.

Usage:

worker = Journaled::Outbox::Worker.new
worker.start  # Blocks until shutdown signal received
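Because start blocks, a caller that wants to stop the worker from the same process has to run it on another thread and call shutdown from outside. The following is a minimal sketch of that pattern. SketchWorker is a hypothetical stand-in that only mimics the start / shutdown / running? interface documented here; its loop body is a placeholder, not the real polling logic.

```ruby
# Hypothetical stand-in for illustration only; not the Journaled implementation.
class SketchWorker
  def initialize
    @running = false
    @shutdown_requested = false
  end

  def running?
    @running
  end

  # Only records the request; the loop in #start notices it on its next pass.
  def shutdown
    @shutdown_requested = true
  end

  def start
    @running = true
    # Placeholder for "poll for pending events, process a batch".
    sleep 0.01 until @shutdown_requested
  ensure
    @running = false
  end
end

worker = SketchWorker.new
thread = Thread.new { worker.start } # start blocks, so run it off the main thread
sleep 0.01 until worker.running?     # wait until the loop has begun
worker.shutdown                      # request a graceful stop
thread.join                          # returns once the loop sees the flag
puts worker.running?                 # prints "false"
```

In a real deployment the worker would more likely own its process and be stopped by a signal rather than by a direct shutdown call.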

Instance Method Summary

Constructor Details

#initialize ⇒ Worker

Returns a new instance of Worker.



# File 'lib/journaled/outbox/worker.rb', line 17

def initialize
  @worker_id = "#{Socket.gethostname}-#{Process.pid}"
  self.running = false
  @processor = BatchProcessor.new
  @metric_emitter = MetricEmitter.new(worker_id: @worker_id)
  self.shutdown_requested = false
  @last_metrics_emission = Time.current
end
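The worker ID built in the constructor combines the host name with the process ID, so two workers on the same host, or on different hosts that happen to share a PID, still get distinct IDs. A short standalone sketch of the same expression (it needs the socket standard library, which the listing above does not show being required):

```ruby
require 'socket'

# Same interpolation as in #initialize: "<hostname>-<pid>"
worker_id = "#{Socket.gethostname}-#{Process.pid}"
puts worker_id
```

The exact output depends on the machine and process, so none is shown here.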

Instance Method Details

#running? ⇒ Boolean

Checks whether the worker is still running.

Returns:

  • (Boolean)


# File 'lib/journaled/outbox/worker.rb', line 47

def running?
  running
end

#shutdown ⇒ Object

Requests a graceful shutdown by setting the shutdown_requested flag. The call returns immediately.



# File 'lib/journaled/outbox/worker.rb', line 42

def shutdown
  self.shutdown_requested = true
end
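Setting a flag rather than stopping work directly is what makes this method safe to call from a signal handler, where Ruby restricts operations such as taking a Mutex. The source of setup_signal_handlers is not shown on this page, so the following is only a sketch of the general technique. The choice of TERM and the local variable standing in for the flag are assumptions.

```ruby
# Local variable standing in for the worker's shutdown_requested flag.
shutdown_requested = false

# Keep the handler trivial: flip the flag and return.
# TERM is an assumed choice; the real worker may trap other signals.
previous = Signal.trap('TERM') { shutdown_requested = true }

# Simulate an external `kill <pid>` by signalling our own process.
Process.kill('TERM', Process.pid)

# Signal delivery is asynchronous, so wait briefly (at most 2 seconds).
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2
until shutdown_requested || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
  sleep 0.01
end

# Restore whatever handler was installed before the sketch ran.
Signal.trap('TERM', previous || 'DEFAULT')
puts shutdown_requested
```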

#start ⇒ Object

Start the worker (blocks until shutdown)



# File 'lib/journaled/outbox/worker.rb', line 27

def start
  check_prerequisites!

  self.running = true
  Rails.logger.info("Journaled worker starting (id: #{worker_id})")

  setup_signal_handlers

  run_loop
ensure
  self.running = false
  Rails.logger.info("Journaled worker stopped (id: #{worker_id})")
end
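The ensure clause covers the whole method body, so the running flag is reset and the "stopped" message is logged whether the loop exits normally or with an exception. It also runs if check_prerequisites! raises, in which case "stopped" is logged without a matching "starting". A minimal sketch of that behavior, using a hypothetical EnsureDemo class with an array in place of Rails.logger:

```ruby
# Hypothetical stand-in; mirrors only the ensure structure of #start.
class EnsureDemo
  attr_reader :log

  def initialize
    @running = false
    @log = []
  end

  def running?
    @running
  end

  def start
    @running = true
    @log << 'starting'
    raise 'boom' # stands in for the run loop failing
  ensure
    # Runs on both normal return and exception.
    @running = false
    @log << 'stopped'
  end
end

demo = EnsureDemo.new
begin
  demo.start
rescue RuntimeError => e
  puts "raised: #{e.message}" # prints "raised: boom"
end
puts demo.running?   # prints "false"
puts demo.log.inspect # prints ["starting", "stopped"]
```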