Class: RubyEventStore::InMemoryRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/in_memory_repository.rb

Defined Under Namespace

Classes: EventInStream, UnsupportedVersionAnyUsage

Instance Method Summary collapse

Constructor Details

#initialize(serializer: NULL, ensure_supported_any_usage: false) ⇒ InMemoryRepository

Returns a new instance of InMemoryRepository.



25
26
27
28
29
30
31
# File 'lib/ruby_event_store/in_memory_repository.rb', line 25

def initialize(serializer: NULL, ensure_supported_any_usage: false)
  @serializer = serializer
  @streams = Hash.new { |h, k| h[k] = Array.new }
  @mutex = Mutex.new
  @storage = Hash.new
  @ensure_supported_any_usage = ensure_supported_any_usage
end

Instance Method Details

#append_to_stream(records, stream, expected_version) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ruby_event_store/in_memory_repository.rb', line 33

def append_to_stream(records, stream, expected_version)
  serialized_records = records.map { |record| record.serialize(serializer) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)
    unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
      raise WrongExpectedEventVersion
    end

    serialized_records.each_with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event?(serialized_record.event_id)
      storage[serialized_record.event_id] = serialized_record
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end
  self
end

#count(spec) ⇒ Object



102
103
104
# File 'lib/ruby_event_store/in_memory_repository.rb', line 102

def count(spec)
  read_scope(spec).count
end

#delete_stream(stream) ⇒ Object



68
69
70
# File 'lib/ruby_event_store/in_memory_repository.rb', line 68

def delete_stream(stream)
  streams.delete(stream.name)
end

#event_in_stream?(event_id, stream) ⇒ Boolean

Returns:

  • (Boolean)


138
139
140
# File 'lib/ruby_event_store/in_memory_repository.rb', line 138

def event_in_stream?(event_id, stream)
  !streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }.nil?
end

#global_position(event_id) ⇒ Object



134
135
136
# File 'lib/ruby_event_store/in_memory_repository.rb', line 134

def global_position(event_id)
  storage.keys.index(event_id) or raise EventNotFound.new(event_id)
end

#has_event?(event_id) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/ruby_event_store/in_memory_repository.rb', line 72

def has_event?(event_id)
  storage.has_key?(event_id)
end

#last_stream_event(stream) ⇒ Object



76
77
78
79
# File 'lib/ruby_event_store/in_memory_repository.rb', line 76

def last_stream_event(stream)
  last_id = event_ids_of_stream(stream).last
  storage.fetch(last_id).deserialize(serializer) if last_id
end


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/ruby_event_store/in_memory_repository.rb', line 51

def link_to_stream(event_ids, stream, expected_version)
  serialized_records = event_ids.map { |id| read_event(id) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)
    unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
      raise WrongExpectedEventVersion
    end

    serialized_records.each_with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event_in_stream?(serialized_record.event_id, stream.name)
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end
  self
end

#position_in_stream(event_id, stream) ⇒ Object



128
129
130
131
132
# File 'lib/ruby_event_store/in_memory_repository.rb', line 128

def position_in_stream(event_id, stream)
  event_in_stream = streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }
  raise EventNotFoundInStream if event_in_stream.nil?
  event_in_stream.position
end

#read(spec) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/ruby_event_store/in_memory_repository.rb', line 81

def read(spec)
  serialized_records = read_scope(spec)
  if spec.batched?
    batch_reader = ->(offset, limit) do
      serialized_records
        .drop(offset)
        .take(limit)
        .map { |serialized_record| serialized_record.deserialize(serializer) }
    end
    BatchEnumerator.new(spec.batch_size, serialized_records.size, batch_reader).each
  elsif spec.first?
    serialized_records.first&.deserialize(serializer)
  elsif spec.last?
    serialized_records.last&.deserialize(serializer)
  else
    Enumerator.new do |y|
      serialized_records.each { |serialized_record| y << serialized_record.deserialize(serializer) }
    end
  end
end

#streams_of(event_id) ⇒ Object



124
125
126
# File 'lib/ruby_event_store/in_memory_repository.rb', line 124

def streams_of(event_id)
  streams.select { |name,| has_event_in_stream?(event_id, name) }.map { |name,| Stream.new(name) }
end

#update_messages(records) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/ruby_event_store/in_memory_repository.rb', line 106

def update_messages(records)
  records.each do |record|
    read_event(record.event_id)
    serialized_record =
      Record
        .new(
          event_id: record.event_id,
          event_type: record.event_type,
          data: record.data,
          metadata: record.,
          timestamp: Time.iso8601(storage.fetch(record.event_id).timestamp),
          valid_at: record.valid_at
        )
        .serialize(serializer)
    storage[record.event_id] = serialized_record
  end
end