Class: Sqskiq::BatchProcessor
- Inherits:
- Object (hierarchy: Object, Sqskiq::BatchProcessor)
- Includes:
- Celluloid, Celluloid::Notifications
- Defined in:
- lib/sqskiq/batch_process.rb
Instance Method Summary
- #batch_process(messages) ⇒ Object
- #initialize ⇒ BatchProcessor (constructor): a new instance of BatchProcessor.
- #interrupt(signal) ⇒ Object
- #subscribe_interrupt ⇒ Object
Constructor Details
#initialize ⇒ BatchProcessor
Returns a new instance of BatchProcessor.
    # File 'lib/sqskiq/batch_process.rb', line 9

    def initialize
      @manager = Celluloid::Actor[:manager]
      @processor = Celluloid::Actor[:processor]
      subscribe_interrupt
    end
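The constructor looks up its collaborators by name, so it presumably relies on `:manager` and `:processor` having been registered before a BatchProcessor is created. The following plain-Ruby sketch illustrates that ordering dependency. `TinyRegistry` and `lookup_demo` are hypothetical names invented for this illustration; this is not Celluloid's registry, only an analogue of a name-to-object lookup that yields nil for unknown names.

```ruby
# Hypothetical name-to-object registry, standing in for an actor registry.
class TinyRegistry
  def initialize
    @entries = {}
  end

  # Register an object under a symbolic name.
  def []=(name, object)
    @entries[name.to_sym] = object
  end

  # Returns nil when nothing has been registered under the name.
  def [](name)
    @entries[name.to_sym]
  end
end

# Looks up :manager once before and once after registration and
# reports whether each lookup came back nil.
def lookup_demo
  registry = TinyRegistry.new
  before = registry[:manager]        # looked up too early
  registry[:manager] = Object.new    # registration happens here
  after = registry[:manager]
  [before.nil?, after.nil?]
end
```

In this sketch, a lookup made before registration returns nil, so any later call on the stored reference would fail. If the real registry behaves the same way, the manager and processor need to be started first.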
Instance Method Details
#batch_process(messages) ⇒ Object
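    # File 'lib/sqskiq/batch_process.rb', line 16

    def batch_process(messages)
      p "processing #{messages.size} messages"

      # Dispatch every message to the processor actor as a future.
      process_result = []
      messages.each do |message|
        process_result << @processor.future.process(message)
      end

      # Wait on each future and keep the messages that were processed successfully.
      success_messages = []
      process_result.each do |result|
        value = result.value
        if value[:success]
          success_messages << value[:message]
        end
      end

      # Report the successful messages back to the manager asynchronously.
      @manager.async.batch_process_done(success_messages)
    end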
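The method above follows a fan-out/fan-in shape: start all the work first, then wait on each result and filter by the `:success` flag. A minimal plain-Ruby sketch of that shape follows, using `Thread` and `Thread#value` as a stand-in for futures. `fake_process` and `collect_successes` are hypothetical helpers invented here, and the success rule (non-empty message) is an assumption made only so the example has something to filter.

```ruby
# Hypothetical stand-in for the processor: returns the same
# { success:, message: } hash shape that batch_process reads.
# Assumption for this sketch: an empty message counts as a failure.
def fake_process(message)
  { success: !message.to_s.empty?, message: message }
end

# Fan out: start one thread per message (a plain-Ruby analogue of a future).
# Fan in: Thread#value blocks until the thread finishes and returns its result.
def collect_successes(messages)
  pending = messages.map { |message| Thread.new { fake_process(message) } }
  results = pending.map(&:value)
  results.select { |result| result[:success] }
         .map { |result| result[:message] }
end
```

Because all threads are started before any `value` call, the work overlaps, while the results are still read in the original message order.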
#interrupt(signal) ⇒ Object
    # File 'lib/sqskiq/batch_process.rb', line 41

    def interrupt(signal)
      self.terminate
    end
#subscribe_interrupt ⇒ Object
    # File 'lib/sqskiq/batch_process.rb', line 35

    def subscribe_interrupt
      subscribe('SIGINT', :interrupt)
      subscribe('TERM', :interrupt)
      subscribe('SIGTERM', :interrupt)
    end
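The three `subscribe` calls map topic names to the `interrupt` handler, so publishing any of those topics should end in `interrupt` being called. The sketch below shows that topic-to-method routing in plain Ruby. `TinyNotifier`, `FakeWorker`, and `demo_interrupt` are hypothetical names for this illustration, and the assumption that the handler receives the topic name as its argument is mine; it is not a description of Celluloid::Notifications internals.

```ruby
# Hypothetical in-process notifier; not Celluloid's implementation.
class TinyNotifier
  def initialize
    @subscriptions = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Remember which method on which receiver handles the topic.
  def subscribe(topic, receiver, method_name)
    @subscriptions[topic] << [receiver, method_name]
  end

  # Call every handler registered for the topic, passing the topic name.
  def publish(topic)
    @subscriptions[topic].each do |receiver, method_name|
      receiver.public_send(method_name, topic)
    end
  end
end

# Hypothetical worker that records the signal instead of terminating.
class FakeWorker
  attr_reader :stopped_by

  def interrupt(signal)
    @stopped_by = signal
  end
end

# Subscribes a worker to the same three topics and publishes one topic.
def demo_interrupt(topic)
  notifier = TinyNotifier.new
  worker = FakeWorker.new
  %w[SIGINT TERM SIGTERM].each do |name|
    notifier.subscribe(name, worker, :interrupt)
  end
  notifier.publish(topic)
  worker.stopped_by
end
```

In this sketch a topic outside the subscribed set never reaches the handler, which is why all three signal names are registered separately.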