Class: Sqskiq::Manager

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Notifications
Defined in:
lib/sqskiq/manager.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Manager

Returns a new instance of Manager.



10
11
12
13
# File 'lib/sqskiq/manager.rb', line 10

# Builds a manager that is not yet shutting down and registers it for the
# shutdown notification topics via subscribe_shutting_down.
def initialize
  # Flag read by fetch_done, new_fetch and running?; set by shutting_down.
  @shutting_down = false
  subscribe_shutting_down
end

Instance Method Details

#batch_process_done(messages) ⇒ Object



27
28
29
30
# File 'lib/sqskiq/manager.rb', line 27

# Called once a batch has been processed: hands the messages to the deleter
# (only while that actor is still alive) and then requests one more fetch.
#
# @param messages [Object] the processed messages, forwarded to the deleter's `delete`
# @return [Object] whatever `new_fetch(1)` returns
def batch_process_done(messages)
  deleter = @deleter
  if deleter.alive?
    deleter.async.delete(messages)
  end
  new_fetch(1)
end

#bootstrap ⇒ Object



15
16
17
18
19
20
21
# File 'lib/sqskiq/manager.rb', line 15

# Resolves the collaborating actors registered under :fetcher,
# :batch_processor and :deleter, stores them in instance variables, and
# requests the initial fetches.
#
# @param initial_fetches [Integer] how many fetches to request up front;
#   defaults to 2, the value that was previously hard-coded
# @return [Object] whatever `new_fetch` returns
def bootstrap(initial_fetches = 2)
  @fetcher = Celluloid::Actor[:fetcher]
  @batch_processor = Celluloid::Actor[:batch_processor]
  @deleter = Celluloid::Actor[:deleter]
  new_fetch(initial_fetches)
end

#fetch_done(messages) ⇒ Object



23
24
25
# File 'lib/sqskiq/manager.rb', line 23

# Called when a fetch has completed: forwards the fetched messages to the
# batch processor asynchronously, unless the manager is shutting down.
#
# @param messages [Object] fetched messages, passed through to `batch_process`
# @return [Object, nil] result of the async call, or nil while shutting down
def fetch_done(messages)
  return if @shutting_down

  @batch_processor.async.batch_process(messages)
end

#new_fetch(num) ⇒ Object



32
33
34
# File 'lib/sqskiq/manager.rb', line 32

# Asks the fetcher for `num` asynchronous fetches. The shutdown flag is
# checked before each request, so nothing is requested once it is set.
#
# @param num [Integer] number of fetches to request
# @return [Integer] `num` (the value of `Integer#times`)
def new_fetch(num)
  num.times do
    next if @shutting_down

    @fetcher.async.fetch
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/sqskiq/manager.rb', line 40

# Reports whether the manager still counts as running. It is considered
# stopped only when shutdown has been requested AND the deleter reports a
# busy size of zero.
#
# @return [Boolean]
def running?
  # Evaluated first and unconditionally, matching the original ordering.
  deletes_pending = @deleter.busy_size != 0
  deletes_pending || !@shutting_down
end

#shutting_down(signal) ⇒ Object



36
37
38
# File 'lib/sqskiq/manager.rb', line 36

# Handler registered for the shutdown topics in subscribe_shutting_down.
# Marks the manager as shutting down; fetch_done and new_fetch check this
# flag before dispatching further work.
#
# @param signal [Object] argument supplied by the notifier; unused here
#   (presumably the topic name — confirm against Celluloid::Notifications)
# @return [true]
def shutting_down(signal)
  @shutting_down = true
end

#subscribe_shutting_down ⇒ Object



44
45
46
47
48
# File 'lib/sqskiq/manager.rb', line 44

# Registers #shutting_down as the handler for each shutdown topic
# ('SIGINT', 'TERM' and 'SIGTERM'), in that order.
#
# @return [Object] the result of the final `subscribe` call
def subscribe_shutting_down
  subscriptions = %w[SIGINT TERM SIGTERM].map do |topic|
    subscribe(topic, :shutting_down)
  end
  subscriptions.last
end