Class: HomeQ::SOBS::ServerConnection

Inherits:
Connection
  • Object
show all
Defined in:
lib/homeq/sobs/server.rb

Overview

Server Connection

This class represents the server-side of a connection between a SOBS client and the SOBS server.

Constant Summary

Constants inherited from Connection

Connection::ALL_MESSAGES, Connection::ARGS_AND_BODY_MESSAGES, Connection::ARG_AND_BODY_MESSAGES, Connection::ARG_MESSAGES, Connection::BODY_MESSAGES, Connection::HEADER_REGEX, Connection::HELLO_TIMER_LENGTH, Connection::MAX_INPUT_MSGS_PER_READ_CYCLE, Connection::NO_ARG_MESSAGES, Connection::OUTBOUND_MAX, Connection::PUSH_TIMER_LENGTH, Connection::REFUSE_SEND_THRESHOLD

Constants included from HomeQ

VERSION

Instance Attribute Summary collapse

Attributes inherited from Connection

#outbound_max, #peer, #refuse_send_threshold, #server, #state

Instance Method Summary collapse

Methods inherited from Connection

#cancel_hello_timer, #close, #closing, #connection_completed, #enter_congestion, #exit_congestion, #ok_to_receive?, #parse_header, #parse_incoming_data, #post_init, #process_data, #protocol_version_ok?, #receive_data, #receive_hello, #receive_message, #remote_endpoint, #send_to_peer, #setup_peer, #should_refuse_send?, #start_closing, #unbind, #wait_for_hello

Methods included from HomeQ

calculated_homeq_env, calculated_homeq_topology, included, queue_list_for_host_from_topology

Methods included from Sender

#book_it, #buried, #bury, #deadline_soon, #delete, #deleted, #hello, #inserted, #kick, #kicked, #not_found, #put, #release, #released, #reserve, #stats, #stats_job, #subscribe, #unsubscribe

Methods included from Base::Logging

#logger

Constructor Details

#initialize ⇒ ServerConnection

Create a new ServerConnection, which is a child of SOBS::Connection. Called from Server#start via EventMachine. When an incoming TCP connection is received, a ServerConnection is created to handle it.



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/homeq/sobs/server.rb', line 311

# Create the connection: run the parent class initializer, build the
# per-connection state machine, and reset the bookkeeping fields.
def initialize
  super

  # The state machine that drives our behavior.
  #
  # Summary of the transitions declared below:
  #   :start            open -> :open, close -> :closed
  #   :waiting_for_job  sent_job / unsubscribe -> :open,
  #                     reserve / received_job / subscribe -> :waiting_for_job,
  #                     close -> :closed
  #   :open             subscribe / reserve -> :waiting_for_job,
  #                     unsubscribe / received_job -> :open,
  #                     close -> :closed
  #   :closed           close -> :closed
  #
  # NOTE(review): :start is declared first; presumably the builder treats
  # the first declared state as the initial one -- confirm against the
  # Statemachine gem before reordering anything here.
  
  @server_state = Statemachine.build do
    state :start do
      event :open, :open
      event :close, :closed
    end
    state :waiting_for_job do
      # Each entry into this state names :reserve_ready_job as its
      # entry action.
      on_entry :reserve_ready_job
      event :sent_job, :open
      event :reserve, :waiting_for_job
      # Third argument: presumably the action run on this transition, so
      # a newly received job triggers another :reserve_ready_job.
      event :received_job, :waiting_for_job, :reserve_ready_job
      event :subscribe, :waiting_for_job
      event :unsubscribe, :open
      event :close, :closed
    end
    state :open do
      event :subscribe, :waiting_for_job
      event :unsubscribe, :open
      event :reserve, :waiting_for_job
      event :received_job, :open
      event :close, :closed
    end
    state :closed do
      # 'close' is accepted again here and keeps the machine in :closed.
      event :close, :closed
    end
  end
  # Make this connection the state machine's context.  Presumably the
  # symbolic action above (:reserve_ready_job) is invoked on it; that
  # method is not visible in this section -- TODO confirm.
  @server_state.context = self

  # stats
  # NOTE(review): initialised to zero here and read by to_s; nothing in
  # the visible methods increments it -- confirm where it is updated.
  @jobs_received                          = 0

  # An attribute of this connection.  If true, the client has
  # indicated that they want to be sent a ready job whenever
  # it's available.  Otherwise, they must ask for each job via
  # the 'reserve' command.
  @subscribed                             = nil
end

Instance Attribute Details

#handler ⇒ Object

Job handler



305
306
307
# File 'lib/homeq/sobs/server.rb', line 305

# Reader for the job handler attached to this connection.
# Returns the current value of @handler, which is nil if it has never
# been assigned (the constructor does not set it).
def handler
  @handler
end

#server_state ⇒ Object (readonly)

This connection’s state



302
303
304
# File 'lib/homeq/sobs/server.rb', line 302

