Class: Journaled::Outbox::Adapter

Inherits:
DeliveryAdapter
Defined in:
lib/journaled/outbox/adapter.rb

Overview

Outbox-style delivery adapter for custom event processing

This adapter stores events in a database table instead of enqueuing to ActiveJob. Events are processed by separate worker daemons that poll the database.

Setup:

  1. Generate migrations: rails generate journaled:database_events

  2. Run migrations: rails db:migrate

  3. Configure: Journaled.delivery_adapter = Journaled::Outbox::Adapter

  4. Start workers: bundle exec rake journaled_worker:work

Defined Under Namespace

Classes: TableNotFoundError

Class Method Details

.check_table_exists! ⇒ Object

Check if the required database table exists

Raises:

  • (TableNotFoundError)

    if the journaled_outbox_events table does not exist

# File 'lib/journaled/outbox/adapter.rb', line 46

def self.check_table_exists!
  return if @table_exists

  unless Event.table_exists?
    raise TableNotFoundError, <<~ERROR
      Journaled::Outbox::Adapter requires the 'journaled_outbox_events' table.

      To create the required tables, run:

        rake journaled:install:migrations
        rails db:migrate

      For more information, see the README:
      https://github.com/Betterment/journaled#outbox-style-event-processing-optional
    ERROR
  end

  @table_exists = true
end
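
The memoization in check_table_exists! can be illustrated in isolation. The following is a minimal sketch, not the gem's code: FakeEvent, its table_present toggle, its lookups counter, and SketchAdapter are hypothetical stand-ins for the real ActiveRecord model and adapter. It shows that a failed check is not cached (so the check runs again after the migration is applied), while a successful check is cached and skips further lookups.

```ruby
# Hypothetical stand-in for the ActiveRecord-backed Event model.
# `table_present` and `lookups` exist only for this illustration.
class FakeEvent
  class << self
    attr_accessor :table_present, :lookups

    def table_exists?
      self.lookups = lookups.to_i + 1 # count how often the "database" is asked
      table_present
    end
  end
end

# Hypothetical adapter mirroring the guard-and-memoize shape shown above.
class SketchAdapter
  class TableNotFoundError < StandardError; end

  def self.check_table_exists!
    return if @table_exists # cached success: no further lookups

    raise TableNotFoundError, "events table is missing" unless FakeEvent.table_exists?

    @table_exists = true
  end
end

# Before the migration: the check raises and nothing is cached.
FakeEvent.table_present = false
first_error = nil
begin
  SketchAdapter.check_table_exists!
rescue SketchAdapter::TableNotFoundError => e
  first_error = e.message
end

# After the migration: the first call hits the table check, the second is skipped.
FakeEvent.table_present = true
SketchAdapter.check_table_exists!
SketchAdapter.check_table_exists!
```

One consequence of this shape is that only the positive result is remembered for the life of the process, so a table dropped after a successful check would not be detected by this method.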

.deliver(events:) ⇒ void

This method returns an undefined value.

Delivers events by inserting them into the database

Parameters:

  • events (Array)

    Array of journaled events to deliver

  • ** (Hash)

    Additional options (ignored, for interface compatibility)



# File 'lib/journaled/outbox/adapter.rb', line 23

def self.deliver(events:, **)
  check_table_exists!

  records = events.map do |event|
    # Exclude the application-level id - the database will generate its own using uuid_generate_v7()
    event_data = event.journaled_attributes.except(:id)

    {
      event_type: event.journaled_attributes[:event_type],
      event_data:,
      partition_key: event.journaled_partition_key,
      stream_name: event.journaled_stream_name,
    }
  end

  # rubocop:disable Rails/SkipsModelValidations
  Event.insert_all(records) if records.any?
  # rubocop:enable Rails/SkipsModelValidations
end
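
To make the row shape concrete, here is a small sketch of the mapping step on its own, without a database. SketchEvent, build_outbox_rows, and the sample attribute values are hypothetical; a real event would come from the application, and the real method passes the rows to Event.insert_all. Hash#reject is used here in place of except purely so the snippet does not depend on a particular Ruby or ActiveSupport version.

```ruby
# Hypothetical stand-in for a journaled event; it only exposes the three
# readers that the mapping step above relies on.
SketchEvent = Struct.new(:journaled_attributes, :journaled_partition_key, :journaled_stream_name)

# Builds one row hash per event, dropping the application-level :id from
# the payload (the source comment says the database generates its own).
def build_outbox_rows(events)
  events.map do |event|
    attrs = event.journaled_attributes
    {
      event_type: attrs[:event_type],
      event_data: attrs.reject { |key, _value| key == :id },
      partition_key: event.journaled_partition_key,
      stream_name: event.journaled_stream_name
    }
  end
end

# Sample values are invented for illustration.
event = SketchEvent.new(
  { id: "app-generated-id", event_type: "user_created", user_id: 42 },
  "user-42",
  "example_stream"
)

rows = build_outbox_rows([event])
```

Note that event_type appears twice in each row: once as its own column and once inside event_data, because only :id is removed from the payload.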

.transaction_connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter

Returns the database connection to use for transactional batching

The Outbox adapter uses the connection of the Outbox events model, since events are staged in memory and then written to the journaled_outbox_events table within the same transaction.

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)

    The connection to use



# File 'lib/journaled/outbox/adapter.rb', line 73

def self.transaction_connection
  Event.connection
end

.validate_configuration! ⇒ Object

Validates that PostgreSQL is being used

The Outbox adapter requires PostgreSQL for UUID v7 support and row-level locking

Raises:

  • (StandardError)

    if the database adapter is not PostgreSQL



# File 'lib/journaled/outbox/adapter.rb', line 82

def self.validate_configuration!
  return if Event.connection.adapter_name == 'PostgreSQL'

  raise <<~ERROR
    Journaled::Outbox::Adapter requires PostgreSQL database adapter.

    Current adapter: #{Event.connection.adapter_name}

    The Outbox pattern uses PostgreSQL-specific features like UUID v7 generation
    and row-level locking for distributed worker coordination. Other databases are not supported.
  ERROR
end
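
The guard above raises with a bare string, which in Ruby produces a RuntimeError (a subclass of StandardError, matching the documented Raises entry). A minimal sketch of the same check against a stand-in connection follows; FakeConnection and validate_adapter! are hypothetical names used only for illustration, and the shortened message is not the gem's wording.

```ruby
# Hypothetical stand-in for a database connection; only adapter_name matters here.
FakeConnection = Struct.new(:adapter_name)

# Same guard shape as validate_configuration!, taking the connection as an
# argument so it can be exercised without a database.
def validate_adapter!(connection)
  return if connection.adapter_name == 'PostgreSQL'

  raise "Outbox adapter requires PostgreSQL (current adapter: #{connection.adapter_name})"
end

# A PostgreSQL connection passes silently.
postgres_result = validate_adapter!(FakeConnection.new('PostgreSQL'))

# Any other adapter name raises, and the message reports what was found.
failure =
  begin
    validate_adapter!(FakeConnection.new('SQLite'))
    nil
  rescue StandardError => e
    e
  end
```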