Class: Threasy::Work

Inherits:
Object
  • Object
show all
Defined in:
lib/threasy/work.rb

Defined Under Namespace

Classes: TimeoutQueue, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeWork



18
19
20
21
22
23
# File 'lib/threasy/work.rb', line 18

def initialize
  @queue = TimeoutQueue.new
  @pool = Set.new
  @semaphore = Mutex.new
  min_workers.times { add_worker }
end

Instance Attribute Details

#poolObject (readonly)

Threasy::Work

Class to manage the work queue.

An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.

Example

work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
# Outputs: Hello from the background!


16
17
18
# File 'lib/threasy/work.rb', line 16

def pool
  @pool
end

#queueObject (readonly)

Threasy::Work

Class to manage the work queue.

An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.

Example

work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
# Outputs: Hello from the background!


16
17
18
# File 'lib/threasy/work.rb', line 16

def queue
  @queue
end

Instance Method Details

#add_workerObject



73
74
75
76
77
78
# File 'lib/threasy/work.rb', line 73

def add_worker
  log "Adding new worker to pool"
  worker = Worker.new(self, pool.size)
  pool.add worker
  worker.work
end

#check_workersObject



63
64
65
66
67
68
69
70
71
# File 'lib/threasy/work.rb', line 63

def check_workers
  sync do
    pool_size = pool.size
    log "Checking workers. Pool: #{pool_size} (min: #{min_workers}, max: #{max_workers})"
    if pool_size < max_workers
      add_worker if pool_size == 0 || queue.size > max_workers
    end
  end
end

#clearObject



80
81
82
# File 'lib/threasy/work.rb', line 80

def clear
  queue.clear
end

#enqueue(*args, &block) ⇒ Object Also known as: enqueue_block

Enqueue a job into the work queue

Examples

work = Threasy::Work.new

# Enqueue blocks
work.enqueue { do_some_background_work }

# Enqueue job objects that respond to `perform` or `call`
work.enqueue BackgroundJob.new(some: data)

# Enqueue string that evals to a job object
Threasy.enqueue("BackgroundJob.new")


40
41
42
43
# File 'lib/threasy/work.rb', line 40

def enqueue(*args, &block)
  args.unshift(block) if block_given?
  queue.push(args).tap { check_workers }
end

#grabObject



51
52
53
# File 'lib/threasy/work.rb', line 51

def grab
  queue.pop
end

#log(msg) ⇒ Object



84
85
86
# File 'lib/threasy/work.rb', line 84

def log(msg)
  Threasy.logger.debug msg
end

#max_workersObject



59
60
61
# File 'lib/threasy/work.rb', line 59

def max_workers
  Threasy.config.max_workers
end

#min_workersObject



55
56
57
# File 'lib/threasy/work.rb', line 55

def min_workers
  Threasy.config.min_workers
end

#sync(&block) ⇒ Object



47
48
49
# File 'lib/threasy/work.rb', line 47

def sync(&block)
  @semaphore.synchronize &block
end