Class: ThreadStorm
- Defined in:
- lib/thread_storm.rb,
lib/thread_storm/queue.rb,
lib/thread_storm/worker.rb,
lib/thread_storm/execution.rb
Overview
Simple but powerful thread pool implementation.
Defined Under Namespace
Classes: Execution, Queue, TimeoutError, Worker
Constant Summary collapse
- VERSION =
Version of ThreadStorm that you are using.
File.read(File.dirname(__FILE__)+"/../VERSION").chomp
- DEFAULTS =
Default options found in ThreadStorm.options.
{ :size => 2, :execute_blocks => false, :timeout => nil, :timeout_method => Timeout.method(:timeout), :timeout_exception => Timeout::Error, :default_value => nil, :reraise => true }.freeze
Instance Attribute Summary collapse
-
#executions ⇒ Object
readonly
Array of executions in order as they are defined by calls to ThreadStorm#execute.
-
#options ⇒ Object
readonly
Options specific to a ThreadStorm instance.
Instance Method Summary collapse
-
#clear_executions(method_name = nil, &block) ⇒ Object
Removes executions stored at ThreadStorm#executions.
-
#execute(*args, &block) ⇒ Object
call-seq: storm.execute(*args){ |*args| … } -> execution storm.execute(execution) -> execution.
-
#initialize(options = {}) ⇒ ThreadStorm
constructor
call-seq: new(options = {}) -> thread_storm new(options = {}){ |self| … } -> thread_storm.
-
#join ⇒ Object
Block until all pending executions are finished running.
-
#new_execution(*args, &block) ⇒ Object
This is like Execution.new except the default options are specific this ThreadStorm instance.
- #run {|_self| ... } ⇒ Object
-
#shutdown ⇒ Object
Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.
-
#threads ⇒ Object
(also: #primitives)
Returns an array of Ruby threads in the pool.
-
#values ⇒ Object
Calls ThreadStorm#join, then collects the values of each execution.
Constructor Details
#initialize(options = {}) ⇒ ThreadStorm
call-seq:
new(options = {}) -> thread_storm
new(options = {}){ |self| ... } -> thread_storm
Valid options are…
:size => How many threads to spawn.
:timeout => Max time an execution is allowed to run before terminating it. Nil means no timeout.
:timeout_method => An object that implements something like Timeout.timeout via #call..
:default_value => Value of an execution if it times out or errors..
:reraise => True if you want exceptions to be reraised when ThreadStorm#join is called.
:execute_blocks => True if you want #execute to block until there is an available thread.
For defaults, see DEFAULTS.
When given a block, ThreadStorm#join and ThreadStorm#shutdown are called for you. In other words…
ThreadStorm.new do |storm|
storm.execute{ sleep(1) }
end
…is the same as…
storm = ThreadStorm.new
storm.execute{ sleep(1) }
storm.join
storm.shutdown
60 61 62 63 64 65 66 |
# File 'lib/thread_storm.rb', line 60 def initialize( = {}) @options = .reverse_merge(self.class.) @queue = Queue.new(@options[:size], @options[:execute_blocks]) @executions = [] @workers = (1..@options[:size]).collect{ Worker.new(@queue) } run{ yield(self) } if block_given? end |
Instance Attribute Details
#executions ⇒ Object (readonly)
Array of executions in order as they are defined by calls to ThreadStorm#execute.
35 36 37 |
# File 'lib/thread_storm.rb', line 35 def executions @executions end |
#options ⇒ Object (readonly)
Options specific to a ThreadStorm instance.
32 33 34 |
# File 'lib/thread_storm.rb', line 32 def @options end |
Instance Method Details
#clear_executions(method_name = nil, &block) ⇒ Object
Removes executions stored at ThreadStorm#executions. You can selectively remove them by passing in a block or a symbol. The following two lines are equivalent.
storm.clear_executions(:finished?)
storm.clear_executions{ |e| e.finished? }
Because of the nature of threading, the following code could happen:
storm.clear_executions(:finished?)
storm.executions.any?{ |e| e.finished? }
Some executions could have finished between the two calls.
162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/thread_storm.rb', line 162 def clear_executions(method_name = nil, &block) cleared, @executions = @executions.separate do |execution| if block_given? yield(execution) elsif method_name.nil? true else execution.send(method_name) end end cleared end |
#execute(*args, &block) ⇒ Object
call-seq:
storm.execute(*args){ |*args| ... } -> execution
storm.execute(execution) -> execution
Schedules an execution to be run (i.e. moves it to the :queued state). When given a block, it is the same as
execution = ThreadStorm::Execution.new(*args){ |*args| ... }
storm.execute(execution)
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/thread_storm.rb', line 107 def execute(*args, &block) if block_given? execution = new_execution(*args, &block) elsif args.length == 1 and args.first.instance_of?(Execution) execution = args.first else raise ArgumentError, "execution or arguments and block expected" end @queue.synchronize do |q| q.enqueue(execution) execution.queued! # This needs to be in here or we'll get a race condition to set the execution's state. end @executions << execution execution end |
#join ⇒ Object
Block until all pending executions are finished running. Reraises any exceptions caused by executions unless :reraise => false
was passed to ThreadStorm#new.
128 129 130 131 132 |
# File 'lib/thread_storm.rb', line 128 def join @executions.each do |execution| execution.join end end |
#new_execution(*args, &block) ⇒ Object
This is like Execution.new except the default options are specific this ThreadStorm instance.
ThreadStorm.[:timeout]
# => nil
storm = ThreadStorm.new :timeout => 1
execution = storm.new_execution
execution.[:timeout]
# => 1
execution = ThreadStorm::Execution.new
execution.[:timeout]
# => nil
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/thread_storm.rb', line 78 def new_execution(*args, &block) # It has to be this way because of how options are merged. if block_given? Execution.new(.dup).define(*args, &block) elsif args.length == 0 Execution.new(.dup) elsif args.length == 1 and args.first.kind_of?(Hash) Execution.new(.merge(args.first)) else raise ArgumentError, "illegal call-seq" end end |
#run {|_self| ... } ⇒ Object
93 94 95 96 97 |
# File 'lib/thread_storm.rb', line 93 def run yield(self) join shutdown end |
#shutdown ⇒ Object
Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.
141 142 143 144 145 |
# File 'lib/thread_storm.rb', line 141 def shutdown @queue.shutdown threads.each{ |thread| thread.join } true end |
#threads ⇒ Object Also known as: primitives
Returns an array of Ruby threads in the pool.
148 149 150 |
# File 'lib/thread_storm.rb', line 148 def threads @workers.collect{ |worker| worker.thread } end |
#values ⇒ Object
Calls ThreadStorm#join, then collects the values of each execution.
135 136 137 |
# File 'lib/thread_storm.rb', line 135 def values join and @executions.collect{ |execution| execution.value } end |