Class: Sqskiq::Manager
- Inherits:
-
Object
- Object
- Sqskiq::Manager
- Includes:
- Celluloid, Celluloid::Notifications
- Defined in:
- lib/sqskiq/manager.rb
Instance Method Summary collapse
- #batch_process_done(messages) ⇒ Object
- #bootstrap ⇒ Object
- #fetch_done(messages) ⇒ Object
-
#initialize ⇒ Manager
constructor
A new instance of Manager.
- #new_fetch(num) ⇒ Object
- #running? ⇒ Boolean
- #shutting_down(signal) ⇒ Object
- #subscribe_shutting_down ⇒ Object
Constructor Details
#initialize ⇒ Manager
Returns a new instance of Manager.
10 11 12 13 |
# File 'lib/sqskiq/manager.rb', line 10 def initialize @shutting_down = false subscribe_shutting_down end |
Instance Method Details
#batch_process_done(messages) ⇒ Object
27 28 29 30 |
# File 'lib/sqskiq/manager.rb', line 27 def batch_process_done() @deleter.async.delete() if @deleter.alive? new_fetch(1) end |
#bootstrap ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/sqskiq/manager.rb', line 15 def bootstrap @fetcher = Celluloid::Actor[:fetcher] @batch_processor = Celluloid::Actor[:batch_processor] @deleter = Celluloid::Actor[:deleter] #TODO default value. Should be configurable new_fetch(2) end |
#fetch_done(messages) ⇒ Object
23 24 25 |
# File 'lib/sqskiq/manager.rb', line 23 def fetch_done() @batch_processor.async.batch_process() unless @shutting_down end |
#new_fetch(num) ⇒ Object
32 33 34 |
# File 'lib/sqskiq/manager.rb', line 32 def new_fetch(num) num.times { @fetcher.async.fetch unless @shutting_down } end |
#running? ⇒ Boolean
40 41 42 |
# File 'lib/sqskiq/manager.rb', line 40 def running? not (@deleter.busy_size == 0 and @shutting_down) end |
#shutting_down(signal) ⇒ Object
36 37 38 |
# File 'lib/sqskiq/manager.rb', line 36 def shutting_down(signal) @shutting_down = true end |
#subscribe_shutting_down ⇒ Object
44 45 46 47 48 |
# File 'lib/sqskiq/manager.rb', line 44 def subscribe_shutting_down subscribe('SIGINT', :shutting_down) subscribe('TERM', :shutting_down) subscribe('SIGTERM', :shutting_down) end |