Class: Mqlight::Command

Inherits:
Object
  • Object
show all
Includes:
Logging, Qpid::Proton::Util::ErrorHandler
Defined in:
lib/mqlight/command.rb

Overview

This class handles the inter-communication between the threads

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger

Methods included from Qpid::Proton::Util::ErrorHandler

#can_raise_error, #check_for_error, #create_exception_handler_wrapper, included

Constructor Details

#initialize(args) ⇒ Command

Returns a new instance of Command.



33
34
35
36
37
38
39
40
41
42
# File 'lib/mqlight/command.rb', line 33

def initialize(args)
  @id = args[:id]
  @thread_vars = args[:thread_vars]

  # Setup queue for sending request to the command thread
  @request_queue = Queue.new
  @request_queue_mutex = Mutex.new
  @request_queue_resource = ConditionVariable.new
  @shutdown = false
end

Instance Attribute Details

#request_queueObject (readonly)

Returns the value of attribute request_queue.



26
27
28
# File 'lib/mqlight/command.rb', line 26

def request_queue
  @request_queue
end

#request_queue_mutexObject (readonly)

Returns the value of attribute request_queue_mutex.



27
28
29
# File 'lib/mqlight/command.rb', line 27

def request_queue_mutex
  @request_queue_mutex
end

#request_queue_resourceObject (readonly)

Returns the value of attribute request_queue_resource.



28
29
30
# File 'lib/mqlight/command.rb', line 28

def request_queue_resource
  @request_queue_resource
end

Instance Method Details

#check_for_messages(destination, timeout = nil) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/mqlight/command.rb', line 183

def check_for_messages(destination, timeout = nil)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  @thread_vars.proton.check_for_out_of_sequence_messages
  link = @thread_vars.proton.open_for_message(destination)
  fail Mqlight::InternalError,
       'No link for ' + destination.to_s + ' could be found' if link.nil?

  message_present = false
  unless link.nil?
    begin
      Timeout.timeout(timeout) do
        sleep(0.1) until @thread_vars.proton.message? || !started?
        message_present = true if started?
      end
    rescue Timeout::Error
      logger.data(@id, 'Timeout received inside checking_for_messages') do
        self.class.to_s + '#' + __method__.to_s
      end
      message_present = @thread_vars.proton.drain_message(link) if started?
    end
  end

  unless message_present
    @thread_vars.reply_queue.push(nil)
    logger.exit(@id, 'none') { self.class.to_s + '#' + __method__.to_s }
    return
  end

  # Collect the message
  msg = @thread_vars.proton.collect_message
  message = Mqlight::Delivery.new(msg, destination, @thread_vars)

  @thread_vars.reply_queue.push(message)

  # QoS 0
  @thread_vars.proton.accept(link) if destination.qos == QOS_AT_MOST_ONCE

  # QoS 1 / auto-confirm
  @thread_vars.proton.settle(link) if
    destination.qos == QOS_AT_LEAST_ONCE &&
    destination.auto_confirm

  logger.exit(@id, 'Present') { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#command_loopObject

The request processing loop for the command thread. This method loops awaiting for requests to be process Should there be a request present but the link is in the retry state this method will wait notification of when the link is reinstate. If the link is closed (stopped) then this method returns, then the thread dies.



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/mqlight/command.rb', line 350

def command_loop
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }

  shutting_down = false
  until shutting_down
    @request_queue_mutex.synchronize do
      # Wait for a command request
      while @request_queue.empty?
        logger.data(@id,
                    'Command loop waiting for command') do
          self.class.to_s + '#' + __method__.to_s
        end
        # Wait for a trigger from the outer thread(Blocking_client).
        @request_queue_resource.wait(@request_queue_mutex)
        return if stopped?
      end

      # Process all the requests on the queue.
      process_request_queue unless @request_queue.empty?

      # Signal client command completed.
      @request_queue_resource.signal

      if stopped? then
        shutting_down = true
        @shutdown = true
      end
    end
  end

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#errorObject



417
418
419
# File 'lib/mqlight/command.rb', line 417

def error
  @thread_vars.proton.error
end

#joinObject

Blocks until the command thread has terminated.



258
259
260
261
262
263
264
265
266
267
268
# File 'lib/mqlight/command.rb', line 258

def join
  @request_queue_mutex.synchronize do
    @request_queue_resource.signal
  end

  Timeout.timeout(5) do
    @command_thread.join
  end
rescue Timeout::Error
  @command_thread.kill
end

#process_queued_send(msg, qos) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/mqlight/command.rb', line 64

def process_queued_send(msg, qos)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  @thread_vars.proton.put_message(msg, qos)

  sleep(0.02) while @thread_vars.proton.outbound_pending?

  # Push back a message: nil if no problem detected otherwise an exception
  # describing the issue.
  exception = nil
  status = @thread_vars.proton.tracker_status
  while status == Cproton::PN_STATUS_PENDING && started?
    sleep(0.02)
    status = @thread_vars.proton.tracker_status
  end
  fail RetryError,'Change of state from started detected' unless started?

  case status
  when Cproton::PN_STATUS_ACCEPTED
    # No action
  when Cproton::PN_STATUS_SETTLED
    # No action
  when Cproton::PN_STATUS_REJECTED
    reject_msg = @thread_vars.proton.tracker_condition_description(
      'send failed - message was rejected')
    exception  = Mqlight::ExceptionContainer.new(
      RangeError.new(reject_msg))
  when Cproton::PN_STATUS_RELEASED
    exception = Mqlight::ExceptionContainer.new(
      Mqlight::InternalError.new(
        'send failed - message was released'))
  when Cproton::PN_STATUS_MODIFIED
    exception = Mqlight::ExceptionContainer.new(
      Mqlight::InternalError.new(
        'send failed - message was modified'))
  when Cproton::PN_STATUS_ABORTED
    # An abortion is assumed to be a lost of disconnect
    # and therefore mark the request as a retry,
    fail Mqlight::NetworkError, 'send failed - message was aborted'
  when Cproton::PN_STATUS_PENDING
    # No action
  when 0
    # ignoring these as appear to be
    # generated by 'rspec'
  else
    exception = Mqlight::ExceptionContainer.new(
      Mqlight::InternalError.new(
        "send failed - unknown status #{status}"))
  end
  @thread_vars.reply_queue.push(exception)
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue Qpid::Proton::TimeoutError
  # Specific capture of the QPid timeout condition
  # Reply back to user with TimeoutError.
  @thread_vars.reply_queue.push(
    TimeoutError.new(
      'Send request did not complete within the requested period'))
