Class: ScoutAgent::Database::Queue

Inherits:
ScoutAgent::Database show all
Defined in:
lib/scout_agent/database/queue.rb

Overview

This database is used to store messages queued from external processes. Such messages can be data for missions or full reports ready for submission to the Scout server.

Constant Summary collapse

QUEUE_LIMIT =

A size limit for the queue to prevent data from building up.

3000

Instance Attribute Summary

Attributes inherited from ScoutAgent::Database

#log

Instance Method Summary collapse

Methods inherited from ScoutAgent::Database

#initialize, load, #locked?, #maintain, #migrate, #path, path, #prepare_connection, #query, #read_from_sqlite, #read_locked?, #schema_version, #write_locked?, #write_to_sqlite

Constructor Details

This class inherits a constructor from ScoutAgent::Database

Instance Method Details

#dequeue(*ids) ⇒ Object

Removes queued messages from the database by their ids. Returns true if the removal succeeded, or false otherwise.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/scout_agent/database/queue.rb', line 146

# 
# Removes queued messages from the database by their +ids+ (matched against
# ROWID).  Returns +true+ if the removal succeeded or there was nothing to
# remove, or +false+ if the database raised an error (which is logged).
# 
def dequeue(*ids)
  return true if ids.empty?  # nothing to delete, so don't touch the database
  write_to_sqlite do |sqlite|
    # one "?" placeholder is generated per id, so ids are always bound values
    # NOTE(review):  String#trim is presumably a project core extension
    sqlite.execute(<<-END_DELETE_QUEUED.trim, *ids)
    DELETE FROM queue WHERE ROWID IN (#{(['?'] * ids.size).join(', ')})
    END_DELETE_QUEUED
  end
  true
rescue Amalgalite::SQLite3::Error => error  # failed to remove messages
  # 
  # do nothing:  messages will be delivered again,
  #              mission can block duplicate
  # 
  log.error("Database dequeuing error:  #{error.message}.")
  false
end

#enqueue(mission_id, fields) ⇒ Object

Adds a message to the queue. The passed mission_id needs to be an Integer ID for a mission or one of the Strings 'report', 'hint', 'alert', or 'error' for a full report. The fields parameter is expected to be a Hash of fields and should include a 'plugin_id' key identifying what the data is for when the message is a report for the server.



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/scout_agent/database/queue.rb', line 44

# 
# Adds a message to the queue.  The passed +mission_id+ needs to be an Integer
# ID for a mission or one of the Strings <tt>'report'</tt>, <tt>'hint'</tt>,
# <tt>'alert'</tt>, or <tt>'error'</tt> for a full report.  The +fields+
# parameter is expected to be a Hash of fields and should include a
# <tt>'plugin_id'</tt> key identifying what the data is for when the message
# is a report for the server.
# 
# The +fields+ are stored as JSON.  Returns +true+ if the message was queued,
# or +false+ if the database rejected it (the error is logged).
# 
def enqueue(mission_id, fields)
  write_to_sqlite do |sqlite|
    # NOTE(review):  String#trim is presumably a project core extension
    sqlite.execute(<<-END_ENQUEUE.trim, mission_id, fields.to_json)
    INSERT INTO queue(mission_id, fields) VALUES(?, ?)
    END_ENQUEUE
  end
  true
rescue Amalgalite::SQLite3::Error => error  # failed to enqueue message
  log.error("Database queuing error:  #{error.message}.")
  false  # reject bad message
end

#peek(mission_id) ⇒ Object

Returns a message (id, fields, and created_at) queued for mission_id. The fields are JSON parsed if possible and created_at is converted to a Time object. This method will return nil if no messages are queued for mission_id.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/scout_agent/database/queue.rb', line 62

# 
# Returns a message (+id+, +fields+, and +created_at+) queued for
# +mission_id+.  The +fields+ are JSON parsed if possible (otherwise they are
# left as stored and a warning is logged) and +created_at+ is converted via
# Time.from_db_s when that succeeds.  This method will return +nil+ if no
# messages are queued for +mission_id+ or if the database raises an error.
# 
def peek(mission_id)
  queued = read_from_sqlite { |sqlite|
    # mission_id is compared as text, so it is bound as a String
    # NOTE(review):  String#trim is presumably a project core extension
    sqlite.first_row_from(<<-END_FIND_QUEUED.trim, mission_id.to_s)
    SELECT ROWID AS id, fields, created_at FROM queue WHERE mission_id = ?
    END_FIND_QUEUED
  }
  if queued.empty?
    nil  # not found
  else
    begin
      queued[:fields] = JSON.parse(queued[:fields].to_s)
    rescue JSON::ParserError  # failed to parse
      # leave for mission to decode it
      log.warn("Queued fields malformed.")
    end
    # NOTE(review):  Time.from_db_s looks like a project extension that
    #                returns nil when it can't build a Time -- confirm
    if created = Time.from_db_s(queued[:created_at])
      queued[:created_at] = created
    else
      log.warn("Queued timestamp missing.")
    end
    queued
  end
rescue Amalgalite::SQLite3::Error => error  # failed to retrieve message
  log.error("Database peeking error:  #{error.message}.")
  nil  # not found
end

#queued_reports ⇒ Object

This method returns queued reports intended for the Scout server.

The process is pretty much identical to how mission generated reports are pulled. See ScoutAgent::Database::MissionLog#current_reports() for details.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/scout_agent/database/queue.rb', line 96

# 
# This method returns queued reports intended for the Scout server.
# 
# Up to 500 of the oldest <tt>'report'</tt>, <tt>'hint'</tt>,
# <tt>'alert'</tt>, and <tt>'error'</tt> messages are read, transformed, and
# then removed from the queue with dequeue().  For each row the +fields+ are
# JSON parsed (with any <tt>"plugin_id"</tt> hoisted into the row itself),
# +created_at+ is rewritten as a UTC database timestamp, and the +id+ is
# stripped out of the row and only used for the removal.
# 
# An Array of rows is returned.  That Array is empty when nothing is queued,
# the reports cannot be read, they cannot be removed, or the database cannot
# be locked for writing.
# 
def queued_reports
  write_to_sqlite { |sqlite|
    # read the current reports
    begin
      report_ids = Array.new
      # NOTE(review):  String#trim is presumably a project core extension
      reports    = query(<<-END_FIND_REPORTS.trim) { |row|
      SELECT     ROWID AS id, mission_id AS type, fields, created_at
      FROM       queue
      WHERE      mission_id IN ('report', 'hint', 'alert', 'error')
      ORDER BY   created_at
      LIMIT      500
      END_FIND_REPORTS
        begin
          row[:fields] = JSON.parse(row[:fields].to_s)
          if row[:fields].include? "plugin_id"
            row[:plugin_id] = row[:fields].delete("plugin_id")
          end
        rescue JSON::ParserError  # failed to parse
          # skip the transform since we can't parse it
          log.warn("Queued fields malformed.")
        end
        if created = Time.from_db_s(row[:created_at])
          row[:created_at] = created.utc.to_db_s
        else
          log.warn("Queued timestamp missing.")
        end
        # remember the id for removal, but keep it out of the report itself
        report_ids << row.delete_at(:id)
      }
    rescue Amalgalite::SQLite3::Error => error  # failed to find reports
      log.error("Database queued reports error:  #{error.message}.")
      return Array.new  # return empty results
    end
    return reports if reports.empty?
    # delete the reports we read
    unless dequeue(*report_ids)
      # cancel sending this batch
      sqlite.rollback   # we can't submit unless we're sure they are gone
      return Array.new  # return empty results
    end
    reports  # the reports ready for sending
  }
rescue Amalgalite::SQLite3::Error => error  # failed to get a write lock
  # try again to read reports later
  log.error("Database queued reports locking error:  #{error.message}.")
  # hand back an Array like every other failure path, instead of leaking
  # whatever the logger call happened to return
  Array.new  # return empty results
end

#update_schema(version = schema_version) ⇒ Object

Builds a schema for the queue table. This table is size controlled by a trigger to prevent infinite data growth.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/scout_agent/database/queue.rb', line 19

# 
# Builds a schema for the queue table.  This table is size controlled by a
# trigger to prevent infinite data growth (capped at QUEUE_LIMIT rows).
# 
# Returns the schema text to apply when upgrading from +version+ 0 and +nil+
# for any other +version+, since no later schema is defined here.
# 
# NOTE(review):  REQUIRED_TEXT_TYPE, DATETIME_TYPE, DEFAULT_LOCALTIME_TRIGGER,
#                and LIMIT_TABLE_SIZE_TRIGGER are not SQL.  They are presumably
#                macros expanded by ScoutAgent::Database before the schema is
#                executed -- confirm against the parent class.
# 
def update_schema(version = schema_version)
  case version
  when 0
    # NOTE(review):  String#trim is presumably a project core extension
    <<-END_INITIAL_SCHEMA.trim
    CREATE TABLE queue (
      mission_id TEXT NOT NULL
        CHECK( mission_id IN ('report', 'hint', 'alert', 'error') OR
               CAST(mission_id AS 'integer') > 0 ),
      fields     REQUIRED_TEXT_TYPE,
      created_at DATETIME_TYPE
    );
    DEFAULT_LOCALTIME_TRIGGER queue created_at
    LIMIT_TABLE_SIZE_TRIGGER  queue #{QUEUE_LIMIT}
    END_INITIAL_SCHEMA
  end
end