Class: SandthornSequelProjection::ProcessedEventsTracker
- Inherits:
-
Object
- Object
- SandthornSequelProjection::ProcessedEventsTracker
- Extended by:
- Forwardable
- Defined in:
- lib/sandthorn_sequel_projection/processed_events_tracker.rb
Constant Summary
- DEFAULT_TABLE_NAME =
:processed_events_trackers
Instance Attribute Summary
-
#db_connection ⇒ Object
readonly
Returns the value of attribute db_connection.
-
#event_store ⇒ Object
readonly
Returns the value of attribute event_store.
-
#identifier ⇒ Object
readonly
Returns the value of attribute identifier.
-
#lock ⇒ Object
readonly
Returns the value of attribute lock.
Class Method Summary
- .migrate!(db_connection) ⇒ Object
- .table_name ⇒ Object
Instance Method Summary
-
#initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil) ⇒ ProcessedEventsTracker
constructor
A new instance of ProcessedEventsTracker.
- #last_processed_sequence_number ⇒ Object
- #process_events(&block) ⇒ Object
- #reset ⇒ Object
- #row ⇒ Object
- #row_exists? ⇒ Boolean
- #table ⇒ Object
- #with_lock ⇒ Object
Constructor Details
#initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil) ⇒ ProcessedEventsTracker
Returns a new instance of ProcessedEventsTracker.
14 15 16 17 18 19 20 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 14
#
# Builds a tracker for one projection.
#
# @param identifier [#to_s] name of the tracker; stored as a String in
#   @identifier, while the value as given is handed to Lock.new
# @param event_store [Object] stored as-is in @event_store
# @param db_connection [Object, nil] connection to use; when nil/false the
#   connection from SandthornSequelProjection.configuration is used instead
def initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil)
  @identifier = identifier.to_s
  @event_store = event_store

  # Fall back to the globally configured connection only when none was given.
  @db_connection = db_connection
  @db_connection ||= SandthornSequelProjection.configuration.db_connection

  @lock = Lock.new(identifier, @db_connection)
  ensure_row
end
Instance Attribute Details
#db_connection ⇒ Object (readonly)
Returns the value of attribute db_connection.
10 11 12 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10
#
# Read-only accessor.
#
# @return [Object] the value held in @db_connection
def db_connection
  @db_connection
end
#event_store ⇒ Object (readonly)
Returns the value of attribute event_store.
10 11 12 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10
#
# Read-only accessor.
#
# @return [Object] the value held in @event_store
def event_store
  @event_store
end
#identifier ⇒ Object (readonly)
Returns the value of attribute identifier.
10 11 12 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10
#
# Read-only accessor.
#
# @return [Object] the value held in @identifier
def identifier
  @identifier
end
#lock ⇒ Object (readonly)
Returns the value of attribute lock.
10 11 12 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10
#
# Read-only accessor.
#
# @return [Object] the value held in @lock
def lock
  @lock
end
Class Method Details
.migrate!(db_connection) ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 68
#
# Creates the tracker table (named by +table_name+) through the connection's
# +create_table?+ call, with a unique index on +identifier+.
#
# @param db_connection [#create_table?] connection used to run the migration
# @raise [MigrationError] when the migration fails with a StandardError; the
#   original error is passed as the message and is available via +cause+
def self.migrate!(db_connection)
  db_connection.create_table?(table_name) do
    String :identifier
    Integer :last_processed_sequence_number, default: 0
    DateTime :locked_at, null: true
    index [:identifier], unique: true
  end
rescue StandardError => e
  # Only StandardError is wrapped: rescuing Exception would also convert
  # Interrupt, SystemExit and NoMemoryError into a MigrationError.
  raise MigrationError, e
end
.table_name ⇒ Object
64 65 66 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 64
#
# Name of the table that holds the tracker rows.
#
# @return [Object] the DEFAULT_TABLE_NAME constant
def self.table_name
  DEFAULT_TABLE_NAME
end
Instance Method Details
#last_processed_sequence_number ⇒ Object
42 43 44 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 42
#
# Reads the stored sequence number from this tracker's row.
#
# @return [Object] the :last_processed_sequence_number entry of +row+
def last_processed_sequence_number
  tracker_row = row
  tracker_row[:last_processed_sequence_number]
end
#process_events(&block) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 28
#
# Feeds unprocessed events to the given block, batch by batch, while holding
# the lock.
#
# A Cursor is created starting after +last_processed_sequence_number+. For
# every non-empty batch the block is called and the cursor's
# +last_sequence_number+ is written, both inside one +transaction+ call.
# Iteration stops at the first empty batch.
#
# @yieldparam events [Object] one batch as returned by Cursor#get_batch
def process_events(&block)
  with_lock do
    cursor = Cursor.new(
      after_sequence_number: last_processed_sequence_number,
      event_store: event_store
    )

    # Fetch-then-test in the loop condition: one get_batch per iteration,
    # plus the final call that returns the empty batch.
    until (batch = cursor.get_batch).empty?
      transaction do
        block.call(batch)
        write_sequence_number(cursor.last_sequence_number)
      end
    end
  end
end
#reset ⇒ Object
58 59 60 61 62 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 58
#
# Writes sequence number 0 while holding the lock.
#
# @return [Object] whatever +with_lock+ returns
def reset
  with_lock { write_sequence_number(0) }
end
#row ⇒ Object
54 55 56 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 54
#
# Looks up this tracker's row by filtering +table+ on +identifier+.
#
# @return [Object] the first result of the filtered lookup
def row
  matching = table.where(identifier: identifier)
  matching.first
end
#row_exists? ⇒ Boolean
50 51 52 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 50
#
# Tells whether +row+ returns something other than nil.
#
# @return [Boolean] false when +row+ is nil, true otherwise
def row_exists?
  return false if row.nil?

  true
end
#table ⇒ Object
46 47 48 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 46
#
# Indexes the connection with the tracker table name.
#
# @return [Object] the result of +db_connection[table_name]+
def table
  connection = db_connection
  connection[table_name]
end
#with_lock ⇒ Object
22 23 24 25 26 |
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 22
#
# Runs the caller's block inside +@lock.acquire+.
#
# @yield the work to perform while the lock is held
# @return [Object] whatever +@lock.acquire+ returns
def with_lock
  @lock.acquire { yield }
end