Class: Journaled::DeliveryAdapters::ActiveJobAdapter
- Inherits:
-
Journaled::DeliveryAdapter
- Object
- Journaled::DeliveryAdapter
- Journaled::DeliveryAdapters::ActiveJobAdapter
- Defined in:
- lib/journaled/delivery_adapters/active_job_adapter.rb
Overview
Default delivery adapter that uses ActiveJob
This adapter enqueues events to Journaled::DeliveryJob which sends them to Kinesis. This is the default behavior and maintains backward compatibility with previous versions of the gem.
Class Method Summary collapse
-
.deliver(events:, enqueue_opts:) ⇒ void
Delivers events by enqueueing them to Journaled::DeliveryJob.
-
.delivery_perform_args(events) ⇒ Array<Hash>
Serializes events into the format expected by DeliveryJob.
-
.transaction_connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter
Returns the database connection to use for transactional batching.
-
.validate_configuration! ⇒ void
Validates that a supported queue adapter is configured.
Class Method Details
.deliver(events:, enqueue_opts:) ⇒ void
This method returns an undefined value.
Delivers events by enqueueing them to Journaled::DeliveryJob
16 17 18 |
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 16 def self.deliver(events:, enqueue_opts:) Journaled::DeliveryJob.set(enqueue_opts).perform_later(*delivery_perform_args(events)) end |
.delivery_perform_args(events) ⇒ Array<Hash>
Serializes events into the format expected by DeliveryJob
24 25 26 27 28 29 30 31 32 |
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 24 def self.delivery_perform_args(events) events.map do |event| { serialized_event: event.journaled_attributes.to_json, partition_key: event.journaled_partition_key, stream_name: event.journaled_stream_name, } end end |
.transaction_connection ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter
Returns the database connection to use for transactional batching
This is determined by the configured queue adapter, since ActiveJob enqueues jobs to the same database that should be used for transactions.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 40 def self.transaction_connection queue_adapter = Journaled.queue_adapter if queue_adapter.in? %w(delayed delayed_job) Delayed::Job.connection elsif queue_adapter == 'good_job' GoodJob::BaseRecord.connection elsif queue_adapter == 'que' Que::ActiveRecord::Model.connection elsif queue_adapter == 'test' && Rails.env.test? ActiveRecord::Base.connection else raise "Unsupported queue adapter: #{queue_adapter}" end end |
.validate_configuration! ⇒ void
This method returns an undefined value.
Validates that a supported queue adapter is configured
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/journaled/delivery_adapters/active_job_adapter.rb', line 59 def self.validate_configuration! unless Journaled::SUPPORTED_QUEUE_ADAPTERS.include?(Journaled.queue_adapter) raise <<~MSG Journaled has detected an unsupported ActiveJob queue adapter: `:#{Journaled.queue_adapter}` Journaled jobs must be enqueued transactionally to your primary database. Please install the appropriate gems and set `queue_adapter` to one of the following: #{Journaled::SUPPORTED_QUEUE_ADAPTERS.map { |a| "- `:#{a}`" }.join("\n")} Read more at https://github.com/Betterment/journaled MSG end end |