Class: PgqConsumer
- Inherits:
-
Object
- Object
- PgqConsumer
- Defined in:
- lib/pgq_consumer.rb
Instance Attribute Summary collapse
-
#consumer_id ⇒ Object
Returns the value of attribute consumer_id.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .add_event(data) ⇒ Object
- .failed_event_count(queue_name, consumer) ⇒ Object
- .failed_event_delete(queue_name, consumer, event_id) ⇒ Object
- .failed_event_list(queue_name, consumer, cnt = nil, offset = nil) ⇒ Object
- .failed_event_retry(queue_name, consumer, event_id) ⇒ Object
- .get_consumer_info ⇒ Object
- .quote(text) ⇒ Object
Instance Method Summary collapse
- #add_event(data) ⇒ Object
- #event_failed(event_id, reason) ⇒ Object
- #event_retry(event_id, retry_seconds) ⇒ Object
- #finish_batch(count = nil) ⇒ Object
- #get_batch_events ⇒ Object
- #get_next_batch ⇒ Object
-
#initialize(queue, consumer_id) ⇒ PgqConsumer
constructor
A new instance of PgqConsumer.
- #perform_batch(watch_file = nil) ⇒ Object
- #prepare_event(event) ⇒ Object
Constructor Details
#initialize(queue, consumer_id) ⇒ PgqConsumer
Returns a new instance of PgqConsumer.
5 6 7 8 |
# File 'lib/pgq_consumer.rb', line 5 def initialize(queue, consumer_id) self.queue = queue self.consumer_id = consumer_id end |
Instance Attribute Details
#consumer_id ⇒ Object
Returns the value of attribute consumer_id.
3 4 5 |
# File 'lib/pgq_consumer.rb', line 3 def consumer_id @consumer_id end |
#logger ⇒ Object
Returns the value of attribute logger.
3 4 5 |
# File 'lib/pgq_consumer.rb', line 3 def logger @logger end |
#queue ⇒ Object
Returns the value of attribute queue.
3 4 5 |
# File 'lib/pgq_consumer.rb', line 3 def queue @queue end |
Class Method Details
.add_event(data) ⇒ Object
97 98 99 |
# File 'lib/pgq_consumer.rb', line 97 def self.add_event data ActiveRecord::Base.pgq_insert_event(self.const_get('QUEUE_NAME'), self.const_get('TYPE'), data.to_yaml) end |
.failed_event_count(queue_name, consumer) ⇒ Object
28 29 30 |
# File 'lib/pgq_consumer.rb', line 28 def self.failed_event_count queue_name, consumer ActiveRecord::Base.connection.select_value("select * from pgq.failed_event_count(#{self.quote queue_name}, #{self.quote consumer})") end |
.failed_event_delete(queue_name, consumer, event_id) ⇒ Object
23 24 25 26 |
# File 'lib/pgq_consumer.rb', line 23 def self.failed_event_delete queue_name, consumer,event_id ActiveRecord::Base.connection.select_value( "select * from pgq.failed_event_delete(#{self.quote queue_name}, #{self.quote consumer},#{event_id.to_i})") end |
.failed_event_list(queue_name, consumer, cnt = nil, offset = nil) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/pgq_consumer.rb', line 32 def self.failed_event_list queue_name, consumer, cnt=nil, offset=nil off='' if(cnt) off=",#{cnt.to_i},#{offset.to_i}" end ActiveRecord::Base.connection.select_all("select * from pgq.failed_event_list(#{self.quote queue_name}, #{self.quote consumer} #{off}) order by ev_id desc") end |
.failed_event_retry(queue_name, consumer, event_id) ⇒ Object
19 20 21 22 |
# File 'lib/pgq_consumer.rb', line 19 def self.failed_event_retry queue_name, consumer,event_id ActiveRecord::Base.connection.select_value( "select * from pgq.failed_event_retry(#{self.quote queue_name}, #{self.quote consumer},#{event_id.to_i})") end |
.get_consumer_info ⇒ Object
14 15 16 |
# File 'lib/pgq_consumer.rb', line 14 def self.get_consumer_info @get_consumer_info||=ActiveRecord::Base.connection.select_all("select * from pgq.get_consumer_info()") end |
.quote(text) ⇒ Object
10 11 12 |
# File 'lib/pgq_consumer.rb', line 10 def self.quote text ActiveRecord::Base.connection.quote text end |
Instance Method Details
#add_event(data) ⇒ Object
93 94 95 |
# File 'lib/pgq_consumer.rb', line 93 def add_event data self.class.add_event data end |
#event_failed(event_id, reason) ⇒ Object
55 56 57 |
# File 'lib/pgq_consumer.rb', line 55 def event_failed(event_id, reason) ActiveRecord::Base.pgq_event_failed(@batch_id, event_id, reason) end |
#event_retry(event_id, retry_seconds) ⇒ Object
59 60 61 |
# File 'lib/pgq_consumer.rb', line 59 def event_retry(event_id, retry_seconds) ActiveRecord::Base.pgq_event_retry(@batch_id, event_id, retry_seconds) end |
#finish_batch(count = nil) ⇒ Object
51 52 53 |
# File 'lib/pgq_consumer.rb', line 51 def finish_batch(count = nil) ActiveRecord::Base.pgq_finish_batch(@batch_id) end |
#get_batch_events ⇒ Object
40 41 42 43 44 45 |
# File 'lib/pgq_consumer.rb', line 40 def get_batch_events @batch_id = get_next_batch return if !@batch_id res = ActiveRecord::Base.pgq_get_batch_events(@batch_id) res end |
#get_next_batch ⇒ Object
47 48 49 |
# File 'lib/pgq_consumer.rb', line 47 def get_next_batch ActiveRecord::Base.pgq_next_batch(queue, consumer_id) end |
#perform_batch(watch_file = nil) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pgq_consumer.rb', line 63 def perform_batch(watch_file = nil) events = get_batch_events logger.info "batch(#{queue}): #{get_next_batch} events: #{events.size}" if events.present? return if !events events.each do |event| if watch_file and File.exists?(watch_file) event_retry(event['ev_id'], 0) else if RAILS_ENV and (RAILS_ENV == 'development' or RAILS_ENV == 'test') perform_event(prepare_event(event)) else begin perform_event(prepare_event(event)) rescue StandardError => ex event_failed event['ev_id'], ex end end end end finish_batch(events.size) true end |