Class: Thor::AppClient
- Inherits:
-
Application
- Object
- Bsl::Application
- Application
- Thor::AppClient
- Defined in:
- lib/ThorClient.rb
Defined Under Namespace
Classes: Server, StructCustomer
Instance Attribute Summary collapse
-
#clients ⇒ Object
Returns the value of attribute clients.
-
#clients_lock ⇒ Object
Returns the value of attribute clients_lock.
-
#em_server ⇒ Object
Returns the value of attribute em_server.
-
#request_exit ⇒ Object
Returns the value of attribute request_exit.
Instance Method Summary collapse
-
#amqp_exchange_connect_master(master, guid_direct) ⇒ Object
Connects to master exchange.
-
#amqp_exchange_create_direct(guid) ⇒ Object
Creates local direct exchange.
-
#amqp_handle_failure(e) ⇒ Object
Handles failure when connecting to AMQP.
-
#amqp_reset_retry_interval ⇒ Object
Resets internal AMQP connection failure counter/interval.
-
#amqp_start ⇒ Object
Starts AMQP connection.
-
#amqp_stop ⇒ Object
Stops Running AMQP connection.
-
#em_post_init(em_connection, em_options) ⇒ Object
Called when client connects.
-
#em_receive(em_connection, em_options, status, raw_data) ⇒ Object
Event machine receive handle.
-
#em_start ⇒ Object
Event machine loop.
-
#em_stop ⇒ Object
Stop EventMachine.
-
#em_unbind(em_connection, em_options) ⇒ Object
Called when client disconnects.
-
#initialize(opts = {}) ⇒ AppClient
constructor
A new instance of AppClient.
-
#main ⇒ Object
Main entry-point.
Methods inherited from Application
Constructor Details
#initialize(opts = {}) ⇒ AppClient
Returns a new instance of AppClient.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/ThorClient.rb', line 68 def initialize(opts = {}) super(opts) @@AMQP_DEFAULT_RETRY_INTERVAL = 3 @@AMQP_MAX_RETRY_INTERVAL = (30) @@AMQP_MAX_RETRY_ATTEMPS = -1 @@AMQP_RETRY_MULTIPLER = 1.5 @amqp_retry_interval = @@AMQP_DEFAULT_RETRY_INTERVAL @amqp_retry_attempt = 0 # Signalizes that application wants exit for some reason @request_exit = false @em_server = nil @clients = {} @clients_lock = Mutex.new # AMQP options [:amqp_host] = "localhost" [:amqp_port] = 8467 [:amqp_user] = "user" [:amqp_password] = "password" [:amqp_vhost] = "my-vhost" [:amqp_channel_master] = "master" # Event Machine options [:em_port] = 8467 # Thor on cell-phone keyboard [:em_auth_token] = "" initialize_optparser { |opts| ############################ # AMQP Section ############################ # AMQP Host opts.on( '-H', '--amqp-host STRING', "AMQP Server hostname") do |host| [:amqp_host] = host end # AMQP Port opts.on( '-p', '--amqp-port NUM', "AMQP Server port number") do |port| [:amqp_port] = port end # AMQP Username opts.on( '-u', '--amqp-user STRING', "AMQP Username") do |user| [:amqp_user] = user end # AMQP Password opts.on( '-P', '--amqp-password STRING', "AMQP Password") do |password| [:amqp_password] = password end # AMQP Vhost opts.on( '-V', '--amqp-vhost STRING', "AMQP Virtual Host") do |vhost| [:amqp_vhost] = vhost end ## Channels # Channel Master opts.on( '-acm', '--amqp-channel-master STRING', "AMQP Channel Mastr") do |channel| [:amqp_channel_master] = channel end ############################ # EventMachine Section ############################ # EM Port opts.on( '-ep', '--em-port NUM', "EventMachine port") do |port| [:em_port] = port end # EM Authentication token opts.on( '-eat', '--em-auth-token STRING', "Authentication token used for communication with EM") do |token| [:em_auth_token] = token end } end |
Instance Attribute Details
#clients ⇒ Object
Returns the value of attribute clients.
39 40 41 |
# File 'lib/ThorClient.rb', line 39 def clients @clients end |
#clients_lock ⇒ Object
Returns the value of attribute clients_lock.
39 40 41 |
# File 'lib/ThorClient.rb', line 39 def clients_lock @clients_lock end |
#em_server ⇒ Object
Returns the value of attribute em_server.
39 40 41 |
# File 'lib/ThorClient.rb', line 39 def em_server @em_server end |
#request_exit ⇒ Object
Returns the value of attribute request_exit.
39 40 41 |
# File 'lib/ThorClient.rb', line 39 def request_exit @request_exit end |
Instance Method Details
#amqp_exchange_connect_master(master, guid_direct) ⇒ Object
Connects to master exchange
183 184 185 186 187 |
# File 'lib/ThorClient.rb', line 183 def amqp_exchange_connect_master(master, guid_direct) if([:verbose]) Bsl::Logger::Log "GUID '#{guid_direct}', Connecting to master exchange '#{master}'." end end |
#amqp_exchange_create_direct(guid) ⇒ Object
Creates local direct exchange
176 177 178 179 180 |
# File 'lib/ThorClient.rb', line 176 def amqp_exchange_create_direct(guid) if([:verbose]) Bsl::Logger::Log "Creating direct exchange '#{guid}'." end end |
#amqp_handle_failure(e) ⇒ Object
Handles failure when connecting to AMQP
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/ThorClient.rb', line 190 def amqp_handle_failure(e) amqp_stop() Bsl::Logger::Log "AMQP Failure, reason: '#{e.inspect}'." if(@request_exit == true) return false end max_attempts_reached = false if(@@AMQP_MAX_RETRY_ATTEMPS != nil && @@AMQP_MAX_RETRY_ATTEMPS >= 0) @amqp_retry_attempt = @amqp_retry_attempt + 1 max_attempts_reached = @amqp_retry_attempt > @@AMQP_MAX_RETRY_ATTEMPS end if(max_attempts_reached == false) Bsl::Logger::Log "Next attempt in #{@amqp_retry_interval} sec(s)." sleep (@amqp_retry_interval) @amqp_retry_interval = @amqp_retry_interval * @@AMQP_RETRY_MULTIPLER @amqp_retry_interval = @@AMQP_MAX_RETRY_INTERVAL if @amqp_retry_interval > @@AMQP_MAX_RETRY_INTERVAL else if(@@AMQP_MAX_RETRY_ATTEMPS != nil) Bsl::Logger::Log "Maximum AQMP reconnect attempts limit reached (#{@@AMQP_MAX_RETRY_ATTEMPS}), quitting." end @request_exit = true end return true end |
#amqp_reset_retry_interval ⇒ Object
Resets internal AMQP connection failure counter/interval
149 150 151 152 |
# File 'lib/ThorClient.rb', line 149 def amqp_reset_retry_interval @amqp_retry_interval = @@AMQP_DEFAULT_RETRY_INTERVAL @amqp_retry_attempt = 0 end |
#amqp_start ⇒ Object
Starts AMQP connection
155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/ThorClient.rb', line 155 def amqp_start Bsl::Logger::Log "Starting AMQP - Connecting #{options[:amqp_user]}@#{options[:amqp_host]}:#{options[:amqp_port]}#{options[:amqp_vhost]}" AMQP.start(:host => [:amqp_host], :port => [:amqp_port], :vhost => [:amqp_vhost], :user => [:amqp_user], :password => [:amqp_password] ) do Bsl::Logger::Log "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..." amqp_reset_retry_interval() em_start() guid = Thor::generate_guid amqp_exchange_create_direct(guid) # Create local direct exchange amqp_exchange_connect_master([:amqp_channel_master], guid) end end |
#amqp_stop ⇒ Object
Stops Running AMQP connection
170 171 172 173 |
# File 'lib/ThorClient.rb', line 170 def amqp_stop Bsl::Logger::Log "Stopping AMQP" AMQP.stop { EM.stop } end |
#em_post_init(em_connection, em_options) ⇒ Object
Called when client connects
243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/ThorClient.rb', line 243 def em_post_init(em_connection, ) pid = Digest::MD5.hexdigest(em_connection.to_s) # em_connection.get_pid port, ip = Socket.unpack_sockaddr_in(em_connection.get_peername) if([:verbose]) Bsl::Logger::Log "Client connected #{ip}:#{port}, pid: #{pid}" end @clients_lock.synchronize { c = StructCustomer.new(pid, em_connection) @clients[pid] = c #puts "Clients now: #{@clients.size}" } end |
#em_receive(em_connection, em_options, status, raw_data) ⇒ Object
Event machine receive handle
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/ThorClient.rb', line 271 def em_receive(em_connection, , status, raw_data) port, ip = Socket.unpack_sockaddr_in(em_connection.get_peername) if([:verbose]) Bsl::Logger::Log "EventMachine received data from #{ip}:#{port}, :status => '#{status}', :raw_data => #{raw_data.chomp}" end data = nil begin data = JSON.parse(raw_data) rescue Exception => e Bsl::Logger::Log "Unable to parse incomming json, reason: #{e.message}." return false end if(data['type'] == "system" && data['code'] == "stop") @request_exit = true em_stop() amqp_stop() end return true end |
#em_start ⇒ Object
Event machine loop
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/ThorClient.rb', line 222 def em_start Bsl::Logger::Log "Starting EventMachine at port #{options[:em_port]}" EM.run do em_opts = {:thor_client => self} EM.start_server 'localhost', [:em_port], Server, em_opts do |conn| em_opts[:thor_client].em_server = conn em_opts[:conn] = conn conn. = em_opts conn.status = :OK end end end |
#em_stop ⇒ Object
Stop EventMachine
236 237 238 239 240 |
# File 'lib/ThorClient.rb', line 236 def em_stop Bsl::Logger::Log "Stopping EventMachine" #@em_server.stop EventMachine::stop_event_loop end |
#em_unbind(em_connection, em_options) ⇒ Object
Called when client disconnects
258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/ThorClient.rb', line 258 def em_unbind(em_connection, ) pid = Digest::MD5.hexdigest(em_connection.to_s) # em_connection.get_pid if([:verbose]) Bsl::Logger::Log "Client disconnected, pid: #{pid}" end @clients_lock.synchronize { @clients.delete(pid) #puts "Clients now: #{@clients.size}" } end |
#main ⇒ Object
Main entry-point
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/ThorClient.rb', line 295 def main super() # Run loop while exit is not requested while(@request_exit == false) begin amqp_start() rescue SystemExit, Interrupt Bsl::Logger::Log "Received interrupt, quitting!" @request_exit = true #em_stop() amqp_stop() rescue Exception => e amqp_handle_failure(e) end end end |