# Reader for this connection's state machine, i.e. the object built by
# Statemachine.build in the constructor.  to_s reads the current state
# name from it via #state.
def server_state
  @server_state
end

Instance Method Details

#closed ⇒ Object

Called from the lower-level Connection abstraction when a client connection is dead.



365
366
367
368
369
370
# File 'lib/homeq/sobs/server.rb', line 365

# Called from the lower-level Connection abstraction when a client
# connection is dead.  Logs the current job pool size and then fires the
# 'close' event on the server state machine.
#
# Returns whatever the state machine's 'close' event returns.
def closed
  logger.info {
    # ServerJob.pool may be unset (to_s guards against the same case).
    # Report '?' instead of raising inside the log block, because an
    # exception here would keep the state machine from ever seeing the
    # 'close' event below.
    pool = ServerJob.pool
    "Job pool size: #{pool ? pool.size : '?'}"
  }
  @server_state.close
end

#open ⇒ Object

Runs the inherited open logic, then fires the ‘open’ event on the server state machine.



475
476
477
478
# File 'lib/homeq/sobs/server.rb', line 475

# Run the inherited open logic first, then fire the 'open' event on the
# server state machine.  Returns the result of that event.
#
# NOTE(review): #opened fires the same 'open' event, and only the :start
# state declares a transition for it -- confirm that the parent's #open
# does not also lead to #opened being called on this connection.
def open
  super
  @server_state.open
end

#opened ⇒ Object

Called from the lower-level Connection abstraction when a client connection is complete and ready to use.



359
360
361
# File 'lib/homeq/sobs/server.rb', line 359

# Called from the lower-level Connection abstraction when a client
# connection is complete and ready to use.  Fires the 'open' event on the
# server state machine (declared only for the :start state, where it
# moves the machine to :open) and returns the event's result.
def opened
  @server_state.open
end

#receive_bury(message) ⇒ Object

Handle a ‘bury’ message from the client, which says that the job shouldn’t be available for anyone.



444
445
446
447
# File 'lib/homeq/sobs/server.rb', line 444

# Handle a 'bury' message from the client.  When the connection is not
# ready to receive, the message is dropped and nil is returned; otherwise
# the foreman is asked to bury the job named by the first message
# argument, and its result is returned.
def receive_bury(message)
  return unless ok_to_receive?

  job_id = message.args[0]
  @server.foreman.bury_job(self, job_id)
end

#receive_delete(message) ⇒ Object

Handle a ‘delete’ message from the client, which says that they’ve completed the work. We let the foreman know.



426
427
428
429
# File 'lib/homeq/sobs/server.rb', line 426

# Handle a 'delete' message from the client.  When the connection is not
# ready to receive, the message is dropped and nil is returned; otherwise
# the foreman is told to delete the job named by the first message
# argument, and its result is returned.
def receive_delete(message)
  return unless ok_to_receive?

  job_id = message.args[0]
  @server.foreman.delete_job(self, job_id)
end

#receive_kick(message) ⇒ Object

Handle a ‘kick’ message from the client, which moves some number of buried jobs to the ‘ready’ state.



451
452
453
454
# File 'lib/homeq/sobs/server.rb', line 451

# Handle a 'kick' message from the client.  When the connection is not
# ready to receive, the message is dropped and nil is returned; otherwise
# the first message argument is forwarded to the foreman's kick, and its
# result is returned.
def receive_kick(message)
  return unless ok_to_receive?

  kick_arg = message.args[0]
  @server.foreman.kick(self, kick_arg)
end

#receive_put(message) ⇒ Object

Handle a ‘put’ message from the remote endpoint. We create a job and reply to let the other side know we got it.



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/homeq/sobs/server.rb', line 392

# Handle a 'put' message from the remote endpoint.
#
# The connection is closed (after logging an error) and nil is returned
# when either the connection is not ready to receive, or the message does
# not carry exactly five arguments.  Otherwise a job is created from the
# message and the 'received_job' event is fired on the server state
# machine; the event's result is returned.
def receive_put(message)
  unless ok_to_receive?
    logger.error { "Connection not ready for 'put'" }
    close
    return
  end

  unless message.args.length == 5
    logger.error do
      "Malformed put from SOBS peer #{remote_endpoint}.  " \
        "Closing connection."
    end
    close
    return
  end

  create_job(message)
  @server_state.received_job
end

#receive_release(message) ⇒ Object

Handle a ‘release’ message from the client, which says that they’re not going to finish (or delete) the work, and to make it available for someone else to work on.



434
435
436
437
438
439
440
# File 'lib/homeq/sobs/server.rb', line 434

# Handle a 'release' message from the client.  When the connection is not
# ready to receive, the message is dropped and nil is returned; otherwise
# the first three message arguments are forwarded, in order, to the
# foreman's release_job, and its result is returned.
def receive_release(message)
  return unless ok_to_receive?

  first_arg  = message.args[0]
  second_arg = message.args[1]
  third_arg  = message.args[2]
  @server.foreman.release_job(self, first_arg, second_arg, third_arg)
