Class: Threads

Inherits:
Object
  • Object
show all
Defined in:
lib/threads.rb

Overview

Threads.

This class may help you test your code for thread-safety by running it multiple times in a few parallel threads:

require 'threads'
Threads.new(5).assert do |i|
  puts "Hello from the thread no.#{i}"
end

Here, the message will be printed to the console five times, once in every thread.

Author

Yegor Bugayenko (yegor256@gmail.com)

Copyright

Copyright © 2018-2024 Yegor Bugayenko

License

MIT

Instance Method Summary collapse

Constructor Details

#initialize(total = Concurrent.processor_count * 8, log: $stdout) ⇒ Threads

Constructor.

Parameters:

  • total (Number) (defaults to: Concurrent.processor_count * 8)

    How many threads to run

  • log (Logger) (defaults to: $stdout)

    Where to print output


49
50
51
52
53
54
55
# File 'lib/threads.rb', line 49

def initialize(total = Concurrent.processor_count * 8, log: $stdout)
  raise "Total can't be nil" if total.nil?
  raise "Total can't be negative or zero: #{total}" unless total.positive?
  @total = total
  raise "Log can't be nil" if log.nil?
  @log = log
end

Instance Method Details

#assert(reps = @total) ⇒ Object

Run them all and assert that all of them finished successfully.

Parameters:

  • reps (Number) (defaults to: @total)

    How many times to repeat the testing cycle

Returns:

  • nil


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/threads.rb', line 61

def assert(reps = @total)
  raise "Repetition counter #{reps} can't be smaller than #{@total}" if reps < @total
  done = Concurrent::AtomicFixnum.new
  rep = Concurrent::AtomicFixnum.new
  pool = Concurrent::FixedThreadPool.new(@total)
  latch = Concurrent::CountDownLatch.new(1)
  finish = Concurrent::CountDownLatch.new(@total)
  @total.times do |t|
    pool.post do
      Thread.current.name = "assert-thread-#{t}"
      latch.wait(10)
      begin
        loop do
          r = rep.increment
          break if r > reps
          begin
            yield(t, r - 1)
          rescue StandardError => e
            print(Backtrace.new(e))
            raise e
          end
        end
        done.increment
      ensure
        finish.count_down
      end
    end
  end
  latch.count_down
  finish.wait(10)
  pool.shutdown
  raise "Can't stop the pool" unless pool.wait_for_termination(30)
  return if done.value == @total
  raise "Only #{done.value} out of #{@total} threads completed successfully"
end