Class: PgqConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/pgq_consumer.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, consumer_id) ⇒ PgqConsumer

Returns a new instance of PgqConsumer.



5
6
7
8
# File 'lib/pgq_consumer.rb', line 5

# Builds a consumer bound to a queue and a consumer identifier.
#
# Both values are stored through the attribute writers rather than by
# touching the instance variables directly.
#
# @param queue the queue this consumer reads from
# @param consumer_id the identifier this consumer registers under
def initialize(queue, consumer_id)
  self.queue, self.consumer_id = queue, consumer_id
end

Instance Attribute Details

#consumer_id ⇒ Object

Returns the value of attribute consumer_id.



3
4
5
# File 'lib/pgq_consumer.rb', line 3

# Reader for the consumer_id attribute.
#
# @return the current value of @consumer_id (nil if it was never assigned)
def consumer_id; @consumer_id; end

#logger ⇒ Object

Returns the value of attribute logger.



3
4
5
# File 'lib/pgq_consumer.rb', line 3

# Reader for the logger attribute.
#
# @return the current value of @logger (nil if it was never assigned)
def logger; @logger; end

#queue ⇒ Object

Returns the value of attribute queue.



3
4
5
# File 'lib/pgq_consumer.rb', line 3

# Reader for the queue attribute.
#
# @return the current value of @queue (nil if it was never assigned)
def queue; @queue; end

Class Method Details

.add_event(data) ⇒ Object



97
98
99
# File 'lib/pgq_consumer.rb', line 97

# Inserts an event through ActiveRecord::Base.pgq_insert_event.
#
# The queue name and event type are looked up on the receiver via the
# QUEUE_NAME and TYPE constants; the payload is serialized with #to_yaml.
#
# @param data the payload to serialize and insert
# @return whatever pgq_insert_event returns
def self.add_event(data)
  queue_name = const_get('QUEUE_NAME')
  event_type = const_get('TYPE')
  payload = data.to_yaml
  ActiveRecord::Base.pgq_insert_event(queue_name, event_type, payload)
end

.failed_event_count(queue_name, consumer) ⇒ Object



28
29
30
# File 'lib/pgq_consumer.rb', line 28

# Calls the pgq.failed_event_count SQL function for a queue/consumer pair.
#
# Both arguments are escaped with .quote before being placed in the SQL text.
#
# @param queue_name the queue name passed to the SQL function
# @param consumer the consumer name passed to the SQL function
# @return the single value produced by select_value
def self.failed_event_count(queue_name, consumer)
  connection = ActiveRecord::Base.connection
  sql = "select * from pgq.failed_event_count(#{quote(queue_name)}, #{quote(consumer)})"
  connection.select_value(sql)
end

.failed_event_delete(queue_name, consumer, event_id) ⇒ Object



23
24
25
26
# File 'lib/pgq_consumer.rb', line 23

# Calls the pgq.failed_event_delete SQL function for one event.
#
# The queue and consumer names are escaped with .quote; the event id is
# coerced with #to_i before being placed in the SQL text.
#
# @param queue_name the queue name passed to the SQL function
# @param consumer the consumer name passed to the SQL function
# @param event_id the event id (converted with #to_i)
# @return the single value produced by select_value
def self.failed_event_delete(queue_name, consumer, event_id)
  connection = ActiveRecord::Base.connection
  arguments = "#{quote(queue_name)}, #{quote(consumer)},#{event_id.to_i}"
  connection.select_value("select * from    pgq.failed_event_delete(#{arguments})")
end

.failed_event_list(queue_name, consumer, cnt = nil, offset = nil) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/pgq_consumer.rb', line 32

# Calls the pgq.failed_event_list SQL function and returns its rows
# ordered by ev_id descending.
#
# When +cnt+ is truthy, +cnt+ and +offset+ (both coerced with #to_i) are
# appended as extra arguments to the SQL function; otherwise only the
# queue and consumer names are passed and +offset+ is ignored.
#
# @param queue_name the queue name passed to the SQL function
# @param consumer the consumer name passed to the SQL function
# @param cnt optional row count; enables the paging arguments when truthy
# @param offset optional offset, only used together with +cnt+
# @return the result of select_all
def self.failed_event_list(queue_name, consumer, cnt = nil, offset = nil)
  paging = cnt ? ",#{cnt.to_i},#{offset.to_i}" : ''
  connection = ActiveRecord::Base.connection
  sql = "select * from pgq.failed_event_list(#{quote(queue_name)}, #{quote(consumer)} #{paging}) order by ev_id desc"
  connection.select_all(sql)
end

.failed_event_retry(queue_name, consumer, event_id) ⇒ Object



19
20
21
22
# File 'lib/pgq_consumer.rb', line 19

# Calls the pgq.failed_event_retry SQL function for one event.
#
# The queue and consumer names are escaped with .quote; the event id is
# coerced with #to_i before being placed in the SQL text.
#
# @param queue_name the queue name passed to the SQL function
# @param consumer the consumer name passed to the SQL function
# @param event_id the event id (converted with #to_i)
# @return the single value produced by select_value
def self.failed_event_retry(queue_name, consumer, event_id)
  connection = ActiveRecord::Base.connection
  arguments = "#{quote(queue_name)}, #{quote(consumer)},#{event_id.to_i}"
  connection.select_value("select * from    pgq.failed_event_retry(#{arguments})")
end

.get_consumer_info ⇒ Object



14
15
16
# File 'lib/pgq_consumer.rb', line 14