end

#receive_reserve(message) ⇒ Object

Handle a ‘reserve’ message from the client, which says that they’re ready for work to be sent to them.



414
415
416
417
418
419
420
421
422
# File 'lib/homeq/sobs/server.rb', line 414

# Handle a 'reserve' message from the client, which says that they're
# ready for work to be sent to them.
#
# When the connection is not ready to receive, an error is logged and nil
# is returned (unlike receive_put, the connection is left open).
# Otherwise the 'reserve' event is fired on the server state machine and
# its result is returned.  The message argument itself is not inspected.
def receive_reserve(message)
  if !ok_to_receive?
    logger.error {
      # Name the command that was actually rejected; the wording matches
      # the equivalent message logged by receive_put.
      "Connection not ready for 'reserve'"
    }
    return
  end
  @server_state.reserve
end

#receive_subscribe(message) ⇒ Object

Handle a ‘subscribe’ message from the client, which tells us that they want to be issued work to do whenever it’s available.



458
459
460
461
# File 'lib/homeq/sobs/server.rb', line 458

# Handle a 'subscribe' message from the client: mark the connection as
# subscribed, then fire the 'subscribe' event on the server state
# machine.  The message argument is not inspected.  Returns the event's
# result.
#
# NOTE(review): unlike the other receive_* handlers, this one does not
# check ok_to_receive? first -- confirm that is intentional.
def receive_subscribe(message)
  # Set the flag before firing the event: 'subscribe' moves the machine
  # into :waiting_for_job, whose entry action presumably can send a job
  # straight away, and #reserved reads @subscribed.
  @subscribed = true
  @server_state.subscribe
end

#receive_unsubscribed(message) ⇒ Object

Handle an ‘unsubscribe’ message from the client, which tells us that they want to send explicit requests for more work via ‘reserve’ commands.



466
467
468
469
# File 'lib/homeq/sobs/server.rb', line 466

# Handle an 'unsubscribe' message from the client, which tells us that
# they want to send explicit requests for more work via 'reserve'
# commands.  Clears the subscribed flag and fires the 'unsubscribe' event
# on the server state machine; returns the event's result.  The message
# argument is not inspected.
#
# NOTE(review): every other handler in this class is named
# receive_<command>; this one carries a trailing 'd'.  If the parent
# class dispatches on "receive_#{command}", an 'unsubscribe' message will
# not reach this method -- confirm against Connection#receive_message.
def receive_unsubscribed(message)
  @subscribed = false
  # The state machine built in the constructor declares this event as
  # :unsubscribe (in both :waiting_for_job and :open); it has no
  # 'unsubscribed' event, so that name must not be used here.
  @server_state.unsubscribe
end

#release_job(job) ⇒ Object



488
489
# File 'lib/homeq/sobs/server.rb', line 488

# No-op: the body is empty, so the job argument is ignored and nil is
# returned.
# NOTE(review): looks like a placeholder or a deliberately empty
# callback -- confirm which before relying on it.
def release_job(job)
end

#reserved(job_id, payload) ⇒ Object

Send a ‘reserved’ message to the remote endpoint, indicating that it’s now in charge of doing the indicated piece of work. Track the server state appropriately (switch state to ‘open’ unless the remote endpoint has subscribed).



381
382
383
384
# File 'lib/homeq/sobs/server.rb', line 381

# Send a 'reserved' message for the given job to the remote endpoint via
# the inherited implementation, then update the server state machine:
# a subscribed client stays where it is (nil is returned), while for any
# other client the 'sent_job' event is fired and its result returned.
def reserved(job_id, payload)
  # Delegate the actual send to the inherited method.
  super(job_id, payload)

  return if @subscribed

  @server_state.sent_job
end

#to_s ⇒ Object



491
492
493
494
495
496
497
498
# File 'lib/homeq/sobs/server.rb', line 491

# Build a multi-line, human-readable description of this connection:
# the class name, current state-machine state and subscription flag on
# the first line; the received-job counter and job pool size ('?' when
# the pool is unset) on the second; then the inherited description with
# two spaces added after each of its newlines.
def to_s
  pool_size       = ServerJob.pool ? ServerJob.pool.size : '?'
  subscribed_flag = @subscribed ? 'Y' : 'N'

  summary = "#{self.class} state: #{server_state.state} " \
            "subscribed?: #{subscribed_flag}\n" \
            "jobs in: #{@jobs_received} " \
            "job pool size: #{pool_size}\n"

  summary + super.gsub(/\n/m, "\n  ")
end

#unknown_message(command_name) ⇒ Object



480
481
482
483
484
485
486
# File 'lib/homeq/sobs/server.rb', line 480

# React to a command this server does not recognise: log the peer and
# the offending command name at info level, then close the connection.
# Returns whatever #close returns.
def unknown_message(command_name)
  logger.info do
    "SOBS peer #{remote_endpoint} sent unknown/unsupported " \
      "command: #{command_name}"
  end
  close
end