Module: Pgq::Utils

Included in:
ConsumerBase
Defined in:
lib/pgq/utils.rb

Instance Method Summary collapse

Instance Method Details

#add_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object

methods for migrations



9
10
11
12
# File 'lib/pgq/utils.rb', line 9

# Migration helper: creates the queue, then registers a consumer on it.
#
# Both names are converted with +to_s+ before being handed to the database
# wrapper. +consumer_name+ defaults to this object's own consumer name.
# Returns whatever +pgq_register_consumer+ returns.
def add_queue(queue_name, consumer_name = self.consumer_name)
  queue = queue_name.to_s

  database.pgq_create_queue(queue)
  database.pgq_register_consumer(queue, consumer_name.to_s)
end

#delete_failed_events(queue_name, limit = 5_000) ⇒ Object



107
108
109
110
111
112
113
114
115
# File 'lib/pgq/utils.rb', line 107

# Deletes failed events of this consumer from the given queue.
#
# Fetches at most +limit+ failed events (ascending order is requested from
# the database wrapper) and deletes each one by its 'ev_id'.
# Returns the number of events that were listed.
def delete_failed_events(queue_name, limit = 5_000)
  consumer = self.consumer_name
  failed = database.pgq_failed_event_list(queue_name, consumer, limit, nil, 'asc')
  failed ||= [] # the list call may return nothing at all

  failed.each { |ev| database.pgq_failed_event_delete(queue_name, consumer, ev['ev_id']) }

  failed.length
end

#inspect_londiste_queue(queue_name) ⇒ Object

Shows event stats as a hash for londiste-style event storage: { "ev_extra1:ev_type" => events_count }



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/pgq/utils.rb', line 57

# Builds event statistics for a londiste-style queue.
#
# Returns a hash keyed by "ev_extra1:ev_type":
# * when +last_event_id+ yields an event id, the value is the number of rows
#   in the queue table whose ev_id is greater than that id;
# * otherwise every (ev_extra1, ev_type) pair found in the table is listed
#   with a count of 0.
def inspect_londiste_queue(queue_name)
  table, last_event = last_event_id(queue_name)
  # The accumulator has to exist before the loops below assign into it.
  result = {}

  if last_event
    stats = connection.select_all <<-SQL
      SELECT count(*) as count, ev_type, ev_extra1
      FROM #{table}
      WHERE ev_id > #{last_event.to_i}
      GROUP BY ev_type, ev_extra1
    SQL

    stats.each do |x|
      result["#{x['ev_extra1']}:#{x['ev_type']}"] = x['count'].to_i
    end
  else
    stats = connection.select_all <<-SQL
      SELECT ev_type, ev_extra1
      FROM #{table}
      GROUP BY ev_type, ev_extra1 ORDER BY ev_extra1, ev_type
    SQL

    stats.each do |x|
      result["#{x['ev_extra1']}:#{x['ev_type']}"] = 0
    end
  end

  result
end

#inspect_queue(queue_name) ⇒ Object

inspect queue

{ type => events_count }



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/pgq/utils.rb', line 21

# Builds event statistics for a queue.
#
# Returns a hash { ev_type => events_count }:
# * when +last_event_id+ yields an event id, the value is the number of rows
#   in the queue table whose ev_id is greater than that id;
# * otherwise every ev_type found in the table is listed with a count of 0.
def inspect_queue(queue_name)
  table, last_event = last_event_id(queue_name)
  # The accumulator has to exist before the loops below assign into it.
  result = {}

  if last_event
    stats = connection.select_all <<-SQL
      SELECT count(*) as count, ev_type
      FROM #{table}
      WHERE ev_id > #{last_event.to_i}
      GROUP BY ev_type
    SQL

    stats.each do |x|
      result[x['ev_type'].to_s] = x['count'].to_i
    end
  else
    stats = connection.select_all <<-SQL
      SELECT ev_type
      FROM #{table}
      GROUP BY ev_type
    SQL

    stats.each do |x|
      result[x['ev_type'].to_s] = 0
    end
  end

  result
