Class: Klomp
- Inherits:
-
Object
show all
- Defined in:
- lib/klomp.rb,
lib/klomp/frames.rb,
lib/klomp/sentinel.rb,
lib/klomp/connection.rb
Defined Under Namespace
Modules: Frames
Classes: Connection, Error, FrameError, Sentinel
Constant Summary
collapse
- VERSION =
'1.0.5'
- FRAME_SEP =
The null character is the frame separator.
"\x00"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(servers, options = {}) ⇒ Klomp
Returns a new instance of Klomp.
8
9
10
11
12
|
# File 'lib/klomp.rb', line 8
# Builds one Connection per server and stores them in @connections.
#
# @param servers a single server or an array of servers; nested arrays
#   are flattened before use
# @param options [Hash] passed unchanged to every Connection.new call
# @raise [ArgumentError] when the flattened server list is empty
def initialize(servers, options = {})
  server_list = [servers].flatten
  if server_list.empty?
    raise ArgumentError, "no servers given"
  end
  @connections = server_list.map do |server|
    Connection.new(server, options)
  end
end
|
Instance Attribute Details
#connections ⇒ Object
Returns the value of attribute connections.
6
7
8
|
# File 'lib/klomp.rb', line 6
# Reader for the connection list.
#
# @return the object currently stored in @connections
def connections; @connections; end
|
Instance Method Details
#connected? ⇒ Boolean
40
41
42
|
# File 'lib/klomp.rb', line 40
# Reports whether any connection is currently connected.
#
# @return the first connection whose #connected? is truthy, or nil when
#   there is none (a truthy/falsy value rather than a strict boolean)
def connected?
  connections.find { |connection| connection.connected? }
end
|
#disconnect ⇒ Object
44
45
46
47
48
|
# File 'lib/klomp.rb', line 44
# Disconnects every connection, then empties the connection list.
#
# @return [Array] the value returned by each connection's #disconnect,
#   in connection order
def disconnect
  results = connections.map(&:disconnect)
  # Only reached when every #disconnect call returned without raising.
  @connections = []
  results
end
|
#publish(queue, body, headers = {}) ⇒ Object
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/klomp.rb', line 14
# Publishes a message through one randomly chosen connection, falling back
# to the remaining connections when the chosen one raises.
#
# @param queue destination, forwarded to the connection's #publish
# @param body message body, forwarded to the connection's #publish
# @param headers [Hash] extra headers, forwarded to the connection's #publish
# @return whatever the successful connection's #publish returns
# @raise the error from the last attempted connection when every candidate
#   failed
def publish(queue, body, headers = {})
  # Work on a copy so failed candidates are dropped only for this call.
  connections_remaining = connections.dup
  begin
    conn = connections_remaining.sample
    conn.publish(queue, body, headers)
  rescue
    connections_remaining.delete conn
    # The candidate list shrinks on every failure, so the retry is bounded.
    retry unless connections_remaining.empty?
    raise
  end
end
|
#subscribe(queue, subscriber = nil, &block) ⇒ Object
26
27
28
|
# File 'lib/klomp.rb', line 26
# Subscribes to a queue on every connection.
#
# @param queue forwarded to each connection's #subscribe
# @param subscriber optional subscriber object, forwarded as given
#   (nil when omitted)
# @param block optional block, forwarded to each connection
# @return [Array] the value returned by each connection's #subscribe,
#   in connection order
def subscribe(queue, subscriber = nil, &block)
  connections.map do |connection|
    connection.subscribe(queue, subscriber, &block)
  end
end
|
#unsubscribe(queue) ⇒ Object
30
31
32
33
34
35
36
37
38
|
# File 'lib/klomp.rb', line 30
# Unsubscribes on every connection, ignoring per-connection failures.
#
# @param queue either a single argument handed to every connection's
#   #unsubscribe, or an Array holding one argument per connection
#   (matched by position)
# @return [Array] each connection's #unsubscribe result, with nil in the
#   slot of any connection whose call raised a StandardError
# @raise [ArgumentError] when an Array is given whose size differs from
#   the number of connections
def unsubscribe(queue)
  # Best effort: a failing connection yields nil instead of aborting the rest.
  attempt = lambda do |connection, argument|
    begin
      connection.unsubscribe(argument)
    rescue StandardError
      nil
    end
  end

  case queue
  when Array
    if connections.size != queue.size
      raise ArgumentError, "wrong size array for #{connections.size} (#{queue.size})"
    end
    connections.zip(queue).map { |connection, argument| attempt.call(connection, argument) }
  else
    connections.map { |connection| attempt.call(connection, queue) }
  end
end
|