Module: MessageDriver::Client
Overview
The client module is the primary client API for MessageDriver. It can either be included in a class that is using it, or used directly.
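The two usage styles described above can be illustrated with a small stand-in. The sketch below does not use the gem: `SketchClient`, `OrderNotifier`, and the recording `publish` body are hypothetical names invented for this example, and the `extend self` idiom is an assumption about how a module can support both styles, not a statement about the gem's source.

```ruby
# Stand-in for MessageDriver::Client; names and behavior are hypothetical.
module SketchClient
  extend self # lets the module's methods be called on the module itself

  # Records the message instead of talking to a real broker.
  def publish(destination, body, headers = {}, properties = {})
    sent << [destination, body, headers, properties]
    body
  end

  # Messages recorded by whichever object received the publish call.
  def sent
    @sent ||= []
  end
end

# Style 1: include the module in a class that uses it.
class OrderNotifier
  include SketchClient

  def notify(order_id)
    publish(:orders, "order #{order_id} created")
  end
end

# Style 2: call the module directly.
SketchClient.publish(:orders, 'direct call')

notifier = OrderNotifier.new
notifier.notify(42)
```

With the real module, the same two call shapes would apply to methods such as publish and subscribe, assuming a broker has been configured first.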
Defining and Looking up Destinations
- #dynamic_destination(dest_name, dest_options = {}, message_props = {}) ⇒ Object
- #find_destination(destination_name) ⇒ Destination::Base
  Find a previously declared Destination.
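Defining and Looking Up Consumers
- #consumer(key, &block) ⇒ Object
- #find_consumer(consumer) ⇒ Object
Sending Messages
- #publish(destination, body, headers = {}, properties = {}) ⇒ Object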
Receiving Messages
- #ack_message(message, options = {}) ⇒ Object
- #nack_message(message, options = {}) ⇒ Object
- #pop_message(destination, options = {}) ⇒ Object
- #subscribe(destination_name, consumer_name, options = {}) ⇒ Object
- #subscribe_with(destination_name, options = {}, &consumer) ⇒ Object
Transaction Management
- #with_message_transaction(options = {}) ⇒ Object
Instance Method Summary
- #[](index) ⇒ Client
  The client for the specified broker.
- #broker ⇒ Broker
  The broker associated with this Client module.
- #broker_name ⇒ Symbol
  The name of the broker associated with this Client module.
Instance Method Details
#[](index) ⇒ Client
Returns the client for the specified broker.
# File 'lib/message_driver/client.rb', line 180
def [](index)
  Broker.client(index)
end
#ack_message(message, options = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 72
def ack_message(message, options = {})
  message.ack(options)
end
#broker ⇒ Broker
Returns the broker associated with this Client module.
# File 'lib/message_driver/client.rb', line 153
def broker
  Broker.broker(broker_name)
end
#broker_name ⇒ Symbol
Returns the name of the broker associated with this Client module.
# File 'lib/message_driver/client.rb', line 158
def broker_name
  Broker::DEFAULT_BROKER_NAME
end
#consumer(key, &block) ⇒ Object
# File 'lib/message_driver/client.rb', line 48
def consumer(key, &block)
  broker.consumer(key, &block)
end
#dynamic_destination(dest_name, dest_options = {}, message_props = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 28
def dynamic_destination(dest_name, dest_options = {}, message_props = {})
  current_adapter_context.create_destination(dest_name, dest_options, message_props)
end
#find_consumer(consumer) ⇒ Object
# File 'lib/message_driver/client.rb', line 52
def find_consumer(consumer)
  broker.find_consumer(consumer)
end
#find_destination(destination_name) ⇒ Destination::Base
Find a previously declared Destination.
Note:
If destination_name is already a Destination::Base, find_destination returns that destination unchanged.
# File 'lib/message_driver/client.rb', line 35
def find_destination(destination_name)
  case destination_name
  when Destination::Base
    destination_name
  else
    broker.find_destination(destination_name)
  end
end
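The pass-through behavior of find_destination can be tried in isolation. The sketch below is self-contained and does not load the gem: `Sketch::Destination` and `Sketch::Registry` are hypothetical stand-ins for Destination::Base and the broker, and `declare` is an invented helper used only to populate the registry.

```ruby
# Hypothetical stand-ins; the real classes live in MessageDriver.
module Sketch
  class Destination
    attr_reader :name

    def initialize(name)
      @name = name
    end
  end

  class Registry
    def initialize
      @destinations = {}
    end

    # Invented helper: registers a destination under a name and returns it.
    def declare(name)
      @destinations[name] = Destination.new(name)
    end

    # Same shape as the case statement in find_destination:
    # destination objects pass through, anything else is looked up by name.
    def find_destination(destination_name)
      case destination_name
      when Destination
        destination_name
      else
        @destinations.fetch(destination_name)
      end
    end
  end
end

registry = Sketch::Registry.new
declared = registry.declare(:orders)

by_name = registry.find_destination(:orders)     # looked up by name
by_object = registry.find_destination(declared)  # returned as-is
puts by_name.equal?(by_object)
```

Because of this pass-through, methods that accept a destination argument (for example publish and pop_message) can take either a name or an already resolved destination object.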
#nack_message(message, options = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 76
def nack_message(message, options = {})
  message.nack(options)
end
#pop_message(destination, options = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 68
def pop_message(destination, options = {})
  find_destination(destination).pop_message(options)
end
#publish(destination, body, headers = {}, properties = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 60
def publish(destination, body, headers = {}, properties = {})
  find_destination(destination).publish(body, headers, properties)
end
#subscribe(destination_name, consumer_name, options = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 80
def subscribe(destination_name, consumer_name, options = {})
  consumer = find_consumer(consumer_name)
  subscribe_with(destination_name, options, &consumer)
end
#subscribe_with(destination_name, options = {}, &consumer) ⇒ Object
# File 'lib/message_driver/client.rb', line 85
def subscribe_with(destination_name, options = {}, &consumer)
  destination = find_destination(destination_name)
  current_adapter_context.subscribe(destination, options, &consumer)
end
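The relationship between subscribe and subscribe_with (a named consumer is looked up, then passed on as a block) can be sketched without the gem. `SketchSubscriber` below is a hypothetical stand-in; in particular, its subscribe_with delivers one fake message immediately, which is an assumption made so the example has something to show, whereas a real adapter would register the block with the broker.

```ruby
# Hypothetical stand-in showing how a named consumer becomes a block.
class SketchSubscriber
  attr_reader :received

  def initialize
    @consumers = {}
    @received = []
  end

  # Register a block under a name, similar in spirit to #consumer(key, &block).
  def consumer(key, &block)
    @consumers[key] = block
  end

  def find_consumer(key)
    @consumers.fetch(key)
  end

  # Look the consumer up by name, then hand it on as a block.
  def subscribe(destination_name, consumer_name, options = {})
    consumer = find_consumer(consumer_name)
    subscribe_with(destination_name, options, &consumer)
  end

  # Sketch only: deliver a single fake message to the block right away.
  def subscribe_with(destination_name, options = {}, &consumer)
    consumer.call("hello from #{destination_name}")
  end
end

client = SketchSubscriber.new
client.consumer(:printer) { |message| client.received << message }
client.subscribe(:orders, :printer)
p client.received
```

Calling subscribe_with directly with an inline block skips the named lookup, which suits one-off consumers that do not need to be registered.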
#with_message_transaction(options = {}) ⇒ Object
# File 'lib/message_driver/client.rb', line 94
def with_message_transaction(options = {})
  wrapper = fetch_context_wrapper
  wrapper.increment_transaction_depth
  begin
    if wrapper.ctx.supports_transactions?
      if wrapper.transaction_depth == 1
        wrapper.ctx.begin_transaction(options)
        begin
          yield
        rescue
          begin
            wrapper.ctx.rollback_transaction
          rescue => e
            logger.error exception_to_str(e)
          end
          raise
        end
        wrapper.ctx.commit_transaction
      else
        yield
      end
    else
      logger.debug('this adapter does not support transactions')
      yield
    end
  ensure
    wrapper.decrement_transaction_depth
  end
end
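The depth counting in with_message_transaction means that only the outermost call begins, commits, or rolls back; nested calls simply yield. The sketch below reproduces that idea in a simplified, self-contained form. `FakeContext` and `SketchWrapper` are hypothetical classes written for this example, the context always reports transaction support, and the logging of rollback failures is omitted.

```ruby
# Hypothetical stand-ins; FakeContext and SketchWrapper are not gem classes.
class FakeContext
  attr_reader :events

  def initialize
    @events = []
  end

  def begin_transaction(_options = {})
    @events << :begin
  end

  def commit_transaction
    @events << :commit
  end

  def rollback_transaction
    @events << :rollback
  end
end

class SketchWrapper
  attr_reader :ctx, :transaction_depth

  def initialize(ctx)
    @ctx = ctx
    @transaction_depth = 0
  end

  # Simplified: only the outermost call touches the transaction.
  def with_message_transaction(options = {})
    @transaction_depth += 1
    if @transaction_depth == 1
      ctx.begin_transaction(options)
      begin
        yield
      rescue StandardError
        ctx.rollback_transaction
        raise # the caller still sees the original error
      end
      ctx.commit_transaction
    else
      yield # nested call joins the outer transaction
    end
  ensure
    @transaction_depth -= 1
  end
end

wrapper = SketchWrapper.new(FakeContext.new)

# Nested calls produce a single begin/commit pair.
wrapper.with_message_transaction do
  wrapper.with_message_transaction do
    # work that should be part of the outer transaction
  end
end

# A failure in the block rolls back and re-raises.
begin
  wrapper.with_message_transaction { raise 'boom' }
rescue RuntimeError
  # handled here only so the script keeps running
end

p wrapper.ctx.events
```

Keeping the depth counter in an ensure clause, as the original method does, guarantees the counter returns to its previous value even when the block raises.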