Class: Journaled::DeliveryAdapters::ActiveJobAdapter

Inherits:
Journaled::DeliveryAdapter show all
Defined in:
lib/journaled/delivery_adapters/active_job_adapter.rb

Overview

Default delivery adapter that uses ActiveJob

This adapter enqueues events to Journaled::DeliveryJob which sends them to Kinesis. This is the default behavior and maintains backward compatibility with previous versions of the gem.

Class Method Summary collapse

Class Method Details

.deliver(events:, enqueue_opts:) ⇒ void

This method returns an undefined value.

Delivers events by enqueueing them to Journaled::DeliveryJob

Parameters:

  • events (Array)

    Array of journaled events to deliver

  • enqueue_opts (Hash)

    Options for ActiveJob (priority, queue, wait, wait_until, etc.)



16
17
18
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 16

def self.deliver(events:, enqueue_opts:)
  Journaled::DeliveryJob.set(enqueue_opts).perform_later(*delivery_perform_args(events))
end

.delivery_perform_args(events) ⇒ Array<Hash>

Serializes events into the format expected by DeliveryJob

Parameters:

  • events (Array)

    Array of journaled events

Returns:

  • (Array<Hash>)

    Array of serialized event hashes



24
25
26
27
28
29
30
31
32
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 24

def self.delivery_perform_args(events)
  events.map do |event|
    {
      serialized_event: event.journaled_attributes.to_json,
      partition_key: event.journaled_partition_key,
      stream_name: event.journaled_stream_name,
    }
  end
end

.transaction_connectionActiveRecord::ConnectionAdapters::AbstractAdapter

Returns the database connection to use for transactional batching

This is determined by the configured queue adapter, since ActiveJob enqueues jobs to the same database that should be used for transactions.

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)

    The connection to use



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 40

def self.transaction_connection
  queue_adapter = Journaled.queue_adapter

  if queue_adapter.in? %w(delayed delayed_job)
    Delayed::Job.connection
  elsif queue_adapter == 'good_job'
    GoodJob::BaseRecord.connection
  elsif queue_adapter == 'que'
    Que::ActiveRecord::Model.connection
  elsif queue_adapter == 'test' && Rails.env.test?
    ActiveRecord::Base.connection
  else
    raise "Unsupported queue adapter: #{queue_adapter}"
  end
end

.validate_configuration!void

This method returns an undefined value.

Validates that a supported queue adapter is configured



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 59

def self.validate_configuration!
  unless Journaled::SUPPORTED_QUEUE_ADAPTERS.include?(Journaled.queue_adapter)
    raise <<~MSG
      Journaled has detected an unsupported ActiveJob queue adapter: `:#{Journaled.queue_adapter}`

      Journaled jobs must be enqueued transactionally to your primary database.

      Please install the appropriate gems and set `queue_adapter` to one of the following:
      #{Journaled::SUPPORTED_QUEUE_ADAPTERS.map { |a| "- `:#{a}`" }.join("\n")}

      Read more at https://github.com/Betterment/journaled
    MSG
  end
end