Class: Mqlight::ThreadVars

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/mqlight/thread_vars.rb

Overview

This class handles the inter-communication between the threads The class serves two purposes (1) Central location for all module shared variables. (2) Access control

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger

Constructor Details

#initialize(id) ⇒ ThreadVars

Returns a new instance of ThreadVars.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/mqlight/thread_vars.rb', line 39

def initialize(id)
  #
  @id = id
  #
  @thread_vars_mutex = Mutex.new
  @thread_vars_resource = ConditionVariable.new

  @proton = Mqlight::ProtonContainer.new(self, @id)

  @state = :stopped
  @processing_command = false

  # Setup queue for returning acknowledgements or exception
  # from the proton loop to the block caller.
  @reply_queue = Queue.new
  # Setup queue for running any user callbacks in
  @callback_queue = Queue.new
  # The list of subscription for this client.
  @destinations = []
  # Number of reconnections
  @connect_id = 0
  # The error information associated with the last state change
  @last_state_error = nil
end

Instance Attribute Details

#callback_queueObject (readonly)

Returns the value of attribute callback_queue.



31
32
33
# File 'lib/mqlight/thread_vars.rb', line 31

def callback_queue
  @callback_queue
end

#connect_idObject (readonly)

Returns the value of attribute connect_id.



33
34
35
# File 'lib/mqlight/thread_vars.rb', line 33

def connect_id
  @connect_id
end

#last_state_errorObject (readonly)

Returns the value of attribute last_state_error.



35
36
37
# File 'lib/mqlight/thread_vars.rb', line 35

def last_state_error
  @last_state_error
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



30
31
32
# File 'lib/mqlight/thread_vars.rb', line 30

def message_queue
  @message_queue
end

#protonObject (readonly)

Returns the value of attribute proton.



32
33
34
# File 'lib/mqlight/thread_vars.rb', line 32

def proton
  @proton
end

#reply_queueObject (readonly)

These variable do not need to be protected by mutex.



29
30
31
# File 'lib/mqlight/thread_vars.rb', line 29

def reply_queue
  @reply_queue
end

#state_callback=(value) ⇒ Object (writeonly)

Sets the attribute state_callback

Parameters:

  • value

    the value to set the attribute state_callback to.



34
35
36
# File 'lib/mqlight/thread_vars.rb', line 34

def state_callback=(value)
  @state_callback = value
end

Instance Method Details

#change_state(new_state, reason = nil) ⇒ Object

Handle the change of state and when change Send message to the callback to report back to Client



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/mqlight/thread_vars.rb', line 147

def change_state(new_state, reason = nil)
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  parms = Hash[method(__method__).parameters.map do |parm|
    [parm[1], eval(parm[1].to_s)]
  end]
  logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s }
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } if
     state == new_state

  @last_state_error = reason
  return if @state == new_state
  @state = new_state
  @callback_queue.push([@state_callback, @state, reason]) if
    @state_callback
  @thread_vars_resource.signal

  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
rescue StandardError => e
  logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s }
  raise e
end

#destinationsObject

Return the current connection state.



118
119
120
121
122
# File 'lib/mqlight/thread_vars.rb', line 118

def destinations
  @thread_vars_mutex.synchronize do
    @destinations
  end
end

#processing_command=(new_processing_command) ⇒ Object

Defines in the command thread is processing a command



100
101
102
103
104
# File 'lib/mqlight/thread_vars.rb', line 100

def processing_command=(new_processing_command)
  @thread_vars_mutex.synchronize do
    @processing_command = new_processing_command
  end
end

#processing_command?Boolean

Indicate if the command thread is processing a command.

Returns:

  • (Boolean)


109
110
111
112
113
# File 'lib/mqlight/thread_vars.rb', line 109

def processing_command?
  @thread_vars_mutex.synchronize do
    @processing_command
  end
end

#reconnectedObject



188
189
190
# File 'lib/mqlight/thread_vars.rb', line 188

def reconnected
  @connect_id += 1
end

#serviceObject

Indicate if the command thread is processing a command.



127
128
129
130
131
# File 'lib/mqlight/thread_vars.rb', line 127

def service
  @thread_vars_mutex.synchronize do
    @service_tools
  end
end

#service=(service_tools) ⇒ Object

Set the connections service URL



136
137
138
139
140
# File 'lib/mqlight/thread_vars.rb', line 136

def service=(service_tools)
  @thread_vars_mutex.synchronize do
    @service_tools = service_tools
  end
end

#stateObject

Return the current connection state.



80
81
82
83
84
# File 'lib/mqlight/thread_vars.rb', line 80

def state
  @thread_vars_mutex.synchronize do
    @state
  end
end

#state=(new_state) ⇒ Object

Will update the connection state and notify those interest in the change.



68
69
70
71
72
73
74
75
# File 'lib/mqlight/thread_vars.rb', line 68

def state=(new_state)
  @thread_vars_mutex.synchronize do
    if @state != new_state
      @state = new_state
      @thread_vars_resource.signal
    end
  end
end

#subscriptions_clearObject



179
180
181
182
183
# File 'lib/mqlight/thread_vars.rb', line 179

def subscriptions_clear
  logger.entry(@id) { self.class.to_s + '#' + __method__.to_s }
  @destinations = []
  logger.exit(@id) { self.class.to_s + '#' + __method__.to_s }
end

#subscriptions_present?Boolean

Returns:

  • (Boolean)


172
173
174
# File 'lib/mqlight/thread_vars.rb', line 172

def subscriptions_present?
  !@destinations.empty?
end

#wait_for_state_change(timeout) ⇒ Object

Will block the calling thread until there is a change of connection state.



90
91
92
93
94
95
# File 'lib/mqlight/thread_vars.rb', line 90

def wait_for_state_change(timeout)
  @thread_vars_mutex.synchronize do
    @thread_vars_resource.wait(@thread_vars_mutex, timeout)
  end
  @state
end