Class: Mqlight::Command
- Inherits:
-
Object
- Object
- Mqlight::Command
- Includes:
- Logging, Qpid::Proton::Util::ErrorHandler
- Defined in:
- lib/mqlight/command.rb
Overview
This class handles the inter-communication between the threads
Instance Attribute Summary collapse
-
#request_queue ⇒ Object
readonly
Returns the value of attribute request_queue.
-
#request_queue_mutex ⇒ Object
readonly
Returns the value of attribute request_queue_mutex.
-
#request_queue_resource ⇒ Object
readonly
Returns the value of attribute request_queue_resource.
Instance Method Summary collapse
- #check_for_messages(destination, timeout = nil) ⇒ Object
-
#command_loop ⇒ Object
The request processing loop for the command thread.
- #error ⇒ Object
-
#initialize(args) ⇒ Command
constructor
A new instance of Command.
-
#join ⇒ Object
Blocks until the command thread has terminated.
- #process_queued_send(msg, qos) ⇒ Object
- #process_queued_subscription(destination) ⇒ Object
- #process_queued_unsubscribe(destination, ttl) ⇒ Object
-
#process_request_queue ⇒ Object
Process all the requests on the queue.
-
#push_request(hash) ⇒ Object
Pushes the specified request onto the request queue and waits for it to be sent.
- #retrying? ⇒ Boolean
-
#start_thread ⇒ Object
Generates and starts the command thread.
- #started? ⇒ Boolean
- #starting? ⇒ Boolean
- #stopped? ⇒ Boolean
Methods included from Logging
Methods included from Qpid::Proton::Util::ErrorHandler
#can_raise_error, #check_for_error, #create_exception_handler_wrapper, included
Constructor Details
#initialize(args) ⇒ Command
Returns a new instance of Command.
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/mqlight/command.rb', line 33 def initialize(args) @id = args[:id] @thread_vars = args[:thread_vars] # Setup queue for sending request to the command thread @request_queue = Queue.new @request_queue_mutex = Mutex.new @request_queue_resource = ConditionVariable.new @shutdown = false end |
Instance Attribute Details
#request_queue ⇒ Object (readonly)
Returns the value of attribute request_queue.
26 27 28 |
# File 'lib/mqlight/command.rb', line 26 def request_queue @request_queue end |
#request_queue_mutex ⇒ Object (readonly)
Returns the value of attribute request_queue_mutex.
27 28 29 |
# File 'lib/mqlight/command.rb', line 27 def request_queue_mutex @request_queue_mutex end |
#request_queue_resource ⇒ Object (readonly)
Returns the value of attribute request_queue_resource.
28 29 30 |
# File 'lib/mqlight/command.rb', line 28 def request_queue_resource @request_queue_resource end |
Instance Method Details
#check_for_messages(destination, timeout = nil) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/mqlight/command.rb', line 183 def (destination, timeout = nil) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } @thread_vars.proton. link = @thread_vars.proton.(destination) fail Mqlight::InternalError, 'No link for ' + destination.to_s + ' could be found' if link.nil? = false unless link.nil? begin Timeout.timeout(timeout) do sleep(0.1) until @thread_vars.proton. || !started? = true if started? end rescue Timeout::Error logger.data(@id, 'Timeout received inside checking_for_messages') do self.class.to_s + '#' + __method__.to_s end = @thread_vars.proton.(link) if started? end end unless @thread_vars.reply_queue.push(nil) logger.exit(@id, 'none') { self.class.to_s + '#' + __method__.to_s } return end # Collect the message msg = @thread_vars.proton. = Mqlight::Delivery.new(msg, destination, @thread_vars) @thread_vars.reply_queue.push() # QoS 0 @thread_vars.proton.accept(link) if destination.qos == QOS_AT_MOST_ONCE # QoS 1 / auto-confirm @thread_vars.proton.settle(link) if destination.qos == QOS_AT_LEAST_ONCE && destination.auto_confirm logger.exit(@id, 'Present') { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#command_loop ⇒ Object
The request processing loop for the command thread. This method loops awaiting for requests to be process Should there be a request present but the link is in the retry state this method will wait notification of when the link is reinstate. If the link is closed (stopped) then this method returns, then the thread dies.
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
# File 'lib/mqlight/command.rb', line 350 def command_loop logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } shutting_down = false until shutting_down @request_queue_mutex.synchronize do # Wait for a command request while @request_queue.empty? logger.data(@id, 'Command loop waiting for command') do self.class.to_s + '#' + __method__.to_s end # Wait for a trigger from the outer thread(Blocking_client). @request_queue_resource.wait(@request_queue_mutex) return if stopped? end # Process all the requests on the queue. process_request_queue unless @request_queue.empty? # Signal client command completed. @request_queue_resource.signal if stopped? then shutting_down = true @shutdown = true end end end logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#error ⇒ Object
417 418 419 |
# File 'lib/mqlight/command.rb', line 417 def error @thread_vars.proton.error end |
#join ⇒ Object
Blocks until the command thread has terminated.
258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/mqlight/command.rb', line 258 def join @request_queue_mutex.synchronize do @request_queue_resource.signal end Timeout.timeout(5) do @command_thread.join end rescue Timeout::Error @command_thread.kill end |
#process_queued_send(msg, qos) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/mqlight/command.rb', line 64 def process_queued_send(msg, qos) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } @thread_vars.proton.(msg, qos) sleep(0.02) while @thread_vars.proton.outbound_pending? # Push back a message: nil if no problem detected otherwise an exception # describing the issue. exception = nil status = @thread_vars.proton.tracker_status while status == Cproton::PN_STATUS_PENDING && started? sleep(0.02) status = @thread_vars.proton.tracker_status end fail RetryError,'Change of state from started detected' unless started? case status when Cproton::PN_STATUS_ACCEPTED # No action when Cproton::PN_STATUS_SETTLED # No action when Cproton::PN_STATUS_REJECTED reject_msg = @thread_vars.proton.tracker_condition_description( 'send failed - message was rejected') exception = Mqlight::ExceptionContainer.new( RangeError.new(reject_msg)) when Cproton::PN_STATUS_RELEASED exception = Mqlight::ExceptionContainer.new( Mqlight::InternalError.new( 'send failed - message was released')) when Cproton::PN_STATUS_MODIFIED exception = Mqlight::ExceptionContainer.new( Mqlight::InternalError.new( 'send failed - message was modified')) when Cproton::PN_STATUS_ABORTED # An abortion is assumed to be a lost of disconnect # and therefore mark the request as a retry, fail Mqlight::NetworkError, 'send failed - message was aborted' when Cproton::PN_STATUS_PENDING # No action when 0 # ignoring these as appear to be # generated by 'rspec' else exception = Mqlight::ExceptionContainer.new( Mqlight::InternalError.new( "send failed - unknown status #{status}")) end @thread_vars.reply_queue.push(exception) logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue Qpid::Proton::TimeoutError # Specific capture of the QPid timeout condition # Reply back to user with TimeoutError. @thread_vars.reply_queue.push( TimeoutError.new( 'Send request did not complete within the requested period')) rescue Qpid::Proton::ProtonError => error @thread_vars.reply_queue.push( Mqlight::ExceptionContainer.new( Mqlight::InternalError.new(error))) rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#process_queued_subscription(destination) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/mqlight/command.rb', line 135 def process_queued_subscription(destination) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } link = @thread_vars.proton.create_subscription destination # block until link is active or error condition detected exception = nil begin until @thread_vars.proton.link_up?(link) # Short pause sleep 0.1 end rescue StandardError => e exception = e end # Return the acknowledgement @thread_vars.reply_queue.push(exception) @thread_vars.destinations.push(destination) logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#process_queued_unsubscribe(destination, ttl) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/mqlight/command.rb', line 166 def process_queued_unsubscribe(destination, ttl) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } # find and close the link @thread_vars.proton.close_link(destination, ttl) logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#process_request_queue ⇒ Object
Process all the requests on the queue.
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 |
# File 'lib/mqlight/command.rb', line 273 def process_request_queue @thread_vars.processing_command = true op = @request_queue.pop(true) timeout = op[:timeout] # Receive timeout handled inside 'check_for_messages' timeout = nil if op[:action] == 'receive' Timeout.timeout(timeout) do until op.nil? begin # Waiting while proton thread is trying to # recover the connection. @thread_vars.wait_for_state_change(nil) while || starting? return if stopped? # Process request case op[:action] when 'send' process_queued_send op[:params], op[:qos] when 'subscribe' process_queued_subscription op[:params] when 'unsubscribe' process_queued_unsubscribe op[:params], op[:ttl] when 'receive' (op[:destination], op[:timeout]) end # Request has been completed. op = nil rescue Mqlight::NetworkError # The request has failed due to the connection # to the server failing. Change the state to # :retrying in case the proton thread hasn't # work it out yet. @thread_vars.change_state(:retrying) rescue Qpid::Proton::StateError # The request has failed due to the connection # to the server failing. Change the state to # :retrying in case the proton thread hasn't # work it out yet. @thread_vars.proton. rescue RetryError # No action as the default will be to wait for connect # and retry. logger.data(@id, "Retry error detected") do self.class.to_s + '#' + __method__.to_s end end end end rescue Timeout::Error logger.data(@id, "Request #{op[:action]} terminated by timeout") do self.class.to_s + '#' + __method__.to_s end # The command request has timed out, report back to # outer thread. @thread_vars.reply_queue.push( Mqlight::ExceptionContainer.new( Mqlight::TimeoutError.new('Command timeout has expired'))) rescue => e # A catch all for reporting to a FFDC logger.ffdc(self.class.to_s + '#' + __method__.to_s, 'ffdc001', self, 'Uncaught exception', e) @thread_vars.reply_queue.push(Mqlight::ExceptionContainer.new(e)) ensure @thread_vars.processing_command = false end |
#push_request(hash) ⇒ Object
Pushes the specified request onto the request queue and waits for it to be sent.
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 |
# File 'lib/mqlight/command.rb', line 390 def push_request(hash) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } @request_queue_mutex.synchronize do if @shutdown then fail Mqlight::StoppedError, 'Client in stopped state' end @request_queue.push(hash) @request_queue_resource.signal # Wait for the command to be taken. until @request_queue.empty? @request_queue_resource.wait(@request_queue_mutex) end @request_queue_resource.signal end logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
#retrying? ⇒ Boolean
54 55 56 |
# File 'lib/mqlight/command.rb', line 54 def @thread_vars.state == :retrying end |
#start_thread ⇒ Object
Generates and starts the command thread.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/mqlight/command.rb', line 239 def start_thread # Command handle thread. @command_thread = Thread.new do Thread.current['name'] = 'command_thread' begin command_loop logger.data(@id, 'Command loop terminating') do self.class.to_s + '#' + __method__.to_s end rescue => e logger.ffdc(self.class.to_s + '#' + __method__.to_s, 'ffdc002', self, 'Uncaught exception', e) end end end |
#started? ⇒ Boolean
44 45 46 |
# File 'lib/mqlight/command.rb', line 44 def started? @thread_vars.state == :started end |
#starting? ⇒ Boolean
59 60 61 |
# File 'lib/mqlight/command.rb', line 59 def starting? @thread_vars.state == :starting end |
#stopped? ⇒ Boolean
49 50 51 |
# File 'lib/mqlight/command.rb', line 49 def stopped? @thread_vars.state == :stopped end |