Class: DeliveryBoy::Fake

Inherits:
Object
  • Object
show all
Defined in:
lib/delivery_boy/fake.rb

Overview

A fake implementation that is useful for testing.

Defined Under Namespace

Classes: FakeMessage

Instance Method Summary collapse

Constructor Details

#initializeFake

Returns a new instance of Fake.



11
12
13
14
15
# File 'lib/delivery_boy/fake.rb', line 11

def initialize
  @messages = Hash.new {|h, k| h[k] = [] }
  @buffer = Hash.new {|h, k| h[k] = [] }
  @delivery_lock = Mutex.new
end

Instance Method Details

#buffer_sizeObject



60
61
62
63
64
# File 'lib/delivery_boy/fake.rb', line 60

def buffer_size
  @delivery_lock.synchronize do
    @buffer.values.flatten.size
  end
end

#clearObject

Clear all messages stored in memory.



67
68
69
70
71
72
# File 'lib/delivery_boy/fake.rb', line 67

def clear
  @delivery_lock.synchronize do
    @messages.clear
    @buffer.clear
  end
end

#clear_bufferObject



54
55
56
57
58
# File 'lib/delivery_boy/fake.rb', line 54

def clear_buffer
  @delivery_lock.synchronize do
    @buffer.clear
  end
end

#deliver(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) ⇒ Object Also known as: deliver_async!



17
18
19
20
21
22
23
24
25
26
# File 'lib/delivery_boy/fake.rb', line 17

def deliver(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
  @delivery_lock.synchronize do
    offset = @messages[topic].count
    message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time)

    @messages[topic] << message
  end

  nil
end

#deliver_messagesObject



41
42
43
44
45
46
47
48
# File 'lib/delivery_boy/fake.rb', line 41

def deliver_messages
  @delivery_lock.synchronize do
    @buffer.each do |topic, messages|
      @messages[topic].push(*messages)
    end
    @buffer.clear
  end
end

#messages_for(topic) ⇒ Object

Return all messages written to the specified topic.



75
76
77
78
79
80
81
# File 'lib/delivery_boy/fake.rb', line 75

def messages_for(topic)
  @delivery_lock.synchronize do
    # Return a clone so that the list of messages can be traversed
    # without worrying about a concurrent modification
    @messages[topic].clone
  end
end

#produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) ⇒ Object



30
31
32
33
34
35
36
37
38
39
# File 'lib/delivery_boy/fake.rb', line 30

def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
  @delivery_lock.synchronize do
    offset = @buffer[topic].count
    message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time)

    @buffer[topic] << message
  end

  nil
end

#shutdownObject



50
51
52
# File 'lib/delivery_boy/fake.rb', line 50

def shutdown
  clear
end