rescue Qpid::Proton::ProtonError => error
  @thread_vars.reply_queue.push(
    Mqlight::ExceptionContainer.new(
      Mqlight::InternalError.new(error)))
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#process_queued_subscription(destination) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/mqlight/command.rb', line 135

def process_queued_subscription(destination)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  link = @thread_vars.proton.create_subscription destination

  # block until link is active or error condition detected
  exception = nil
  begin
    until @thread_vars.proton.link_up?(link)
      # Short pause
      sleep 0.1
    end
  rescue StandardError => e
    exception = e
  end

  # Return the acknowledgement
  @thread_vars.reply_queue.push(exception)

  @thread_vars.destinations.push(destination)
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#process_queued_unsubscribe(destination, ttl) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/mqlight/command.rb', line 166

def process_queued_unsubscribe(destination, ttl)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  # find and close the link
  @thread_vars.proton.close_link(destination, ttl)

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#process_request_queueObject

Process all the requests on the queue.



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/mqlight/command.rb', line 273

def process_request_queue
  @thread_vars.processing_command = true
  op = @request_queue.pop(true)
  timeout = op[:timeout]
  # Receive timeout handled inside 'check_for_messages'
  timeout = nil if op[:action] == 'receive'
  Timeout.timeout(timeout) do
    until op.nil?
      begin
        # Waiting while proton thread is trying to
        # recover the connection.
        @thread_vars.wait_for_state_change(nil) while retrying? || starting?
        return if stopped?

        # Process request
        case op[:action]
        when 'send'
          process_queued_send op[:params], op[:qos]
        when 'subscribe'
          process_queued_subscription op[:params]
        when 'unsubscribe'
          process_queued_unsubscribe op[:params], op[:ttl]
        when 'receive'
          check_for_messages(op[:destination], op[:timeout])
        end

        # Request has been completed.
        op = nil

      rescue Mqlight::NetworkError
        # The request has failed due to the connection
        # to the server failing. Change the state to
        # :retrying in case the proton thread hasn't
        # work it out yet.
        @thread_vars.change_state(:retrying)
      rescue Qpid::Proton::StateError
        # The request has failed due to the connection
        # to the server failing. Change the state to
        # :retrying in case the proton thread hasn't
        # work it out yet.
        @thread_vars.proton.check_for_out_of_sequence_messages
      rescue RetryError
        # No action as the default will be to wait for connect
        # and retry.
        logger.data(@id, "Retry error detected") do
          self.class.to_s + '#' + __method__.to_s
        end
      end
    end
  end
rescue Timeout::Error
  logger.data(@id, "Request #{op[:action]} terminated by timeout") do
    self.class.to_s + '#' + __method__.to_s
  end
  # The command request has timed out, report back to
  # outer thread.
  @thread_vars.reply_queue.push(
    Mqlight::ExceptionContainer.new(
      Mqlight::TimeoutError.new('Command timeout has expired')))
rescue => e
  # A catch all for reporting to a FFDC
  logger.ffdc(self.class.to_s + '#' + __method__.to_s,
              'ffdc001', self, 'Uncaught exception', e)
  @thread_vars.reply_queue.push(Mqlight::ExceptionContainer.new(e))
ensure
  @thread_vars.processing_command = false
end

#push_request(hash) ⇒ Object

Pushes the specified request onto the request queue and waits for it to be sent.



390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/mqlight/command.rb', line 390

def push_request(hash)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  @request_queue_mutex.synchronize do
    if @shutdown then
      fail Mqlight::StoppedError, 'Client in stopped state'
    end

    @request_queue.push(hash)
    @request_queue_resource.signal
    # Wait for the command to be taken.
    until @request_queue.empty?
      @request_queue_resource.wait(@request_queue_mutex)
    end
    @request_queue_resource.signal
  end

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
end

#retrying?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/mqlight/command.rb', line 54

def retrying?
  @thread_vars.state == :retrying
end

#start_threadObject

Generates and starts the command thread.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/mqlight/command.rb', line 239

def start_thread
  # Command handle thread.
  @command_thread = Thread.new do
    Thread.current['name'] = 'command_thread'
    begin
      command_loop
      logger.data(@id, 'Command loop terminating') do
        self.class.to_s + '#' + __method__.to_s
      end
    rescue => e
      logger.ffdc(self.class.to_s + '#' + __method__.to_s,
                  'ffdc002', self, 'Uncaught exception', e)
    end
  end
end

#started?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/mqlight/command.rb', line 44

def started?
  @thread_vars.state == :started
end

#starting?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/mqlight/command.rb', line 59

def starting?
  @thread_vars.state == :starting
end

#stopped?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/mqlight/command.rb', line 49

def stopped?
  @thread_vars.state == :stopped
end