Piscina
Spanish for "Pool"
Blurb
JRuby SQS job processor/dispatcher and multi-threaded logger.
Dependencies
- Some flavor of JRuby
- Ruby Standard Library
- AWS SDK
- FakeSQS for tests
Piscina
Each instance of Piscina creates N + 2 OS threads. One thread to pull SQS messages off the queue and place them into a buffer, 1 thread for logging, and N threads in a pool that process those jobs. If the number of jobs that are placed into the pool are greater than N, the pool will just buffer those jobs until there is a thread ready to process them. This is handled internally by the Java class Excecutors.
Piscina instances handle SIGTERM's and other shutdowns gracefully by allowing the buffered jobs to finish before the thread pool is terminated.
Piscina makes the assumption that each SQS queue can be handled by one class. The class itself can have more complicated logic to process different types of messages based on the content of the message itself but that is left up to you (the developer).
Piscina takes two arguments SQS_URL : the specific URL for your SQS queue Klass : Klass must know Klass#perform(msg) and must handle the deletion or the immediate requeue-ing of the message if perform is unsuccessful. Otherwise default SQS visibility rules will apply. Exceptions are caught without exception (ahem). Dead Letter Queue (DLQ) policies are handled by the SQS queue itself. E.g. three attempts are made to deliver the message but all three attempts result in an exception the SQS queue will place the message in the DLQ for further inspection.
options[:pool_threads] : How many active threads there should be in each Piscina pool.
In use
If, for example, you'd like to use a local SQS API in development and in test, but hit the real end points in production you could
# Development.rb
sqs_url = "https://localhost:4658/QUEUE_NAME"
Piscina.new(sqs_url, HttpJob)
# Production.rb
sqs_url = "https://sqs.AWS_REGION.amazonaws.com/YOUR_ACCOUNT_NUM/QUEUE_NAME"
Piscina.new(sqs_url, HttpJob)
Otherwise you can initialize all of your queues at startup
Piscina Logger
PiscinaLogger is created to handle logging events for Piscina. However, it is suitable for any logging where many threads need to write to the same file at once, and the order in which they are logged is unimportant.
PiscinaLogger uses a fixed thread pool of size one to write to the log file. Writes are queued up until the thread is ready to process again.
Config Options
Configuration Defaults:
- log_directory = RailsRoot/log
- logging_level = Logger:INFO
PiscinaLogger.configure do |config|
config.log_directory = "/User/home/derp_kid/derp_code/log"
config.logging_level = Logger::DEBUG
end
The default options are pretty sane. Check them out in configuration.rb.
HTTPJob
An example job (that works!). This is just an arbitrary job that can be performed with any queue that implements the format below in the message body.
REMEMBER: SQS does NOT guarantee that messages will be delivered once. Make sure any calls to your endpoint can handle this (idempotency).
Format:
{
"headers": {
"user_token":"That's a nice token" // Any number of Key:Value pairs
},
"http_method": "GET", // or POST PUT DELETE
"uri":"https://SOMEHOST.WHATEVER/your/endpoint", // can be HTTP or HTTPS.
// we'll dynamically catch that
"body":"THIS IS SOME TEXT OR A BODY" // Body will only be processed in Post's or
// Put's
}
Build and Install without gem
gem build piscina.gemspec && gem install piscina-*.gem
Test
jruby test/simple_test.rb