end

#inspect_self_queue ⇒ Object



51
52
53
# File 'lib/pgq/utils.rb', line 51

# Runs +inspect_queue+ against this object's own queue name and returns
# its result.
def inspect_self_queue
  own_queue = self.queue_name
  self.inspect_queue(own_queue)
end

#last_event_id(queue_name) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/pgq/utils.rb', line 117

# Looks up the data table prefix of a queue and the highest event id of the
# consumer's current batch.
#
# Returns a two-element array [table, last_id]:
# * table is the queue_data_pfx value read from pgq.queue for +queue_name+;
# * last_id is MAX(ev_id) of the current batch converted with +to_i+, or nil
#   when the consumer info reports no 'current_batch'.
def last_event_id(queue_name)
  queue_info = database.pgq_get_consumer_queue_info(queue_name)
  table = connection.select_value("SELECT queue_data_pfx AS table FROM pgq.queue WHERE queue_name = #{database.sanitize(queue_name)}")

  batch_id = queue_info['current_batch']
  return [table, nil] unless batch_id

  # pgq.batch_event_sql yields the SQL text selecting the batch's events;
  # it is wrapped as a subquery to find the largest ev_id.
  batch_sql = connection.select_value("SELECT * FROM pgq.batch_event_sql(#{database.sanitize(batch_id.to_i)})")
  max_id = connection.select_value("SELECT MAX(ev_id) AS count FROM (#{batch_sql}) AS x")

  [table, max_id.to_i]
end

#proxy(method_name) ⇒ Object

proxying method for tests



89
90
91
92
93
94
# File 'lib/pgq/utils.rb', line 89

# Test helper: sets a +should_receive(:enqueue)+ expectation whose block
# performs the enqueued call immediately instead of queueing it.
#
# The enqueued arguments are passed through the coder (dump, then load)
# before a fresh instance receives +perform+ with the enqueued method name.
# The +method_name+ argument itself is not used by the body.
# Returns the result of +any_number_of_times+ on the expectation.
def proxy(method_name)
  expectation = self.should_receive(:enqueue) do |enqueued_method, *payload|
    round_tripped = self.coder.load(self.coder.dump(payload))
    self.new.send(:perform, enqueued_method, *round_tripped)
  end

  expectation.any_number_of_times
end

#queues_list ⇒ Object

all queues for database



4
5
6
# File 'lib/pgq/utils.rb', line 4

# Lists the distinct 'queue_name' values found in the consumer info rows
# returned by the database wrapper.
def queues_list
  consumer_rows = database.pgq_get_consumer_info
  names = consumer_rows.map { |row| row['queue_name'] }
  names.uniq
end

#remove_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object



14
15
16
17
# File 'lib/pgq/utils.rb', line 14

# Migration helper: unregisters a consumer from the queue, then drops the
# queue.
#
# Both names are converted with +to_s+ before being handed to the database
# wrapper. +consumer_name+ defaults to this object's own consumer name.
# Returns whatever +pgq_drop_queue+ returns.
def remove_queue(queue_name, consumer_name = self.consumer_name)
  queue = queue_name.to_s

  database.pgq_unregister_consumer(queue, consumer_name.to_s)
  database.pgq_drop_queue(queue)
end

#retry_failed_events(queue_name, limit = 5_000) ⇒ Object

resend failed events in queue



97
98
99
100
101
102
103
104
105
# File 'lib/pgq/utils.rb', line 97

# Resends failed events of this consumer back into the given queue.
#
# Fetches at most +limit+ failed events (ascending order is requested from
# the database wrapper) and asks for a retry of each one by its 'ev_id'.
# Returns the number of events that were listed.
def retry_failed_events(queue_name, limit = 5_000)
  consumer = self.consumer_name
  failed = database.pgq_failed_event_list(queue_name, consumer, limit, nil, 'asc')
  failed ||= [] # the list call may return nothing at all

  failed.each { |ev| database.pgq_failed_event_retry(queue_name, consumer, ev['ev_id']) }

  failed.length
end