Class: Handlers::ReceiverHandler
- Inherits:
-
SRCommonHandler
- Object
- Qpid::Proton::MessagingHandler
- BasicHandler
- SRCommonHandler
- Handlers::ReceiverHandler
- Defined in:
- lib/handlers/receiver_handler.rb
Overview
Receiver events handler for receiver client
Instance Attribute Summary collapse
-
#browse ⇒ Object
Browse.
-
#count ⇒ Object
Count of expected messages to be received.
-
#prefetch ⇒ Object
Credit for messages to be pre-fetched.
-
#process_reply_to ⇒ Object
Process reply to.
-
#recv_listen ⇒ Object
Receiver listen.
-
#recv_listen_port ⇒ Object
Receiver listen port.
-
#selector ⇒ Object
Selector.
Attributes inherited from SRCommonHandler
#auto_settle_off, #idle_timeout, #log_lib, #log_msgs, #max_frame_size, #msg_content_hashed, #sasl_enabled
Attributes inherited from BasicHandler
#broker, #exit_timer, #idle_timeout, #log_lib, #max_frame_size, #sasl_enabled, #sasl_mechs
Instance Method Summary collapse
-
#do_process_reply_to(message) ⇒ Object
Processing reply to reply-to address of message.
-
#initialize(broker, log_msgs, msg_content_hashed, count, prefetch, process_reply_to, browse, selector, sasl_mechs, idle_timeout, max_frame_size, sasl_enabled, log_lib, recv_listen, recv_listen_port, auto_settle_off, exit_timer, duration, duration_mode) ⇒ ReceiverHandler
constructor
- Initialization of receiver events handler ==== Receiver events handler arguments broker
- URI of broker log_msgs
- format of message(s) log count
- number of messages to receive process-reply-to
- send message to reply-to address if enabled and message got reply-to address browse
- browse messages instead of reading sasl_mechs
-
allowed SASL mechanisms.
-
#on_container_start(container) ⇒ Object
Called when the event loop starts, connects receiver client to SRCommonHandler#broker and creates receiver.
-
#on_message(delivery, message) ⇒ Object
Called when a message is received, receiving ReceiverHandler#count messages.
-
#on_tracker_accept(_tracker) ⇒ Object
Called when the remote peer accepts an outgoing message, accepting ReceiverHandler#sent messages.
Methods inherited from SRCommonHandler
Constructor Details
#initialize(broker, log_msgs, msg_content_hashed, count, prefetch, process_reply_to, browse, selector, sasl_mechs, idle_timeout, max_frame_size, sasl_enabled, log_lib, recv_listen, recv_listen_port, auto_settle_off, exit_timer, duration, duration_mode) ⇒ ReceiverHandler
Initialization of receiver events handler
Receiver events handler arguments
- broker
-
URI of broker
- log_msgs
-
format of message(s) log
- count
-
number of messages to receive
- process-reply-to
-
send message to reply-to address if enabled and message got reply-to address
- browse
-
browse messages instead of reading
- sasl_mechs
-
allowed SASL mechanisms
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/handlers/receiver_handler.rb', line 49 def initialize( broker, log_msgs, msg_content_hashed, count, prefetch, process_reply_to, browse, selector, sasl_mechs, idle_timeout, max_frame_size, sasl_enabled, log_lib, recv_listen, recv_listen_port, auto_settle_off, exit_timer, duration, duration_mode ) super( broker, log_msgs, msg_content_hashed, sasl_mechs, idle_timeout, max_frame_size, sasl_enabled, log_lib, auto_settle_off, exit_timer ) # Save count of expected messages to be received @count = count # Save credit for messages to be pre-fetched @prefetch = prefetch # Save process reply to @process_reply_to = process_reply_to # Save browse @browse = browse # Save selector @selector = selector # Save recv-listen value @recv_listen = recv_listen # Save recv-listen port value @recv_listen_port = recv_listen_port # Number of received messages @received = 0 # Flag indicating that all expected messages were received @all_received = false # Hash with senders for replying @senders = {} # Counter of sent messages when processing reply-to @sent = 0 # Counter of accepted messages @accepted = 0 # Duration @duration = Duration.new(duration, count, duration_mode) end |
Instance Attribute Details
#browse ⇒ Object
Browse
32 33 34 |
# File 'lib/handlers/receiver_handler.rb', line 32 def browse @browse end |
#count ⇒ Object
Count of expected messages to be received
26 27 28 |
# File 'lib/handlers/receiver_handler.rb', line 26 def count @count end |
#prefetch ⇒ Object
Credit for messages to be pre-fetched
28 29 30 |
# File 'lib/handlers/receiver_handler.rb', line 28 def prefetch @prefetch end |
#process_reply_to ⇒ Object
Process reply to
30 31 32 |
# File 'lib/handlers/receiver_handler.rb', line 30 def process_reply_to @process_reply_to end |
#recv_listen ⇒ Object
Receiver listen
36 37 38 |
# File 'lib/handlers/receiver_handler.rb', line 36 def recv_listen @recv_listen end |
#recv_listen_port ⇒ Object
Receiver listen port
38 39 40 |
# File 'lib/handlers/receiver_handler.rb', line 38 def recv_listen_port @recv_listen_port end |
#selector ⇒ Object
Selector
34 35 36 |
# File 'lib/handlers/receiver_handler.rb', line 34 def selector @selector end |
Instance Method Details
#do_process_reply_to(message) ⇒ Object
Processing reply to reply-to address of message
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/handlers/receiver_handler.rb', line 184 def do_process_reply_to() # If sender for actual reply-to address does not exist unless @senders.include?(.reply_to) # Create new sender for reply-to address @senders[.reply_to] = @receiver.connection.open_sender({ # Set target address :target => .reply_to, # Set auto settle :auto_settle => @auto_settle_off ? false : true, }) end # Set target address of message to be send to reply-to address .address = .reply_to # Increase number of sent messages @sent = @sent + 1 # Send message to reply-to address @senders[.reply_to].send() end |
#on_container_start(container) ⇒ Object
Called when the event loop starts, connects receiver client to SRCommonHandler#broker and creates receiver
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/handlers/receiver_handler.rb', line 113 def on_container_start(container) if @recv_listen # P2P @listener = container.listen("0.0.0.0:#{@recv_listen_port}") else # Broker # Prepare source options source = {} source[:address] = @broker.amqp_address source[:filter] = { :selector => make_apache_selector(@selector)} if @selector # Connecting to broker and creating receiver @receiver = container.connect( # Set broker URI @broker, # Enabled SASL authentication sasl_enabled: @sasl_enabled, # Enabled insecure SASL mechanisms sasl_allow_insecure_mechs: true, # Set allowed SASL mechanisms sasl_allowed_mechs: @sasl_mechs, # Set idle timeout idle_timeout: @idle_timeout, # Set max frame size max_frame_size: @max_frame_size, ).open_receiver( # Set source options :source => source, # Set prefetch :credit_window => @prefetch, ) # If browse messages instead of reading if browse # Set browsing mode @receiver.source.distribution_mode = \ Qpid::Proton::Terminus::DIST_MODE_COPY end end end |
#on_message(delivery, message) ⇒ Object
Called when a message is received, receiving ReceiverHandler#count messages
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/handlers/receiver_handler.rb', line 152 def (delivery, ) @duration.delay("before-receive") { |d| sleep d } exit_timer.reset if exit_timer # Print received message () # If process reply to if @process_reply_to and !.reply_to.nil? self.do_process_reply_to() end # Increase number of received messages @received = @received + 1 # If expected count of messages to be received is not zero # and all expected messages are received if @count > 0 and @received == @count # Set flag indicating that all expected messages were received to true @all_received = true # Close listener when listening if recv_listen # Close listener if not processing reply-to @listener.close unless process_reply_to # Close receiver when not listening, but receiving else # Close receiver delivery.receiver.close # Close connection if not processing reply-to delivery.receiver.connection.close unless process_reply_to end end # if @duration.delay("after-receive") { |d| sleep d } end |
#on_tracker_accept(_tracker) ⇒ Object
Called when the remote peer accepts an outgoing message, accepting ReceiverHandler#sent messages
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/handlers/receiver_handler.rb', line 205 def on_tracker_accept(_tracker) # Increase number of accepted messages @accepted = @accepted + 1 # If all expected messages were received # and all sent messages were accepted if @all_received and @accepted == @sent # Close all senders and their connections @senders.each do |_, i_sender| # Close sender i_sender.close # Close connection of sender i_sender.connection.close end end # if end |