Class: Piscina

Inherits:
Object
  • Object
show all
Defined in:
lib/piscina/piscina.rb

Constant Summary collapse

DEFAULT_THREADS_IN_POOL =

threads

10
DEFAULT_MESSAGE_TIMEOUT =

seconds

15
MESSAGE_VISIBILITY_TIMEOUT =

seconds

12*60
MESSAGE_RETRY_VISIBILITY =

seconds

5
AWS_REGION =
'us-east-1'
AWS_ACCOUNT_NUM =
'682315851866'

Instance Method Summary collapse

Constructor Details

#initialize(sqs_url, klass, options = {}) ⇒ Piscina

Returns a new instance of Piscina.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/piscina/piscina.rb', line 14

# Sets up a worker for a single SQS queue and immediately starts polling it.
#
# sqs_url - queue URL; handed to create_or_initialize_queue, and its final
#           "/"-separated segment is used as the logger name.
# klass   - object whose +perform+ is called with each received message.
# options - optional Hash; :pool_threads overrides DEFAULT_THREADS_IN_POOL
#           as the size of the fixed thread pool.
def initialize(sqs_url, klass, options={})
  @queue = create_or_initialize_queue(sqs_url)
  @klass = klass

  # Fixed-size pool that runs the per-message jobs.
  pool_size = options[:pool_threads]
  pool_size ||= DEFAULT_THREADS_IN_POOL
  @thread_pool = Executors.newFixedThreadPool(pool_size)

  # The log is named after the queue, i.e. the last path segment of the URL.
  @logger = PiscinaLogger.new(sqs_url.split("/").last)

  # Release the pool and the logger when the process exits
  # (SIGTERM and other shutdown paths).
  at_exit { shutdown_instance }

  poll
end

Instance Method Details

#poll ⇒ Object

Unfortunately, we can’t use the AWS SDK’s Queue#poll with a block as it uses Queue#call_message_block which will ALWAYS delete a received message.



35
36
37
38
39
40
# File 'lib/piscina/piscina.rb', line 35

# Starts the SQS polling loop on a background thread and returns that
# Thread. Per the original author's note, under JRuby this is a real OS
# thread.
def poll
  Thread.new { poll_sqs }
end

#poll_sqs ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/piscina/piscina.rb', line 42

# Long-polls the queue and hands each received message to process_message.
# The loop ends once the thread pool is missing or has been shut down.
def poll_sqs
  pool_stopped = lambda { @thread_pool.nil? || @thread_pool.isShutdown }

  loop do
    # Check before polling: with an idle queue every receive times out, so a
    # check placed only after the nil-guard would never be reached and the
    # loop would keep polling forever after shutdown.
    break if pool_stopped.call

    msg = @queue.receive_messages(visibility_timeout: MESSAGE_VISIBILITY_TIMEOUT,
                                  wait_time_seconds: DEFAULT_MESSAGE_TIMEOUT)

    # receive_message can time out and return nil
    next if msg.nil?

    # The pool may have been shut down while we were blocked in the long
    # poll; do not submit work to it. The message is left untouched here.
    break if pool_stopped.call

    process_message(msg)
  end
end

#process_message(msg) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/piscina/piscina.rb', line 55

# Submits one message to the thread pool. The job calls @klass.perform(msg)
# and logs the outcome; on failure it shortens the message's visibility
# timeout so it is retried soon.
#
# msg - received message; must respond to +id+, +body+ and
#       +visibility_timeout=+.
def process_message(msg)
  @thread_pool.execute do
    begin
      # Build the log-friendly body (newlines and spaces stripped) BEFORE
      # running the job, so the error log below can include it as well.
      # to_s keeps a nil body from turning a successful perform into a retry.
      body = msg.body.to_s.delete("\n ")

      @klass.perform(msg)

      @logger.info("Successfully processed message:#{msg.id};body:#{body}")
    rescue => e
      @logger.error("Could not process message:#{msg.id};body:#{body};error:#{e.message}")

      # DLQ policy -> Messages are attempted N times and then benched. Policy is defined by the
      # queue itself.
      msg.visibility_timeout = MESSAGE_RETRY_VISIBILITY
    end
  end
end

#send_message(text) ⇒ Object



81
82
83
# File 'lib/piscina/piscina.rb', line 81

# Forwards +text+ to the underlying queue's send_message and returns
# whatever that call returns.
def send_message(text)
  outbound_queue = @queue
  outbound_queue.send_message(text)
end

#shutdown_instance ⇒ Object



73
74
75
76
77
78
79
# File 'lib/piscina/piscina.rb', line 73

# Requests shutdown of the thread pool, then shuts the logger down.
# Returns the result of the logger's shutdown call.
def shutdown_instance
  pool = @thread_pool
  log = @logger

  # Pool shutdown is not immediate: jobs already submitted are allowed to
  # finish before the threads are closed.
  # TODO: make sure to wait for shutdown
  pool.shutdown
  log.shutdown
end