Class: HomeQ::SOBS::ServerConnection
- Inherits:
-
Connection
- Object
- EventMachine::Connection
- Connection
- HomeQ::SOBS::ServerConnection
- Defined in:
- lib/homeq/sobs/server.rb
Overview
Server Connection
This class represents the server-side of a connection between a SOBS client and the SOBS server.
Constant Summary
Constants inherited from Connection
Connection::ALL_MESSAGES, Connection::ARGS_AND_BODY_MESSAGES, Connection::ARG_AND_BODY_MESSAGES, Connection::ARG_MESSAGES, Connection::BODY_MESSAGES, Connection::HEADER_REGEX, Connection::HELLO_TIMER_LENGTH, Connection::MAX_INPUT_MSGS_PER_READ_CYCLE, Connection::NO_ARG_MESSAGES, Connection::OUTBOUND_MAX, Connection::PUSH_TIMER_LENGTH, Connection::REFUSE_SEND_THRESHOLD
Constants included from HomeQ
Instance Attribute Summary
-
#handler ⇒ Object
Job handler.
-
#server_state ⇒ Object
readonly
This connection’s state.
Attributes inherited from Connection
#outbound_max, #peer, #refuse_send_threshold, #server, #state
Instance Method Summary
-
#closed ⇒ Object
Called from the lower-level Connection abstraction when a client connection is dead.
-
#initialize ⇒ ServerConnection
constructor
Create a new ServerConnection, which is a child of SOBS::Connection.
-
#open ⇒ Object
Calls the superclass implementation, then fires the ‘open’ event on the connection’s state machine.
-
#opened ⇒ Object
Called from the lower-level Connection abstraction when a client connection is complete and ready to use.
-
#receive_bury(message) ⇒ Object
Handle a ‘bury’ message from the client, which says that the job shouldn’t be available for anyone.
-
#receive_delete(message) ⇒ Object
Handle a ‘delete’ message from the client, which says that they’ve completed the work.
-
#receive_kick(message) ⇒ Object
Handle a ‘kick’ message from the client, which moves some number of buried jobs to the ‘ready’ state.
-
#receive_put(message) ⇒ Object
Handle a ‘put’ message from the remote endpoint.
-
#receive_release(message) ⇒ Object
Handle a ‘release’ message from the client, which says that they’re not going to finish (or delete) the work, and to make it available for someone else to work on.
-
#receive_reserve(message) ⇒ Object
Handle a ‘reserve’ message from the client, which says that they’re ready for work to be sent to them.
-
#receive_subscribe(message) ⇒ Object
Handle a ‘subscribe’ message from the client, which tells us that they want to be issued work to do whenever it’s available.
-
#receive_unsubscribed(message) ⇒ Object
Handle an ‘unsubscribe’ message from the client, which tells us that they want to send explicit requests for more work via ‘reserve’ commands.
- #release_job(job) ⇒ Object
-
#reserved(job_id, payload) ⇒ Object
Send a ‘reserved’ message to the remote endpoint, indicating that it’s now in charge of doing the indicated piece of work.
- #to_s ⇒ Object
- #unknown_message(command_name) ⇒ Object
Methods inherited from Connection
#cancel_hello_timer, #close, #closing, #connection_completed, #enter_congestion, #exit_congestion, #ok_to_receive?, #parse_header, #parse_incoming_data, #post_init, #process_data, #protocol_version_ok?, #receive_data, #receive_hello, #receive_message, #remote_endpoint, #send_to_peer, #setup_peer, #should_refuse_send?, #start_closing, #unbind, #wait_for_hello
Methods included from HomeQ
calculated_homeq_env, calculated_homeq_topology, included, queue_list_for_host_from_topology
Methods included from Sender
#book_it, #buried, #bury, #deadline_soon, #delete, #deleted, #hello, #inserted, #kick, #kicked, #not_found, #put, #release, #released, #reserve, #stats, #stats_job, #subscribe, #unsubscribe
Methods included from Base::Logging
Constructor Details
#initialize ⇒ ServerConnection
Create a new ServerConnection, which is a child of SOBS::Connection. Called from Server#start via EventMachine. When an incoming TCP connection is received, a ServerConnection is created to handle it.
# File 'lib/homeq/sobs/server.rb', line 311

def initialize
  super

  # The state machine that drives our behavior.
  @server_state = Statemachine.build do
    state :start do
      event :open, :open
      event :close, :closed
    end
    state :waiting_for_job do
      on_entry :reserve_ready_job
      event :sent_job, :open
      event :reserve, :waiting_for_job
      event :received_job, :waiting_for_job, :reserve_ready_job
      event :subscribe, :waiting_for_job
      event :unsubscribe, :open
      event :close, :closed
    end
    state :open do
      event :subscribe, :waiting_for_job
      event :unsubscribe, :open
      event :reserve, :waiting_for_job
      event :received_job, :open
      event :close, :closed
    end
    state :closed do
      event :close, :closed
    end
  end
  @server_state.context = self

  # stats
  @jobs_received = 0

  # An attribute of this connection. If true, the client has
  # indicated that they want to be sent a ready job whenever
  # it's available. Otherwise, they must ask for each job via
  # the 'reserve' command.
  @subscribed = nil
end
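The transitions declared in the constructor can be read as a lookup table. The following is a minimal sketch, not HomeQ code: it models only the state/event pairs using plain hashes, leaves out the `reserve_ready_job` actions, and does not use the Statemachine library. The names `TRANSITIONS`, `next_state`, and `replay` are hypothetical.

```ruby
# Hypothetical, gem-free model of the transitions declared in #initialize.
# Entry actions and transition actions (reserve_ready_job) are omitted.
TRANSITIONS = {
  start: { open: :open, close: :closed },
  waiting_for_job: {
    sent_job: :open,
    reserve: :waiting_for_job,
    received_job: :waiting_for_job,
    subscribe: :waiting_for_job,
    unsubscribe: :open,
    close: :closed
  },
  open: {
    subscribe: :waiting_for_job,
    unsubscribe: :open,
    reserve: :waiting_for_job,
    received_job: :open,
    close: :closed
  },
  closed: { close: :closed }
}.freeze

# Return the state reached from +state+ on +event+, or raise
# ArgumentError if the table has no such transition.
def next_state(state, event)
  TRANSITIONS.fetch(state).fetch(event) do
    raise ArgumentError, "no transition for #{event} in #{state}"
  end
end

# Replay a sequence of events starting from :start.
def replay(events)
  events.reduce(:start) { |state, event| next_state(state, event) }
end
```

For example, `replay([:open, :reserve, :sent_job])` walks start, open, waiting_for_job, and back to open, which is the path a client takes when it reserves a single job without subscribing.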
Instance Attribute Details
#handler ⇒ Object
Job handler
# File 'lib/homeq/sobs/server.rb', line 305

def handler
  @handler
end
#server_state ⇒ Object (readonly)
This connection’s state
# File 'lib/homeq/sobs/server.rb', line 302

def server_state
  @server_state
end
Instance Method Details
#closed ⇒ Object
Called from the lower-level Connection abstraction when a client connection is dead.
# File 'lib/homeq/sobs/server.rb', line 365

def closed
  logger.info {
    "Job pool size: #{ServerJob.pool.size}"
  }
  @server_state.close
end
#open ⇒ Object
Calls the superclass implementation, then fires the ‘open’ event on the connection’s state machine.
# File 'lib/homeq/sobs/server.rb', line 475

def open
  super
  @server_state.open
end
#opened ⇒ Object
Called from the lower-level Connection abstraction when a client connection is complete and ready to use.
# File 'lib/homeq/sobs/server.rb', line 359

def opened
  @server_state.open
end
#receive_bury(message) ⇒ Object
Handle a ‘bury’ message from the client, which says that the job shouldn’t be available for anyone.
# File 'lib/homeq/sobs/server.rb', line 444

def receive_bury(message)
  ok_to_receive? || return
  @server.foreman.bury_job(self, message.args[0])
end
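The `ok_to_receive? || return` guard used by this handler (and by the delete, kick, and release handlers) silently drops a message when the connection is not ready. A minimal sketch of that idiom follows; `GuardSketch` and its `buried` list are hypothetical stand-ins for the connection and the foreman, and no HomeQ classes are involved.

```ruby
# Hypothetical stand-in that shows only the guard idiom.
class GuardSketch
  attr_reader :buried

  def initialize(ready)
    @ready = ready
    @buried = [] # stands in for work handed to the foreman
  end

  def ok_to_receive?
    @ready
  end

  # Returns nil without side effects when the connection is not ready;
  # otherwise records the job id.
  def receive_bury(job_id)
    ok_to_receive? || return
    @buried << job_id
  end
end
```

Unlike the ‘put’ handler, this style neither logs nor closes the connection when the guard fails.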
#receive_delete(message) ⇒ Object
Handle a ‘delete’ message from the client, which says that they’ve completed the work. We let the foreman know.
# File 'lib/homeq/sobs/server.rb', line 426

def receive_delete(message)
  ok_to_receive? || return
  @server.foreman.delete_job(self, message.args[0])
end
#receive_kick(message) ⇒ Object
Handle a ‘kick’ message from the client, which moves some number of buried jobs to the ‘ready’ state.
# File 'lib/homeq/sobs/server.rb', line 451

def receive_kick(message)
  ok_to_receive? || return
  @server.foreman.kick(self, message.args[0])
end
#receive_put(message) ⇒ Object
Handle a ‘put’ message from the remote endpoint. We create a message and reply to let the other side know we got it.
# File 'lib/homeq/sobs/server.rb', line 392

def receive_put(message)
  if !ok_to_receive?
    logger.error {
      "Connection not ready for 'put'"
    }
    close
    return
  end
  if message.args.length != 5
    logger.error {
      "Malformed put from SOBS peer #{remote_endpoint}. " +
      "Closing connection."
    }
    close
    return
  end
  create_job(message)
  @server_state.received_job
end
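The two checks that precede job creation in the ‘put’ handler can be isolated as a small validation step. This is a sketch under assumptions: `PutMessage`, `PUT_ARG_COUNT`, and `put_problem` are hypothetical names, and the only facts taken from the source are that a ‘put’ is rejected when the connection is not ready or when it does not carry exactly five arguments.

```ruby
# Hypothetical message type exposing only the argument list.
PutMessage = Struct.new(:args)

# The handler requires exactly five arguments on a 'put'.
PUT_ARG_COUNT = 5

# Return nil when the put may proceed, or a symbol naming the reason
# the real handler would log an error and close the connection.
def put_problem(ready, message)
  return :not_ready unless ready
  return :malformed unless message.args.length == PUT_ARG_COUNT
  nil
end
```

In the handler itself, either failure closes the connection, while a valid message leads to `create_job` and the `received_job` event.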
#receive_release(message) ⇒ Object
Handle a ‘release’ message from the client, which says that they’re not going to finish (or delete) the work, and to make it available for someone else to work on.
# File 'lib/homeq/sobs/server.rb', line 434

def receive_release(message)
  ok_to_receive? || return
  @server.foreman.release_job(self,
                              message.args[0],
                              message.args[1],
                              message.args[2])
end
#receive_reserve(message) ⇒ Object
Handle a ‘reserve’ message from the client, which says that they’re ready for work to be sent to them.
# File 'lib/homeq/sobs/server.rb', line 414

def receive_reserve(message)
  if !ok_to_receive?
    logger.error {
      "Connection not ready to 'receive'"
    }
    return
  end
  @server_state.reserve
end
#receive_subscribe(message) ⇒ Object
Handle a ‘subscribe’ message from the client, which tells us that they want to be issued work to do whenever it’s available.
# File 'lib/homeq/sobs/server.rb', line 458

def receive_subscribe(message)
  @subscribed = true
  @server_state.subscribe
end
#receive_unsubscribed(message) ⇒ Object
Handle an ‘unsubscribe’ message from the client, which tells us that they want to send explicit requests for more work via ‘reserve’ commands.
# File 'lib/homeq/sobs/server.rb', line 466

def receive_unsubscribed(message)
  @subscribed = false
  # Note: the state machine built in #initialize names this event :unsubscribe.
  @server_state.unsubscribed
end
#release_job(job) ⇒ Object
# File 'lib/homeq/sobs/server.rb', line 488

def release_job(job)
end
#reserved(job_id, payload) ⇒ Object
Send a ‘reserved’ message to the remote endpoint, indicating that it’s now in charge of doing the indicated piece of work. Track the server state appropriately (switch state to ‘open’ unless the remote endpoint has subscribed).
# File 'lib/homeq/sobs/server.rb', line 381

def reserved(job_id, payload)
  super(job_id, payload) # send the msg to the client
  @server_state.sent_job unless @subscribed
end
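The effect of the `@subscribed` flag on the state after a job is sent can be illustrated in isolation. The class below is a hypothetical stand-in, not part of HomeQ: it sends nothing over a socket and tracks the state as a plain symbol rather than through the state machine.

```ruby
# Hypothetical stand-in modelling only the @subscribed check in #reserved.
class ReservedSketch
  attr_reader :state

  def initialize(subscribed)
    @subscribed = subscribed
    @state = :waiting_for_job # a job is only sent while waiting
  end

  # Mirrors "@server_state.sent_job unless @subscribed": a subscribed
  # client keeps waiting for more work, any other client returns to :open.
  def reserved(_job_id, _payload)
    @state = :open unless @subscribed
  end
end
```

A subscribed connection therefore stays in `:waiting_for_job` and can be handed the next ready job without another ‘reserve’ command.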
#to_s ⇒ Object
# File 'lib/homeq/sobs/server.rb', line 491

def to_s
  str = ""
  str << "#{self.class} state: #{server_state.state} "
  str << "subscribed?: #{@subscribed ? 'Y' : 'N'}\n"
  str << "jobs in: #{@jobs_received} "
  str << "job pool size: #{ServerJob.pool ? ServerJob.pool.size : '?'}\n"
  str << super.gsub(/\n/m, "\n ")
end
#unknown_message(command_name) ⇒ Object
# File 'lib/homeq/sobs/server.rb', line 480

def unknown_message(command_name)
  logger.info {
    "SOBS peer #{remote_endpoint} sent unknown/unsupported " +
    "command: #{command_name}"
  }
  close
end