Class: Mqlight::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging, Qpid::Proton::Util::ErrorHandler
Defined in:
lib/mqlight/connection.rb

Overview

This class monitors the following:

  • the link state

  • out of sequence error message from the server.

If the link state enters :retrying then the classes thread will periocally attempt to reconnect providing

  • There are active subscriptions.

  • A request awaiting to be processed.

Constant Summary collapse

DEFER_TABLE =

This table defines the rate at which a lost connection is attempted to be recovered. The values is in seconds

[1, 2, 4, 8, 16, 32, 60]

Instance Method Summary collapse

Methods included from Logging

#logger, logger

Methods included from Qpid::Proton::Util::ErrorHandler

#can_raise_error, #check_for_error, #create_exception_handler_wrapper, included

Constructor Details

#initialize(args) ⇒ Connection

Returns a new instance of Connection.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/mqlight/connection.rb', line 36

def initialize(args)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }

  @args = args
  @thread_vars = args[:thread_vars]
  @service = args[:service]
  @user = args[:user]
  @password = args[:password]

  @connect_mutex = Mutex.new
  @connect_resource = ConditionVariable.new

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
end

Instance Method Details

#close_end_pointObject



237
238
239
240
241
242
243
244
# File 'lib/mqlight/connection.rb', line 237

def close_end_point
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  unless @end_point.nil?
    @end_point.stop_threads
    @end_point = nil
  end
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
end

#connect_to_a_serverObject

Will attempt to connect one of the services and will return with state Started : Successful connection, @service will have URI Stopped : Failed conn. server in @service rejected the connection request Retry : All attempt had network conn. failures; @service has last URI.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/mqlight/connection.rb', line 62

def connect_to_a_server
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }

  @service_list = []
  begin
    @service_list = Util.generate_services(@service, @user, @password)
  rescue Mqlight::NetworkError => ne
    logger.data(@id, 'Failed connection to ' + @service.to_s +
                      ' because ' + ne.to_s) do
      self.class.to_s + '#' + __method__.to_s
    end
    @thread_vars.change_state(:retrying, ne)
  rescue StandardError => se
    logger.data(@id, 'Failed to generate service list from ' +
            @service.to_s + ' because ' + se.to_s) do
      self.class.to_s + '#' + __method__.to_s
    end
    @thread_vars.change_state(:stopped, se)
  end

  items_left_in_service_list = @service_list.length
  @thread_vars.change_state(:starting) if items_left_in_service_list> 0

  @service_list.each do |service|
    @thread_vars.service = Service.new(service, @user, @password)
    begin
      items_left_in_service_list -= 1

      # If old one present then drop it
      close_end_point unless @end_point.nil?

      if @thread_vars.service.ssl?
        @end_point = SecureEndPoint.new(@args)
      else
        @end_point = UnsecureEndPoint.new(@args)
      end

      # Define the connection parameters
      @thread_vars.proton.connect(@thread_vars.service)
      # Start the bottom level to handle the socket.
      @end_point.start_connection_threads
      # Initiate the connection sequence.
      @thread_vars.proton.wait_messenger_started(@thread_vars.service)

      # Assign the service if we start successfully (without auth info)
      logger.data(@id, 'Success connection to ' + \
                       @thread_vars.service.to_s) do
        self.class.to_s + '#' + __method__.to_s
      end

      # Reinstate the active subscriptions
      @thread_vars.proton.reinstate_links \
        if @thread_vars.subscriptions_present?

      @thread_vars.change_state(:started)

    rescue Mqlight::NetworkError => ne
      logger.data(@id, 'Failed connection to ' + @thread_vars.service.to_s +
                        ' because ' + ne.to_s) do
        self.class.to_s + '#' + __method__.to_s
      end
      # Only report this on the last service in the list.
      @thread_vars.change_state(:retrying, ne) \
        if items_left_in_service_list <= 0
    rescue Mqlight::SecurityError => se
      logger.data(@id, 'Failed connection to ' + @thread_vars.service.to_s +
                        ' because ' + se.to_s) do
        self.class.to_s + '#' + __method__.to_s
      end
      @thread_vars.change_state(:stopped, se)
    rescue Mqlight::ReplacedError => re
      logger.data(@id, 'Failed connection to ' + @thread_vars.service.to_s +
                        ' because ' + re.to_s) do
        self.class.to_s + '#' + __method__.to_s
      end
      @thread_vars.change_state(:stopped, re)
    rescue Mqlight::SubscribedError => sub
      logger.data(@id, 'Failed reinstate a subscription ' +
                  @thread_vars.service.to_s + ' because ' + sub.to_s) do
        self.class.to_s + '#' + __method__.to_s
      end
      @thread_vars.change_state(:stopped, sub)
    ensure
      if stopped?
        close_end_point unless @end_point.nil?
      end
    end

    # Terminate loop if at final state.
    break if stopped? || started?
  end
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
end

#proton_loopObject

The main class thread method. This method will only return when the link state becomes ‘:stopped where the thread will die.



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/mqlight/connection.rb', line 199

def proton_loop
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }

  defer_pos = 0
  max_defer_index = DEFER_TABLE.length - 1
  until stopped?
    if starting? || (
        retrying? &&
        (
          @thread_vars.subscriptions_present? ||
          @thread_vars.processing_command?
        )
    )
      connect_to_a_server
      defer_pos += 1 if defer_pos < max_defer_index
    else
      # As link is up .. reset to first value.
      defer_pos = 0
    end

    # Monitor oos messages
    @thread_vars.proton.check_for_out_of_sequence_messages

    # Need a pause
    @connect_mutex.synchronize do
      @connect_resource.wait(@connect_mutex, DEFER_TABLE[defer_pos])
    end
  end

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#retrying?Boolean

Returns:

  • (Boolean)


270
271
272
# File 'lib/mqlight/connection.rb', line 270

def retrying?
  @thread_vars.state == :retrying
end

#start_threadObject

Initiate and start the link monitoring thread.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/mqlight/connection.rb', line 159

def start_thread
  @proton_thread = Thread.new do
    Thread.current['name'] = 'proton_thread'
    begin
      proton_loop
      logger.data(@id, 'Proton loop terminating') do
        self.class.to_s + '#' + __method__.to_s
      end
    rescue => e
      logger.ffdc(self.class.to_s + '#' + __method__.to_s,
                  'ffdc001', self, 'Uncaught exception', e)
    end
  end
end

#started?Boolean

Returns:

  • (Boolean)


256
257
258
# File 'lib/mqlight/connection.rb', line 256

def started?
  @thread_vars.state == :started
end

#starting?Boolean

Returns:

  • (Boolean)


277
278
279
# File 'lib/mqlight/connection.rb', line 277

def starting?
  @thread_vars.state == :starting
end

#stop_threadObject

Issue stop and wait for all thread to terminate



177
178
179
180
181
182
183
184
185
186
187
# File 'lib/mqlight/connection.rb', line 177

def stop_thread
  @proton_thread.join
  close_end_point
rescue StandardError => e
  # This is required for the rspec unit tests.
  # and is due to the fact some of the thread
  # take time to closedown
  logger.data(@id, "Thrown error #{e}") do
    self.class.to_s + '#' + __method__.to_s
  end
end

#stopped?Boolean

Returns:

  • (Boolean)


263
264
265
# File 'lib/mqlight/connection.rb', line 263

def stopped?
  @thread_vars.state == :stopped
end

#wakeupObject



249
250
251
# File 'lib/mqlight/connection.rb', line 249

def wakeup
  @connect_resource.signal
end