Class: MessageDriver::Adapters::InMemoryAdapter

Inherits:
Base
  • Object
show all
Defined in:
lib/message_driver/adapters/in_memory_adapter.rb

Defined Under Namespace

Classes: Destination, InMemoryContext, Message, Subscription

Instance Attribute Summary

Attributes inherited from Base

#broker

Instance Method Summary collapse

Methods inherited from Base

#contexts, #new_context

Constructor Details

#initialize(broker, _config = {}) ⇒ InMemoryAdapter

Returns a new instance of InMemoryAdapter.



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 88

def initialize(broker, _config = {})
  @broker = broker
  @destinations = {}
  begin
    require 'thread_safe'
    @message_store = ThreadSafe::Cache.new { |h, k| h[k] = [] }
    @subscriptions = ThreadSafe::Cache.new { |h, k| h[k] = [] }
  rescue LoadError
    @message_store = Hash.new { |h, k| h[k] = [] }
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end
end

Instance Method Details

#add_subscription_for(name, subscription) ⇒ Object



169
170
171
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 169

def add_subscription_for(name, subscription)
  @subscriptions[name].push subscription
end

#build_contextObject



101
102
103
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 101

def build_context
  InMemoryContext.new(self)
end

#consumer_count_for(name) ⇒ Object



177
178
179
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 177

def consumer_count_for(name)
  @subscriptions[name].size
end

#create_destination(name, dest_options = {}, message_props = {}) ⇒ Object Also known as: handle_create_destination



105
106
107
108
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 105

def create_destination(name, dest_options = {}, message_props = {})
  destination = Destination.new(self, name, dest_options, message_props)
  @destinations[name] = destination
end

#message_queue_for(name) ⇒ Object



153
154
155
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 153

def message_queue_for(name)
  @message_store[name]
end

#next_subscription_for(name) ⇒ Object



161
162
163
164
165
166
167
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 161

def next_subscription_for(name)
  unless (subs = @subscriptions[name]).empty?
    sub = subs.shift
    subs.push sub
    sub
  end
end

#remove_subscription_for(name, subscription) ⇒ Object



173
174
175
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 173

def remove_subscription_for(name, subscription)
  @subscriptions[name].delete(subscription)
end

#reset_after_testsObject



146
147
148
149
150
151
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 146

def reset_after_tests
  @message_store.keys.each do |k|
    @message_store[k] = []
  end
  @subscriptions.clear
end

#stopObject



141
142
143
144
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 141

def stop
  super
  reset_after_tests
end

#subscriptions_for(name) ⇒ Object



157
158
159
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 157

def subscriptions_for(name)
  @subscriptions[name]
end