Class: Pgq::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pgq/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(h) ⇒ Worker

Returns a new instance of Worker.

[View source]

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pgq/worker.rb', line 28

def initialize(h)
  @logger = h[:logger] || (defined?(Rails) && Rails.logger) || Logger.new(STDOUT)
  @consumers = []
  
  @queues = h[:queues]
  raise "Queue not selected" if @queues.blank?
  
  if @queues == ['all'] || @queues == 'all'
    if h[:queues_list]
      @queues = YAML.load_file(h[:queues_list])
    elsif defined?(Rails) && File.exists?(Rails.root + "config/queues_list.yml")
      @queues = YAML.load_file(Rails.root + "config/queues_list.yml")
    else
      raise "You shoud create config/queues_list.yml for all queues"
    end
  end
  
  @queues = @queues.split(',') if @queues.is_a?(String)
  
  @queues.each do |queue|
    klass = Pgq::Worker.predict_queue_class(queue)
    if klass
      @consumers << klass.new(@logger, queue)
    else
      raise "Unknown queue: #{queue}"
    end
  end

  @watch_file = h[:watch_file]
  @sleep_time = h[:sleep_time] || 0.5
end

Instance Attribute Details

#consumersObject (readonly)

Returns the value of attribute consumers.


4
5
6
# File 'lib/pgq/worker.rb', line 4

def consumers
  @consumers
end

#loggerObject (readonly)

Returns the value of attribute logger.


4
5
6
# File 'lib/pgq/worker.rb', line 4

def logger
  @logger
end

#queuesObject (readonly)

Returns the value of attribute queues.


4
5
6
# File 'lib/pgq/worker.rb', line 4

def queues
  @queues
end

#sleep_timeObject (readonly)

Returns the value of attribute sleep_time.


4
5
6
# File 'lib/pgq/worker.rb', line 4

def sleep_time
  @sleep_time
end

#watch_fileObject (readonly)

Returns the value of attribute watch_file.


4
5
6
# File 'lib/pgq/worker.rb', line 4

def watch_file
  @watch_file
end

Class Method Details

.connection(queue) ⇒ Object

[View source]

19
20
21
22
23
24
25
26
# File 'lib/pgq/worker.rb', line 19

def self.connection(queue)
  klass = predict_queue_class(queue)
  if klass
    klass.connection
  else
    raise "can't find klass for queue #{queue}"
  end
end

.predict_queue_class(queue) ⇒ Object

[View source]

6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/pgq/worker.rb', line 6

def self.predict_queue_class(queue)
  klass = nil
  unless klass
    queue.to_s.match(/([a-z_]+)/i)
    klass_s = $1.to_s
    klass_s.chop! if klass_s.size > 0 && klass_s[-1].chr == '_'
    klass_s = "pgq_" + klass_s unless klass_s.start_with?("pgq_")
    klass = klass_s.camelize.constantize rescue nil
    klass = nil unless klass.is_a?(Class)
  end    
  klass    
end

Instance Method Details

#process_batchObject

[View source]

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/pgq/worker.rb', line 60

def process_batch
  process_count = 0

  @consumers.each do |consumer|
    process_count += consumer.perform_batch

    if @watch_file && File.exists?(@watch_file)
      logger.info "Found file #{@watch_file}, exiting!"
      File.unlink(@watch_file)
      return processed_count
    end
  end

  process_count
end

#runObject

[View source]

76
77
78
79
80
81
82
83
# File 'lib/pgq/worker.rb', line 76

def run
  logger.info "Worker for (#{@queues.join(",")}) started"

  loop do
    processed_count = process_batch
    sleep(@sleep_time) if processed_count == 0
  end
end