Class: MessageDriver::Adapters::InMemoryAdapter
- Inherits:
-
Base
- Object
- Base
- MessageDriver::Adapters::InMemoryAdapter
show all
- Defined in:
- lib/message_driver/adapters/in_memory_adapter.rb
Defined Under Namespace
Classes: Destination, InMemoryContext, Message, Subscription
Instance Attribute Summary
Attributes inherited from Base
#broker
Instance Method Summary
collapse
Methods inherited from Base
#contexts, #new_context
Constructor Details
#initialize(broker, _config = {}) ⇒ InMemoryAdapter
Returns a new instance of InMemoryAdapter.
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 88
# Sets up an adapter bound to +broker+ with empty in-memory state.
#
# The message store and the subscription registry are keyed containers that
# lazily create an empty Array the first time a key is read. When the
# `thread_safe` library can be required they are ThreadSafe::Cache instances;
# otherwise (LoadError) plain Hashes are used.
#
# @param broker the broker this adapter belongs to (stored in @broker)
# @param _config [Hash] accepted but not read by this adapter
def initialize(broker, _config = {})
  @broker = broker
  @destinations = {}

  # Default block shared by both containers: a missing key gets its own Array.
  empty_list_default = proc { |store, key| store[key] = [] }

  container_class =
    begin
      require 'thread_safe'
      ThreadSafe::Cache
    rescue LoadError
      Hash
    end

  @message_store = container_class.new(&empty_list_default)
  @subscriptions = container_class.new(&empty_list_default)
end
|
Instance Method Details
#add_subscription_for(name, subscription) ⇒ Object
169
170
171
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 169
# Appends +subscription+ to the end of the subscription list kept under +name+.
#
# @param name key of the subscription list in @subscriptions
# @param subscription the object to append
# @return the subscription list for +name+ after the append
def add_subscription_for(name, subscription)
  @subscriptions[name] << subscription
end
|
#build_context ⇒ Object
101
102
103
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 101
# Builds a new InMemoryContext, handing it this adapter as its only argument.
#
# @return [InMemoryContext] the newly constructed context
def build_context
  context = InMemoryContext.new(self)
  context
end
|
#consumer_count_for(name) ⇒ Object
177
178
179
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 177
# Reports how many subscriptions are currently held under +name+.
#
# @param name key of the subscription list in @subscriptions
# @return [Integer] number of entries in that list
def consumer_count_for(name)
  @subscriptions[name].length
end
|
#create_destination(name, dest_options = {}, message_props = {}) ⇒ Object
Also known as:
handle_create_destination
105
106
107
108
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 105
# Constructs a Destination for +name+ and records it in @destinations,
# replacing any entry already stored under the same name.
#
# @param name key under which the destination is stored
# @param dest_options [Hash] passed through to Destination.new
# @param message_props [Hash] passed through to Destination.new
# @return [Destination] the destination that was just registered
def create_destination(name, dest_options = {}, message_props = {})
  @destinations[name] = Destination.new(self, name, dest_options, message_props)
end
|
#message_queue_for(name) ⇒ Object
153
154
155
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 153
# Looks up the message list stored under +name+ in @message_store.
#
# @param name key of the message list
# @return whatever @message_store yields for +name+
def message_queue_for(name)
  queue = @message_store[name]
  queue
end
|
#next_subscription_for(name) ⇒ Object
161
162
163
164
165
166
167
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 161
# Hands out the subscriptions registered under +name+ in round-robin order:
# the entry at the front of the list is moved to the back and returned.
#
# @param name key of the subscription list in @subscriptions
# @return the subscription that was at the front, or nil when the list is empty
def next_subscription_for(name)
  candidates = @subscriptions[name]
  return if candidates.empty?

  # rotate! shifts the head to the tail in place, so the chosen entry is now last.
  candidates.rotate!
  candidates.last
end
|
#remove_subscription_for(name, subscription) ⇒ Object
173
174
175
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 173
# Deletes +subscription+ from the subscription list kept under +name+.
#
# @param name key of the subscription list in @subscriptions
# @param subscription the entry to delete
# @return the result of the list's #delete call (for an Array: the deleted
#   item, or nil when nothing matched)
def remove_subscription_for(name, subscription)
  registered = @subscriptions[name]
  registered.delete(subscription)
end
|
#reset_after_tests ⇒ Object
146
147
148
149
150
151
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 146
# Discards queued messages and registered subscriptions.
#
# Every key already present in @message_store is kept but pointed at a brand
# new empty Array (the previous list object is not mutated); @subscriptions is
# cleared outright.
#
# @return the result of @subscriptions.clear
def reset_after_tests
  @message_store.keys.each { |queue_name| @message_store[queue_name] = [] }
  @subscriptions.clear
end
|
#stop ⇒ Object
141
142
143
144
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 141
# Stops the adapter: runs the inherited #stop first, then calls
# #reset_after_tests on this adapter.
#
# @return the result of #reset_after_tests
def stop
  super()
  reset_after_tests
end
|
#subscriptions_for(name) ⇒ Object
157
158
159
|
# File 'lib/message_driver/adapters/in_memory_adapter.rb', line 157
# Looks up the subscription list stored under +name+ in @subscriptions.
#
# @param name key of the subscription list
# @return whatever @subscriptions yields for +name+
def subscriptions_for(name)
  registered = @subscriptions[name]
  registered
end
|