Module: Pgq::Utils
- Included in:
- ConsumerBase
- Defined in:
- lib/pgq/utils.rb
Instance Method Summary collapse
-
#add_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
methods for migrations.
- #delete_failed_events(queue_name, limit = 5_000) ⇒ Object
-
#inspect_londiste_queue(queue_name) ⇒ Object
Shows stats as a hash for londiste-type event storage: { type => events_count }.
-
#inspect_queue(queue_name) ⇒ Object
inspect queue { type => events_count }.
- #inspect_self_queue ⇒ Object
- #last_event_id(queue_name) ⇒ Object
-
#proxy(method_name) ⇒ Object
proxying method for tests.
-
#queues_list ⇒ Object
all queues for database.
- #remove_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
-
#retry_failed_events(queue_name, limit = 5_000) ⇒ Object
resend failed events in queue.
Instance Method Details
#add_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
methods for migrations
9 10 11 12 |
# File 'lib/pgq/utils.rb', line 9
# Migration helper: creates a queue and then registers a consumer on it.
#
# @param queue_name [#to_s] name of the queue to create
# @param consumer_name [#to_s] consumer to register (defaults to self.consumer_name)
# @return [Object] the result of database.pgq_register_consumer
def add_queue(queue_name, consumer_name = self.consumer_name)
  queue = queue_name.to_s
  database.pgq_create_queue(queue)
  database.pgq_register_consumer(queue, consumer_name.to_s)
end
#delete_failed_events(queue_name, limit = 5_000) ⇒ Object
107 108 109 110 111 112 113 114 115 |
# File 'lib/pgq/utils.rb', line 107
# Deletes failed events of this consumer from the given queue.
#
# @param queue_name [Object] queue passed through to the database helpers
# @param limit [Integer] maximum number of failed events to list
# @return [Integer] number of events that were listed (and deleted)
def delete_failed_events(queue_name, limit = 5_000)
  # The listing may come back as nil; treat that as "nothing to delete".
  failed = database.pgq_failed_event_list(queue_name, self.consumer_name, limit, nil, 'asc') || []

  failed.each { |failed_event| database.pgq_failed_event_delete(queue_name, self.consumer_name, failed_event['ev_id']) }

  failed.length
end
#inspect_londiste_queue(queue_name) ⇒ Object
Shows stats as a hash for londiste-type event storage: { type => events_count }
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/pgq/utils.rb', line 57
# Builds a stats hash for a londiste-style queue, keyed by
# "<ev_extra1>:<ev_type>".
#
# When last_event_id reports a last event id, each value is the number of
# events with a greater ev_id; otherwise every known key maps to 0.
#
# @param queue_name [Object] queue passed to last_event_id
# @return [Hash{String => Integer}] "ev_extra1:ev_type" => events_count
def inspect_londiste_queue(queue_name)
  table, last_event = last_event_id(queue_name)
  # The accumulator must exist before the loops below write into it.
  result = {}

  if last_event
    stats = connection.select_all <<-SQL
      SELECT count(*) as count, ev_type, ev_extra1
        FROM #{table}
        WHERE ev_id > #{last_event.to_i}
        GROUP BY ev_type, ev_extra1
    SQL

    stats.each do |x|
      result["#{x['ev_extra1']}:#{x['ev_type']}"] = x['count'].to_i
    end
  else
    stats = connection.select_all <<-SQL
      SELECT ev_type, ev_extra1
        FROM #{table}
        GROUP BY ev_type, ev_extra1 ORDER BY ev_extra1, ev_type
    SQL

    stats.each do |x|
      result["#{x['ev_extra1']}:#{x['ev_type']}"] = 0
    end
  end

  result
end
#inspect_queue(queue_name) ⇒ Object
inspect queue
{ type => events_count }
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/pgq/utils.rb', line 21
# Inspects a queue and returns { type => events_count }.
#
# When last_event_id reports a last event id, each value is the number of
# events with a greater ev_id; otherwise every known ev_type maps to 0.
#
# @param queue_name [Object] queue passed to last_event_id
# @return [Hash{String => Integer}] ev_type => events_count
def inspect_queue(queue_name)
  table, last_event = last_event_id(queue_name)
  # The accumulator must exist before the loops below write into it.
  result = {}

  if last_event
    stats = connection.select_all <<-SQL
      SELECT count(*) as count, ev_type
        FROM #{table}
        WHERE ev_id > #{last_event.to_i}
        GROUP BY ev_type
    SQL

    stats.each do |x|
      result["#{x['ev_type']}"] = x['count'].to_i
    end
  else
    stats = connection.select_all <<-SQL
      SELECT ev_type
        FROM #{table}
        GROUP BY ev_type
    SQL

    stats.each do |x|
      result["#{x['ev_type']}"] = 0
    end
  end

  result
