Class: ScoutAgent::Database::Queue
- Inherits: ScoutAgent::Database
  - Object
  - ScoutAgent::Database
  - ScoutAgent::Database::Queue
- Defined in: lib/scout_agent/database/queue.rb
Overview
This database is used to store messages queued from external processes. Such messages can be data for missions or full reports ready for submission to the Scout server.
Constant Summary
- QUEUE_LIMIT = 3000
  A size limit for the queue to prevent data from building up.
Instance Attribute Summary
Attributes inherited from ScoutAgent::Database
Instance Method Summary
- #dequeue(*ids) ⇒ Object
  Removes queued messages from the database by their ids.
- #enqueue(mission_id, fields) ⇒ Object
  Adds a message to the queue.
- #peek(mission_id) ⇒ Object
  Returns a message (id, fields, and created_at) queued for mission_id.
- #queued_reports ⇒ Object
  This method returns queued reports intended for the Scout server.
- #update_schema(version = schema_version) ⇒ Object
  Builds a schema for the queue table.
Methods inherited from ScoutAgent::Database
#initialize, load, #locked?, #maintain, #migrate, #path, path, #prepare_connection, #query, #read_from_sqlite, #read_locked?, #schema_version, #write_locked?, #write_to_sqlite
Constructor Details
This class inherits a constructor from ScoutAgent::Database
Instance Method Details
#dequeue(*ids) ⇒ Object
Removes queued messages from the database by their ids. Returns true if the removal succeeded, or false otherwise.
    # File 'lib/scout_agent/database/queue.rb', line 146
    def dequeue(*ids)
      return true if ids.empty?
      write_to_sqlite do |sqlite|
        sqlite.execute(<<-END_DELETE_QUEUED.trim, *ids)
        DELETE FROM queue WHERE ROWID IN (#{(['?'] * ids.size).join(', ')})
        END_DELETE_QUEUED
      end
      true
    rescue Amalgalite::SQLite3::Error => error  # failed to remove messages
      # 
      # do nothing: messages will be delivered again,
      # mission can block duplicate
      # 
      log.error("Database dequeuing error: #{error.message}.")
      false
    end
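The placeholder expression in #dequeue can be tried on its own. The following is a minimal sketch: `delete_queued_sql` is a hypothetical helper written for illustration, not part of ScoutAgent, and it only builds the SQL string without touching a database.

```ruby
# Hypothetical helper (not part of ScoutAgent): builds a DELETE statement
# with one "?" placeholder per id, using the same expression as #dequeue.
def delete_queued_sql(ids)
  placeholders = (["?"] * ids.size).join(", ")
  "DELETE FROM queue WHERE ROWID IN (#{placeholders})"
end

puts delete_queued_sql([3, 7, 9])
# prints: DELETE FROM queue WHERE ROWID IN (?, ?, ?)
```

Generating one placeholder per id lets the ids be passed as bound parameters instead of being interpolated into the SQL text.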
#enqueue(mission_id, fields) ⇒ Object
Adds a message to the queue. The passed mission_id needs to be an Integer ID for a mission or one of the Strings 'report', 'hint', 'alert', or 'error' for a full report. The fields parameter is expected to be a Hash of fields and should include a 'plugin_id' key identifying what the data is for when the message is a report for the server.
    # File 'lib/scout_agent/database/queue.rb', line 44
    def enqueue(mission_id, fields)
      write_to_sqlite do |sqlite|
        sqlite.execute(<<-END_ENQUEUE.trim, mission_id, fields.to_json)
        INSERT INTO queue(mission_id, fields) VALUES(?, ?)
        END_ENQUEUE
      end
      true
    rescue Amalgalite::SQLite3::Error => error  # failed to enqueue message
      log.error("Database queuing error: #{error.message}.")
      false  # reject bad message
    end
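The argument rules described for #enqueue can be sketched as a stand-alone check. This is an illustration under assumptions: `valid_mission_id?` and `serialize_fields` are hypothetical names, and the "greater than zero" rule for Integer ids is taken from the CHECK constraint in #update_schema rather than from #enqueue itself, which leaves rejection to the database.

```ruby
require "json"

# The four report types named in the #enqueue documentation.
REPORT_TYPES = %w[report hint alert error].freeze

# Hypothetical pre-check (not part of ScoutAgent): accepts a positive
# Integer mission id or one of the report type Strings.
def valid_mission_id?(mission_id)
  case mission_id
  when Integer then mission_id > 0
  when String  then REPORT_TYPES.include?(mission_id)
  else false
  end
end

# Hypothetical wrapper: fields are stored as JSON text, as in #enqueue.
def serialize_fields(fields)
  fields.to_json
end

puts valid_mission_id?("alert")               # prints: true
puts valid_mission_id?(0)                     # prints: false
puts serialize_fields({ "plugin_id" => 42 })  # prints: {"plugin_id":42}
```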
#peek(mission_id) ⇒ Object
Returns a message (id, fields, and created_at) queued for mission_id. The fields are JSON parsed if possible and created_at is converted to a Time object. This method will return nil if no messages are queued for mission_id.
    # File 'lib/scout_agent/database/queue.rb', line 62
    def peek(mission_id)
      queued = read_from_sqlite { |sqlite|
        sqlite.first_row_from(<<-END_FIND_QUEUED.trim, mission_id.to_s)
        SELECT ROWID AS id, fields, created_at FROM queue WHERE mission_id = ?
        END_FIND_QUEUED
      }
      if queued.empty?
        nil  # not found
      else
        begin
          queued[:fields] = JSON.parse(queued[:fields].to_s)
        rescue JSON::ParserError  # failed to parse
          # leave for mission to decode it
          log.warn("Queued fields malformed.")
        end
        if created = Time.from_db_s(queued[:created_at])
          queued[:created_at] = created
        else
          log.warn("Queued timestamp missing.")
        end
        queued
      end
    rescue Amalgalite::SQLite3::Error => error  # failed to retrieve message
      log.error("Database peeking error: #{error.message}.")
      nil  # not found
    end
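The "JSON parsed if possible" behavior of #peek amounts to a parse with a fallback. A minimal sketch follows, assuming a hypothetical helper named `decode_fields`. Unlike #peek it does not log a warning, and the timestamp conversion is left out because `Time.from_db_s` is a ScoutAgent extension.

```ruby
require "json"

# Hypothetical helper (not part of ScoutAgent): returns the parsed JSON
# when the text is valid, otherwise hands back the raw value unchanged so
# the caller can decode it another way.
def decode_fields(raw)
  JSON.parse(raw.to_s)
rescue JSON::ParserError
  raw
end

p decode_fields('{"cpu": 0.5}')  # prints: {"cpu"=>0.5} (Ruby 3.4+ shows {"cpu" => 0.5})
p decode_fields("not json")      # prints: "not json"
```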
#queued_reports ⇒ Object
This method returns queued reports intended for the Scout server.
The process is nearly identical to how mission-generated reports are pulled. See ScoutAgent::Database::MissionLog#current_reports() for details.
    # File 'lib/scout_agent/database/queue.rb', line 96
    def queued_reports
      write_to_sqlite { |sqlite|
        # read the current reports
        begin
          report_ids = Array.new
          reports    = query(<<-END_FIND_REPORTS.trim) { |row|
          SELECT   ROWID AS id, mission_id AS type, fields, created_at
          FROM     queue
          WHERE    mission_id IN ('report', 'hint', 'alert', 'error')
          ORDER BY created_at
          LIMIT    500
          END_FIND_REPORTS
            begin
              row[:fields] = JSON.parse(row[:fields].to_s)
              if row[:fields].include? "plugin_id"
                row[:plugin_id] = row[:fields].delete("plugin_id")
              end
            rescue JSON::ParserError  # failed to parse
              # skip the transform since we can't parse it
              log.warn("Queued fields malformed.")
            end
            if created = Time.from_db_s(row[:created_at])
              row[:created_at] = created.utc.to_db_s
            else
              log.warn("Queued timestamp missing.")
            end
            report_ids << row.delete_at(:id)
          }
        rescue Amalgalite::SQLite3::Error => error  # failed to find reports
          log.error("Database queued reports error: #{error.message}.")
          return Array.new  # return empty results
        end
        return reports if reports.empty?
        # delete the reports we read
        unless dequeue(*report_ids)
          # cancel sending this batch
          sqlite.rollback   # we can't submit unless we're sure they are gone
          return Array.new  # return empty results
        end
        reports  # the reports ready for sending
      }
    rescue Amalgalite::SQLite3::Error => error  # failed to get a write lock
      # try again to read reports later
      log.error("Database queued reports locking error: #{error.message}.")
    end
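The per-row transform in #queued_reports moves 'plugin_id' out of the parsed fields and onto the row itself. The sketch below shows that step on a plain Hash. `promote_plugin_id` is a hypothetical name, the row is an ordinary Ruby Hash rather than an Amalgalite result row, and the Hash type guard is an addition not present in the original.

```ruby
require "json"

# Hypothetical, simplified stand-in for the row transform in #queued_reports.
def promote_plugin_id(row)
  fields = JSON.parse(row[:fields].to_s)
  # Guard added for this sketch: only Hash payloads can carry a plugin_id.
  if fields.is_a?(Hash) && fields.key?("plugin_id")
    row[:plugin_id] = fields.delete("plugin_id")
  end
  row[:fields] = fields
  row
rescue JSON::ParserError
  row  # leave unparseable fields untouched
end

row = promote_plugin_id({ type: "report", fields: '{"plugin_id": 7, "cpu": 0.5}' })
puts row[:plugin_id]  # prints: 7
```

After the call, row[:fields] holds only the remaining report data, here the "cpu" entry.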
#update_schema(version = schema_version) ⇒ Object
Builds a schema for the queue table. This table is size controlled by a trigger to prevent infinite data growth.
    # File 'lib/scout_agent/database/queue.rb', line 19
    def update_schema(version = schema_version)
      case version
      when 0
        <<-END_INITIAL_SCHEMA.trim
        CREATE TABLE queue (
          mission_id TEXT NOT NULL
            CHECK( mission_id IN ('report', 'hint', 'alert', 'error') OR
                   CAST(mission_id AS 'integer') > 0 ),
          fields     REQUIRED_TEXT_TYPE,
          created_at DATETIME_TYPE
        );
        DEFAULT_LOCALTIME_TRIGGER queue created_at
        LIMIT_TABLE_SIZE_TRIGGER  queue #{QUEUE_LIMIT}
        END_INITIAL_SCHEMA
      end
    end
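The effect of a size limit can be illustrated in memory. This sketch rests on an assumption the page does not confirm: that the trigger keeps the table at or under QUEUE_LIMIT rows by discarding the oldest entries. `BoundedQueue` is a hypothetical class and is unrelated to the actual LIMIT_TABLE_SIZE_TRIGGER expansion.

```ruby
# In-memory illustration only (not part of ScoutAgent).
class BoundedQueue
  def initialize(limit)
    @limit = limit
    @rows  = []
  end

  # Appends a row, then drops rows from the front until the limit holds.
  # Assumption: the oldest rows are the ones discarded.
  def push(row)
    @rows << row
    @rows.shift while @rows.size > @limit
    self
  end

  def to_a
    @rows.dup
  end
end

queue = BoundedQueue.new(3)
(1..5).each { |n| queue.push(n) }
p queue.to_a  # prints: [3, 4, 5]
```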