Class: SandthornSequelProjection::ProcessedEventsTracker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/sandthorn_sequel_projection/processed_events_tracker.rb

Constant Summary collapse

DEFAULT_TABLE_NAME =
:processed_events_trackers

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil) ⇒ ProcessedEventsTracker

Returns a new instance of ProcessedEventsTracker.



14
15
16
17
18
19
20
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 14

def initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil)
  @identifier = identifier.to_s
  @event_store = event_store
  @db_connection = db_connection || SandthornSequelProjection.configuration.db_connection
  @lock = Lock.new(identifier, @db_connection)
  ensure_row
end

Instance Attribute Details

#db_connectionObject (readonly)

Returns the value of attribute db_connection.



10
11
12
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10

def db_connection
  @db_connection
end

#event_storeObject (readonly)

Returns the value of attribute event_store.



10
11
12
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10

def event_store
  @event_store
end

#identifierObject (readonly)

Returns the value of attribute identifier.



10
11
12
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10

def identifier
  @identifier
end

#lockObject (readonly)

Returns the value of attribute lock.



10
11
12
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 10

def lock
  @lock
end

Class Method Details

.migrate!(db_connection) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 68

def self.migrate!(db_connection)
  db_connection.create_table?(table_name) do
    String    :identifier
    Integer   :last_processed_sequence_number, default: 0
    DateTime  :locked_at, null: true
    index [:identifier], unique: true
  end
rescue Exception => e
  raise MigrationError, e
end

.table_nameObject



64
65
66
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 64

def self.table_name
  DEFAULT_TABLE_NAME
end

Instance Method Details

#last_processed_sequence_numberObject



42
43
44
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 42

def last_processed_sequence_number
  row[:last_processed_sequence_number]
end

#process_events(&block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 28

def process_events(&block)
  with_lock do
    cursor = Cursor.new(after_sequence_number: last_processed_sequence_number, event_store: event_store)
    events = cursor.get_batch
    until(events.empty?)
      transaction do
        block.call(events)
        write_sequence_number(cursor.last_sequence_number)
      end
      events = cursor.get_batch
    end
  end
end

#resetObject



58
59
60
61
62
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 58

def reset
  with_lock do
    write_sequence_number(0)
  end
end

#rowObject



54
55
56
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 54

def row
  table.where(identifier: identifier).first
end

#row_exists?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 50

def row_exists?
  !row.nil?
end

#tableObject



46
47
48
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 46

def table
  db_connection[table_name]
end

#with_lockObject



22
23
24
25
26
# File 'lib/sandthorn_sequel_projection/processed_events_tracker.rb', line 22

def with_lock
  @lock.acquire do
    yield
  end
end