Class: Mqlight::ThreadVars
- Inherits:
-
Object
- Object
- Mqlight::ThreadVars
- Includes:
- Logging
- Defined in:
- lib/mqlight/thread_vars.rb
Overview
This class handles the inter-communication between the threads The class serves two purposes (1) Central location for all module shared variables. (2) Access control
Instance Attribute Summary collapse
-
#callback_queue ⇒ Object
readonly
Returns the value of attribute callback_queue.
-
#connect_id ⇒ Object
readonly
Returns the value of attribute connect_id.
-
#last_state_error ⇒ Object
readonly
Returns the value of attribute last_state_error.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#proton ⇒ Object
readonly
Returns the value of attribute proton.
-
#reply_queue ⇒ Object
readonly
These variable do not need to be protected by mutex.
-
#state_callback ⇒ Object
writeonly
Sets the attribute state_callback.
Instance Method Summary collapse
-
#change_state(new_state, reason = nil) ⇒ Object
Handle the change of state and when change Send message to the callback to report back to Client.
-
#destinations ⇒ Object
Return the current connection state.
-
#initialize(id) ⇒ ThreadVars
constructor
A new instance of ThreadVars.
-
#processing_command=(new_processing_command) ⇒ Object
Defines in the command thread is processing a command.
-
#processing_command? ⇒ Boolean
Indicate if the command thread is processing a command.
- #reconnected ⇒ Object
-
#service ⇒ Object
Indicate if the command thread is processing a command.
-
#service=(service_tools) ⇒ Object
Set the connections service URL.
-
#state ⇒ Object
Return the current connection state.
-
#state=(new_state) ⇒ Object
Will update the connection state and notify those interest in the change.
- #subscriptions_clear ⇒ Object
- #subscriptions_present? ⇒ Boolean
-
#wait_for_state_change(timeout) ⇒ Object
Will block the calling thread until there is a change of connection state.
Methods included from Logging
Constructor Details
#initialize(id) ⇒ ThreadVars
Returns a new instance of ThreadVars.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/mqlight/thread_vars.rb', line 39 def initialize(id) # @id = id # @thread_vars_mutex = Mutex.new @thread_vars_resource = ConditionVariable.new @proton = Mqlight::ProtonContainer.new(self, @id) @state = :stopped @processing_command = false # Setup queue for returning acknowledgements or exception # from the proton loop to the block caller. @reply_queue = Queue.new # Setup queue for running any user callbacks in @callback_queue = Queue.new # The list of subscription for this client. @destinations = [] # Number of reconnections @connect_id = 0 # The error information associated with the last state change @last_state_error = nil end |
Instance Attribute Details
#callback_queue ⇒ Object (readonly)
Returns the value of attribute callback_queue.
31 32 33 |
# File 'lib/mqlight/thread_vars.rb', line 31 def callback_queue @callback_queue end |
#connect_id ⇒ Object (readonly)
Returns the value of attribute connect_id.
33 34 35 |
# File 'lib/mqlight/thread_vars.rb', line 33 def connect_id @connect_id end |
#last_state_error ⇒ Object (readonly)
Returns the value of attribute last_state_error.
35 36 37 |
# File 'lib/mqlight/thread_vars.rb', line 35 def last_state_error @last_state_error end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
30 31 32 |
# File 'lib/mqlight/thread_vars.rb', line 30 def @message_queue end |
#proton ⇒ Object (readonly)
Returns the value of attribute proton.
32 33 34 |
# File 'lib/mqlight/thread_vars.rb', line 32 def proton @proton end |
#reply_queue ⇒ Object (readonly)
These variable do not need to be protected by mutex.
29 30 31 |
# File 'lib/mqlight/thread_vars.rb', line 29 def reply_queue @reply_queue end |
#state_callback=(value) ⇒ Object (writeonly)
Sets the attribute state_callback
34 35 36 |
# File 'lib/mqlight/thread_vars.rb', line 34 def state_callback=(value) @state_callback = value end |
Instance Method Details
#change_state(new_state, reason = nil) ⇒ Object
Handle the change of state and when change Send message to the callback to report back to Client
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/mqlight/thread_vars.rb', line 147 def change_state(new_state, reason = nil) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } if state == new_state @last_state_error = reason return if @state == new_state @state = new_state @callback_queue.push([@state_callback, @state, reason]) if @state_callback @thread_vars_resource.signal logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#destinations ⇒ Object
Return the current connection state.
118 119 120 121 122 |
# File 'lib/mqlight/thread_vars.rb', line 118 def destinations @thread_vars_mutex.synchronize do @destinations end end |
#processing_command=(new_processing_command) ⇒ Object
Defines in the command thread is processing a command
100 101 102 103 104 |
# File 'lib/mqlight/thread_vars.rb', line 100 def processing_command=(new_processing_command) @thread_vars_mutex.synchronize do @processing_command = new_processing_command end end |
#processing_command? ⇒ Boolean
Indicate if the command thread is processing a command.
109 110 111 112 113 |
# File 'lib/mqlight/thread_vars.rb', line 109 def processing_command? @thread_vars_mutex.synchronize do @processing_command end end |
#reconnected ⇒ Object
188 189 190 |
# File 'lib/mqlight/thread_vars.rb', line 188 def reconnected @connect_id += 1 end |
#service ⇒ Object
Indicate if the command thread is processing a command.
127 128 129 130 131 |
# File 'lib/mqlight/thread_vars.rb', line 127 def service @thread_vars_mutex.synchronize do @service_tools end end |
#service=(service_tools) ⇒ Object
Set the connections service URL
136 137 138 139 140 |
# File 'lib/mqlight/thread_vars.rb', line 136 def service=(service_tools) @thread_vars_mutex.synchronize do @service_tools = service_tools end end |
#state ⇒ Object
Return the current connection state.
80 81 82 83 84 |
# File 'lib/mqlight/thread_vars.rb', line 80 def state @thread_vars_mutex.synchronize do @state end end |
#state=(new_state) ⇒ Object
Will update the connection state and notify those interest in the change.
68 69 70 71 72 73 74 75 |
# File 'lib/mqlight/thread_vars.rb', line 68 def state=(new_state) @thread_vars_mutex.synchronize do if @state != new_state @state = new_state @thread_vars_resource.signal end end end |
#subscriptions_clear ⇒ Object
179 180 181 182 183 |
# File 'lib/mqlight/thread_vars.rb', line 179 def subscriptions_clear logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } @destinations = [] logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } end |
#subscriptions_present? ⇒ Boolean
172 173 174 |
# File 'lib/mqlight/thread_vars.rb', line 172 def subscriptions_present? !@destinations.empty? end |
#wait_for_state_change(timeout) ⇒ Object
Will block the calling thread until there is a change of connection state.
90 91 92 93 94 95 |
# File 'lib/mqlight/thread_vars.rb', line 90 def wait_for_state_change(timeout) @thread_vars_mutex.synchronize do @thread_vars_resource.wait(@thread_vars_mutex, timeout) end @state end |