end
#inspect_self_queue ⇒ Object
51 52 53 |
# File 'lib/pgq/utils.rb', line 51
# Runs inspect_queue against this object's own queue_name.
#
# @return [Object] whatever inspect_queue returns for self.queue_name
def inspect_self_queue
  own_queue = self.queue_name
  inspect_queue(own_queue)
end
#last_event_id(queue_name) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/pgq/utils.rb', line 117
# Looks up the queue's data table prefix and the highest ev_id of the
# consumer's current batch.
#
# @param queue_name [Object] queue name; sanitized before being put into SQL
# @return [Array(Object, Integer), Array(Object, nil)] [table, last_event_id];
#   the second element is nil when the queue info has no 'current_batch'
def last_event_id(queue_name)
  queue_info = database.pgq_get_consumer_queue_info(queue_name)
  quoted_queue = database.sanitize(queue_name)
  table = connection.select_value(
    "SELECT queue_data_pfx AS table FROM pgq.queue WHERE queue_name = #{quoted_queue}"
  )

  batch = queue_info['current_batch']
  return [table, nil] unless batch

  # pgq.batch_event_sql yields the SQL text selecting the batch's events;
  # it is wrapped as a subquery to find the largest ev_id.
  quoted_batch = database.sanitize(batch.to_i)
  batch_sql = connection.select_value("SELECT * FROM pgq.batch_event_sql(#{quoted_batch})")
  max_id = connection.select_value("SELECT MAX(ev_id) AS count FROM (#{batch_sql}) AS x")

  [table, max_id.to_i]
end
#proxy(method_name) ⇒ Object
proxying method for tests
89 90 91 92 93 94 |
# File 'lib/pgq/utils.rb', line 89
# Proxying method for tests: sets a should_receive expectation on :enqueue
# whose block round-trips the arguments through the coder and then invokes
# #perform on a fresh instance.
#
# NOTE(review): the method_name argument is not used by the body; the
# method name comes from the intercepted enqueue call. Confirm whether
# that is intended.
#
# @param method_name [Object] currently unused
# @return [Object] result of any_number_of_times on the expectation
def proxy(method_name)
  expectation = self.should_receive(:enqueue) do |enqueued_method, *data|
    # Dump and load so the payload is what a real consumer would decode.
    decoded = self.coder.load(self.coder.dump(data))
    self.new.send(:perform, enqueued_method, *decoded)
  end

  expectation.any_number_of_times
end
#queues_list ⇒ Object
all queues for database
4 5 6 |
# File 'lib/pgq/utils.rb', line 4
# All queues for the database.
#
# @return [Array] distinct 'queue_name' values from database.pgq_get_consumer_info
def queues_list
  consumer_rows = database.pgq_get_consumer_info
  consumer_rows.map { |row| row['queue_name'] }.uniq
end
#remove_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
14 15 16 17 |
# File 'lib/pgq/utils.rb', line 14
# Unregisters the consumer from a queue and then drops the queue.
#
# @param queue_name [#to_s] name of the queue to drop
# @param consumer_name [#to_s] consumer to unregister (defaults to self.consumer_name)
# @return [Object] the result of database.pgq_drop_queue
def remove_queue(queue_name, consumer_name = self.consumer_name)
  queue = queue_name.to_s
  database.pgq_unregister_consumer(queue, consumer_name.to_s)
  database.pgq_drop_queue(queue)
end
#retry_failed_events(queue_name, limit = 5_000) ⇒ Object
resend failed events in queue
97 98 99 100 101 102 103 104 105 |
# File 'lib/pgq/utils.rb', line 97
# Resends failed events of this consumer back into the queue.
#
# @param queue_name [Object] queue passed through to the database helpers
# @param limit [Integer] maximum number of failed events to list
# @return [Integer] number of events that were listed (and retried)
def retry_failed_events(queue_name, limit = 5_000)
  # The listing may come back as nil; treat that as "nothing to retry".
  failed = database.pgq_failed_event_list(queue_name, self.consumer_name, limit, nil, 'asc') || []

  failed.each { |failed_event| database.pgq_failed_event_retry(queue_name, self.consumer_name, failed_event['ev_id']) }

  failed.length
end