Class: Sqskiq::BatchProcessor

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Notifications
Defined in:
lib/sqskiq/batch_process.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BatchProcessor

Returns a new instance of BatchProcessor.



9
10
11
12
13
14
# File 'lib/sqskiq/batch_process.rb', line 9

# Builds a batch processor wired to its collaborating actors.
#
# Looks up the entries stored under +:manager+ and +:processor+ in
# +Celluloid::Actor+ and keeps them for later use, then registers this
# object for interrupt notifications via +subscribe_interrupt+.
def initialize
  @manager, @processor = Celluloid::Actor[:manager], Celluloid::Actor[:processor]

  subscribe_interrupt
end

Instance Method Details

#batch_process(messages) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/sqskiq/batch_process.rb', line 16

# Processes a batch of messages and reports the successful ones.
#
# Every message is first handed to the processor through its +future+
# proxy; only after all of them have been dispatched are the futures'
# values read, in the same order the messages were given.
#
# @param messages [#size, #map] the messages to process
# @return [Object] whatever +@manager.async.batch_process_done+ returns
def batch_process(messages)
  p "processing #{messages.size} messages"

  # Dispatch everything up front so no value is read before all
  # messages have been submitted.
  pending = messages.map { |message| @processor.future.process(message) }

  # Keep the :message of each outcome whose :success entry is truthy.
  delivered = pending.each_with_object([]) do |future, acc|
    outcome = future.value
    acc << outcome[:message] if outcome[:success]
  end

  @manager.async.batch_process_done(delivered)
end

#interrupt(signal) ⇒ Object



41
42
43
# File 'lib/sqskiq/batch_process.rb', line 41

# Notification handler registered by +subscribe_interrupt+.
#
# @param signal [Object] the notification argument; not used here
# @return [Object] whatever +terminate+ returns
def interrupt(signal)
  terminate
end

#subscribe_interrupt ⇒ Object



35
36
37
38
39
# File 'lib/sqskiq/batch_process.rb', line 35

# Registers +interrupt+ as the handler for the 'SIGINT', 'TERM' and
# 'SIGTERM' notification topics, in that order.
#
# @return [Object] the result of the final +subscribe+ call
def subscribe_interrupt
  %w[SIGINT TERM SIGTERM].map { |topic| subscribe(topic, :interrupt) }.last
end