# Returns the rows of pgq.get_consumer_info(), memoized on the receiver.
#
# The query runs only while @get_consumer_info is nil/false; afterwards the
# stored result is returned as-is.
# NOTE(review): nothing visible here resets the cache, so the data looks
# like it stays fixed for the lifetime of the class object — confirm that
# this is intended.
#
# @return the cached result of select_all
def self.get_consumer_info
  return @get_consumer_info if @get_consumer_info

  @get_consumer_info = ActiveRecord::Base.connection.select_all("select * from pgq.get_consumer_info()")
end

.quote(text) ⇒ Object



10
11
12
# File 'lib/pgq_consumer.rb', line 10

# Escapes a value for inclusion in SQL text by delegating to the
# ActiveRecord connection's #quote.
#
# @param text the value to quote
# @return whatever the connection's #quote returns
def self.quote(text)
  connection = ActiveRecord::Base.connection
  connection.quote(text)
end

Instance Method Details

#add_event(data) ⇒ Object



93
94
95
# File 'lib/pgq_consumer.rb', line 93

# Instance-level convenience that forwards to the class-level add_event.
#
# @param data the payload handed to the class method
# @return whatever the class-level add_event returns
def add_event(data)
  consumer_class = self.class
  consumer_class.add_event(data)
end

#event_failed(event_id, reason) ⇒ Object



55
56
57
# File 'lib/pgq_consumer.rb', line 55

# Reports an event of the current batch as failed via
# ActiveRecord::Base.pgq_event_failed.
#
# @param event_id the id of the event
# @param reason the failure reason, passed through unchanged
# @return whatever pgq_event_failed returns
def event_failed(event_id, reason)
  current_batch = @batch_id
  ActiveRecord::Base.pgq_event_failed(current_batch, event_id, reason)
end

#event_retry(event_id, retry_seconds) ⇒ Object



59
60
61
# File 'lib/pgq_consumer.rb', line 59

# Schedules an event of the current batch for retry via
# ActiveRecord::Base.pgq_event_retry.
#
# @param event_id the id of the event
# @param retry_seconds the delay value, passed through unchanged
# @return whatever pgq_event_retry returns
def event_retry(event_id, retry_seconds)
  current_batch = @batch_id
  ActiveRecord::Base.pgq_event_retry(current_batch, event_id, retry_seconds)
end

#finish_batch(count = nil) ⇒ Object



51
52
53
# File 'lib/pgq_consumer.rb', line 51

# Closes the current batch via ActiveRecord::Base.pgq_finish_batch.
#
# @param count accepted for callers that pass the number of processed
#   events; the value is not used by this method
# @return whatever pgq_finish_batch returns
def finish_batch(count = nil)
  current_batch = @batch_id
  ActiveRecord::Base.pgq_finish_batch(current_batch)
end

#get_batch_events ⇒ Object



40
41
42
43
44
45
# File 'lib/pgq_consumer.rb', line 40

# Requests the next batch and, if there is one, loads its events.
#
# The batch id returned by get_next_batch is stored in @batch_id so the
# other batch-related methods can refer to it.
#
# @return the result of pgq_get_batch_events, or nil when get_next_batch
#   returned nil/false
def get_batch_events
  @batch_id = get_next_batch
  ActiveRecord::Base.pgq_get_batch_events(@batch_id) if @batch_id
end

#get_next_batch ⇒ Object



47
48
49
# File 'lib/pgq_consumer.rb', line 47

# Asks ActiveRecord::Base.pgq_next_batch for the next batch of this
# consumer's queue.
#
# @return whatever pgq_next_batch returns for (queue, consumer_id)
def get_next_batch
  source_queue = queue
  subscriber = consumer_id
  ActiveRecord::Base.pgq_next_batch(source_queue, subscriber)
end

#perform_batch(watch_file = nil) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pgq_consumer.rb', line 63

# Fetches the next batch, processes every event in it and finishes the batch.
#
# For each event:
# * when +watch_file+ is given and that path exists, the event is not
#   processed but handed to event_retry with a delay of 0;
# * otherwise it is wrapped with prepare_event and passed to perform_event.
#   When RAILS_ENV is 'development' or 'test' any exception propagates to
#   the caller; in every other environment a StandardError is rescued and
#   the event is reported through event_failed with the exception as reason.
#
# @param watch_file [String, nil] path whose existence makes every event be
#   retried instead of processed
# @return [true, nil] nil when get_batch_events returned nothing,
#   true once the batch has been finished
def perform_batch(watch_file = nil)
  events = get_batch_events
  return unless events

  # get_batch_events has already stored the batch id in @batch_id; reuse it
  # instead of issuing another get_next_batch call just for the log line.
  # The logger is optional: initialize does not assign one.
  if logger && events.present?
    logger.info "batch(#{queue}): #{@batch_id} events: #{events.size}"
  end

  events.each do |event|
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    if watch_file && File.exist?(watch_file)
      event_retry(event['ev_id'], 0)
    elsif RAILS_ENV && (RAILS_ENV == 'development' || RAILS_ENV == 'test')
      # Let errors surface so they are visible while developing/testing.
      perform_event(prepare_event(event))
    else
      begin
        perform_event(prepare_event(event))
      rescue StandardError => ex
        event_failed(event['ev_id'], ex)
      end
    end
  end

  finish_batch(events.size)
  true
end

#prepare_event(event) ⇒ Object



89
90
91
# File 'lib/pgq_consumer.rb', line 89

# Wraps a raw event in a PgqEvent.
#
# @param event the raw event as received from the batch
# @return [PgqEvent] a new PgqEvent built from +event+
def prepare_event(event)
  PgqEvent.new(event)
end