Module: Pgq::Api
- Defined in:
- lib/pgq/api.rb
Instance Method Summary collapse
-
#pgq_create_queue(queue_name) ⇒ Object
manage queues.
- #pgq_drop_queue(queue_name) ⇒ Object
-
#pgq_event_failed(batch_id, event_id, reason) ⇒ Object
failed/retry.
- #pgq_event_retry(batch_id, event_id, retry_seconds) ⇒ Object
- #pgq_failed_event_count(queue_name, consumer) ⇒ Object
- #pgq_failed_event_delete(queue_name, consumer, event_id) ⇒ Object
- #pgq_failed_event_list(queue_name, consumer, limit = nil, offset = nil, order = 'desc') ⇒ Object
-
#pgq_failed_event_retry(queue_name, consumer, event_id) ⇒ Object
failed events.
- #pgq_finish_batch(batch_id) ⇒ Object
- #pgq_get_batch_events(batch_id) ⇒ Object
- #pgq_get_consumer_info ⇒ Object
- #pgq_get_consumer_queue_info(queue_name) ⇒ Object
-
#pgq_get_queue_info(queue_name) ⇒ Object
info methods.
-
#pgq_get_queues_info ⇒ Object
Get list of queues.
-
#pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) ⇒ Object
insert events.
-
#pgq_next_batch(queue_name, consumer_id) ⇒ Object
consuming.
- #pgq_register_consumer(queue_name, consumer_id) ⇒ Object
- #pgq_unregister_consumer(queue_name, consumer_id) ⇒ Object
Instance Method Details
#pgq_create_queue(queue_name) ⇒ Object
manage queues
6 7 8 |
# File 'lib/pgq/api.rb', line 6 def pgq_create_queue(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.create_queue(?)", queue_name]).to_i end |
#pgq_drop_queue(queue_name) ⇒ Object
10 11 12 |
# File 'lib/pgq/api.rb', line 10 def pgq_drop_queue(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.drop_queue(?)", queue_name]).to_i end |
#pgq_event_failed(batch_id, event_id, reason) ⇒ Object
failed/retry
47 48 49 |
# File 'lib/pgq/api.rb', line 47 def pgq_event_failed(batch_id, event_id, reason) connection.select_value(sanitize_sql_array ["SELECT pgq.event_failed(?, ?, ?)", batch_id, event_id, reason]).to_i end |
#pgq_event_retry(batch_id, event_id, retry_seconds) ⇒ Object
51 52 53 |
# File 'lib/pgq/api.rb', line 51 def pgq_event_retry(batch_id, event_id, retry_seconds) connection.select_value(sanitize_sql_array ["SELECT pgq.event_retry(?, ?, ?)", batch_id, event_id, retry_seconds]).to_i end |
#pgq_failed_event_count(queue_name, consumer) ⇒ Object
65 66 67 68 |
# File 'lib/pgq/api.rb', line 65 def pgq_failed_event_count(queue_name, consumer) res = connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_count(?, ?)", queue_name, consumer]) res ? res.to_i : nil end |
#pgq_failed_event_delete(queue_name, consumer, event_id) ⇒ Object
61 62 63 |
# File 'lib/pgq/api.rb', line 61 def pgq_failed_event_delete(queue_name, consumer, event_id) connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_delete(?, ?, ?)", queue_name, consumer, event_id]) end |
#pgq_failed_event_list(queue_name, consumer, limit = nil, offset = nil, order = 'desc') ⇒ Object
70 71 72 73 |
# File 'lib/pgq/api.rb', line 70 def pgq_failed_event_list queue_name, consumer, limit = nil, offset = nil, order = 'desc' order = (order.to_s == 'desc') ? order : 'asc' connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.failed_event_list(?, ?, ?, ?) order by ev_id #{order}", queue_name, consumer, limit.to_i, offset.to_i]) end |
#pgq_failed_event_retry(queue_name, consumer, event_id) ⇒ Object
failed events
57 58 59 |
# File 'lib/pgq/api.rb', line 57 def pgq_failed_event_retry(queue_name, consumer, event_id) connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_retry(?, ?, ?)", queue_name, consumer, event_id]) end |
#pgq_finish_batch(batch_id) ⇒ Object
41 42 43 |
# File 'lib/pgq/api.rb', line 41 def pgq_finish_batch(batch_id) connection.select_value(sanitize_sql_array ["SELECT pgq.finish_batch(?)", batch_id]) end |
#pgq_get_batch_events(batch_id) ⇒ Object
37 38 39 |
# File 'lib/pgq/api.rb', line 37 def pgq_get_batch_events(batch_id) connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.get_batch_events(?)", batch_id]) end |
#pgq_get_consumer_info ⇒ Object
87 88 89 |
# File 'lib/pgq/api.rb', line 87 def pgq_get_consumer_info connection.select_all("SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info()") end |
#pgq_get_consumer_queue_info(queue_name) ⇒ Object
91 92 93 |
# File 'lib/pgq/api.rb', line 91 def pgq_get_consumer_queue_info(queue_name) connection.select_one(sanitize_sql_array ["SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info(?)", queue_name]) || {} end |
#pgq_get_queue_info(queue_name) ⇒ Object
info methods
77 78 79 |
# File 'lib/pgq/api.rb', line 77 def pgq_get_queue_info(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.get_queue_info(?)", queue_name]) end |
#pgq_get_queues_info ⇒ Object
Get list of queues. Result: (queue_name, queue_ntables, queue_cur_table, queue_rotation_period, queue_switch_time, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag)
83 84 85 |
# File 'lib/pgq/api.rb', line 83 def pgq_get_queues_info connection.select_values("SELECT pgq.get_queue_info()") end |
#pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) ⇒ Object
insert events
24 25 26 27 28 |
# File 'lib/pgq/api.rb', line 24 def pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) result = connection.select_value(sanitize_sql_array ["SELECT pgq.insert_event(?, ?, ?, ?, ?, ?, ?)", queue_name, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4]) result ? result.to_i : nil end |
#pgq_next_batch(queue_name, consumer_id) ⇒ Object
consuming
32 33 34 35 |
# File 'lib/pgq/api.rb', line 32 def pgq_next_batch(queue_name, consumer_id) result = connection.select_value(sanitize_sql_array ["SELECT pgq.next_batch(?, ?)", queue_name, consumer_id]) result ? result.to_i : nil end |
#pgq_register_consumer(queue_name, consumer_id) ⇒ Object
14 15 16 |
# File 'lib/pgq/api.rb', line 14 def pgq_register_consumer(queue_name, consumer_id) connection.select_value(sanitize_sql_array ["SELECT pgq.register_consumer(?, ?)", queue_name, consumer_id]).to_i end |
#pgq_unregister_consumer(queue_name, consumer_id) ⇒ Object
18 19 20 |
# File 'lib/pgq/api.rb', line 18 def pgq_unregister_consumer(queue_name, consumer_id) connection.select_value(sanitize_sql_array ["SELECT pgq.unregister_consumer(?, ?)", queue_name, consumer_id]).to_i end |