Class: Threasy::Work

Inherits:
Object
  • Object
show all
Defined in:
lib/threasy/work.rb

Defined Under Namespace

Classes: TimeoutQueue, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeWork

Returns a new instance of Work.



18
19
20
21
22
# File 'lib/threasy/work.rb', line 18

def initialize
  @queue = TimeoutQueue.new
  @pool = Set.new
  @semaphore = Mutex.new
end

Instance Attribute Details

#poolObject (readonly)

Threasy::Work

Class to manage the work queue.

An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.

Example

work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
# Outputs: Hello from the background!


16
17
18
# File 'lib/threasy/work.rb', line 16

def pool
  @pool
end

#queueObject (readonly)

Threasy::Work

Class to manage the work queue.

An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.

Example

work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
# Outputs: Hello from the background!


16
17
18
# File 'lib/threasy/work.rb', line 16

def queue
  @queue
end

Instance Method Details

#add_worker(size) ⇒ Object



68
69
70
71
72
73
# File 'lib/threasy/work.rb', line 68

def add_worker(size)
  log "Adding new worker to pool"
  worker = Worker.new(self, size)
  pool.add worker
  worker.work
end

#check_workersObject



57
58
59
60
61
62
63
64
65
66
# File 'lib/threasy/work.rb', line 57

def check_workers
  sync do
    pool_size = pool.size
    queue_size = queue.size
    log "Checking workers. Pool: #{pool_size}, Max: #{max_workers}, Queue: #{queue_size}"
    if pool_size < max_workers
      add_worker(pool_size) if pool_size == 0 || queue_size > max_workers
    end
  end
end

#clearObject



75
76
77
# File 'lib/threasy/work.rb', line 75

def clear
  queue.clear
end

#enqueue(job = nil, &block) ⇒ Object Also known as: enqueue_block

Enqueue a job into the work queue

Examples

work = Threasy::Work.new

# Enqueue blocks
work.enqueue { do_some_background_work }

# Enqueue job objects that respond to `perform` or `call`
work.enqueue BackgroundJob.new(some: data)

# Enqueue string that evals to a job object
Threasy.enqueue("BackgroundJob.new")


39
40
41
# File 'lib/threasy/work.rb', line 39

def enqueue(job = nil, &block)
  queue.push(block_given? ? block : job).tap { check_workers }
end

#grabObject



49
50
51
# File 'lib/threasy/work.rb', line 49

def grab
  queue.pop
end

#log(msg) ⇒ Object



79
80
81
# File 'lib/threasy/work.rb', line 79

def log(msg)
  Threasy.logger.debug msg
end

#max_workersObject



53
54
55
# File 'lib/threasy/work.rb', line 53

def max_workers
  Threasy.config.max_workers
end

#sync(&block) ⇒ Object



45
46
47
# File 'lib/threasy/work.rb', line 45

def sync(&block)
  @semaphore.synchronize &block
end