Class: Piscina
- Inherits:
-
Object
- Object
- Piscina
- Defined in:
- lib/piscina/piscina.rb
Constant Summary collapse
- DEFAULT_THREADS_IN_POOL =
threads
10
- DEFAULT_MESSAGE_TIMEOUT =
seconds
15
- MESSAGE_VISIBILITY_TIMEOUT =
seconds
12*60
- MESSAGE_RETRY_VISIBILITY =
seconds
5
- AWS_REGION =
'us-east-1'
- AWS_ACCOUNT_NUM =
'682315851866'
Instance Method Summary collapse
-
#initialize(sqs_url, klass, options = {}) ⇒ Piscina
constructor
A new instance of Piscina.
-
#poll ⇒ Object
Unfortunately, we can’t use the AWS SDK’s Queue#poll with a block as it uses Queue#call_message_block which will ALWAYS delete a received message.
- #poll_sqs ⇒ Object
- #process_message(msg) ⇒ Object
- #send_message(text) ⇒ Object
- #shutdown_instance ⇒ Object
Constructor Details
#initialize(sqs_url, klass, options = {}) ⇒ Piscina
Returns a new instance of Piscina.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/piscina/piscina.rb', line 14

# Builds a Piscina bound to one SQS queue and immediately starts polling it.
#
# @param sqs_url [String] URL of the SQS queue; the segment after the last
#   "/" is used as the log name.
# @param klass [#perform] object whose +perform(msg)+ is called for each
#   received message.
# @param options [Hash] optional settings.
# @option options [Integer] :pool_threads size of the worker thread pool;
#   DEFAULT_THREADS_IN_POOL is used when the key is absent.
def initialize(sqs_url, klass, options = {})
  @queue = create_or_initialize_queue(sqs_url)
  @klass = klass

  pool_threads = options[:pool_threads] || DEFAULT_THREADS_IN_POOL
  @thread_pool = Executors.newFixedThreadPool(pool_threads)

  # Use SQS queue name for log name.
  queue_name = sqs_url.split("/")[-1]
  @logger = PiscinaLogger.new(queue_name)

  # Listen for SIGTERM's and other shutdown messages
  at_exit do
    shutdown_instance
  end

  poll
end
Instance Method Details
#poll ⇒ Object
Unfortunately, we can’t use the AWS SDK’s Queue#poll with a block as it uses Queue#call_message_block which will ALWAYS delete a received message.
35 36 37 38 39 40 |
# File 'lib/piscina/piscina.rb', line 35

# Starts the polling loop (#poll_sqs) on a new thread and returns at once.
#
# @return [Thread] the thread running #poll_sqs
def poll
  # Creates a real OS thread through JRuby
  Thread.new do
    poll_sqs
  end
end
#poll_sqs ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/piscina/piscina.rb', line 42

# Repeatedly receives messages from the queue and hands each one to
# #process_message, until the worker thread pool is missing or shut down.
def poll_sqs
  loop do
    msg = @queue.receive_message(visibility_timeout: MESSAGE_VISIBILITY_TIMEOUT,
                                 wait_time_seconds: DEFAULT_MESSAGE_TIMEOUT)

    # Check for shutdown before skipping empty receives; otherwise an idle
    # queue (nil message) would keep this loop alive after the pool is
    # shut down.
    break if @thread_pool.nil? || @thread_pool.isShutdown

    # receive_message can time out and return nil
    next if msg.nil?

    process_message(msg)
  end
end
#process_message(msg) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/piscina/piscina.rb', line 55

# Submits +msg+ to the thread pool; the submitted job passes it to
# @klass.perform and logs the outcome.
#
# @param msg [Object] received message; must respond to +id+, +body+ and
#   +visibility_timeout=+.
def process_message(msg)
  @thread_pool.execute do
    begin
      # Strip newlines and spaces from the body up front so the failure log
      # below still carries it when perform raises.
      body = msg.body.delete("\n ")

      @klass.perform(msg)
      @logger.info("Successfully processed message:#{msg.id};body:#{body}")
    rescue => e
      @logger.error("Could not process message:#{msg.id};body:#{body};error:#{e.message}")

      # DLQ policy -> Messages are attempted N times and then benched. Policy is defined by the
      # queue itself.
      msg.visibility_timeout = MESSAGE_RETRY_VISIBILITY
    end
  end
end
#send_message(text) ⇒ Object
81 82 83 |
# File 'lib/piscina/piscina.rb', line 81

# Sends +text+ to the queue.
#
# @param text [String] message body handed to the queue
# @return [Object] whatever the queue's send_message returns
def send_message(text)
  @queue.send_message(text)
end
#shutdown_instance ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/piscina/piscina.rb', line 73

# Requests shutdown of the worker thread pool, then shuts down the logger.
def shutdown_instance
  # Shutting down thread pools does not happen immediately;
  # all jobs are allowed to finish before the thread is closed.
  # TODO: make sure to wait for shutdown
  @thread_pool.shutdown
  @logger.shutdown
end