Module: ElasticUtil
Defined in: lib/elastic_util.rb
Overview
This module provides a way to back up and restore Elasticsearch data.
Defined Under Namespace
Classes: Error
Constant Summary
- VERSION = "0.1.5"
- DUMP_DIR = "es_data"
  The name of the data directory, relative to the user-provided backup directory.
Class Method Summary
- .backup(url, backup_dir, opts = {}) ⇒ true
  Back up Elasticsearch data to a local directory.
- .restore(url, backup_dir, opts = {}) ⇒ true
  Restore Elasticsearch data from a backup.
- .save_bulk_data(path, hits, file_index = nil, opts = {}) ⇒ Object
  :nodoc:.
Class Method Details
.backup(url, backup_dir, opts = {}) ⇒ true
Back up Elasticsearch data to a local directory.
This uses Elasticsearch's scroll API to fetch all records for the selected indices and writes the data to a local directory. The files it generates are given a .json.data extension. They are not valid JSON files; rather, they are in the format expected by Elasticsearch's _bulk API.
So #restore simply has to POST the contents of each file.
Use the :size option to change the number of results fetched per scroll request; this also determines the size of the data files generated, which in turn corresponds to the size of the _bulk API requests made in #restore.
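For example, a minimal usage sketch (the URL, backup directory, and index names below are placeholders):

require 'elastic_util'

# back up two hypothetical indices, 500 documents per scroll page,
# overwriting any previous dump in the target directory
ElasticUtil.backup('http://localhost:9200', '/tmp/es_backup',
  :indices => ['my_index_1', 'my_index_2'],
  :size    => 500,
  :scroll  => '5m',
  :force   => true
)

Omitting :indices backs up every index reported by /_cat/indices.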
# File 'lib/elastic_util.rb', line 53

def self.backup(url, backup_dir, opts={})
  start_time = Time.now
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)
  indices = []
  # ping it first
  uri = URI(url)
  response = Net::HTTP.get_response(uri)
  http = Net::HTTP.new(uri.host, uri.port)
  http.read_timeout = (60*3)
  http.open_timeout = 5
  response = http.start() {|http| http.get("/") }
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end
  # determine indices to backup, default is everything.
  if opts[:indices]
    indices = opts[:indices]
  else
    uri = URI(url + "/_cat/indices?format=json")
    response = Net::HTTP.get_response(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    json_response.each do |record|
      indices.push(record['index'])
    end
  end
  if opts[:exclude_indices]
    indices = indices.reject {|it| opts[:exclude_indices].include?(it) }
  end
  if indices.empty?
    raise Error, "no indices to back up!"
  end
  opts[:scroll] ||= '5m'
  opts[:size] ||= 1000
  # exclude _id by default.
  if !opts.key?(:exclude_fields)
    opts[:exclude_fields] = ['_id']
  end
  # validate backup path
  if File.exists?(path)
    if opts[:force]
      FileUtils.rmtree(path)
    else
      raise Error, "backup path '#{path}' already exists! Delete it first or use --force"
    end
  end
  FileUtils.mkdir_p(path)
  # dump data
  indices.each_with_index do |index_name, i|
    puts "(#{i+1}/#{indices.size}) backing up index #{index_name}" unless opts[:quiet]
    # initial request
    file_index = 0
    uri = URI(url + "/#{index_name}/_search")
    params = {
      :format => "json",
      :scroll => opts[:scroll],
      :size => opts[:size],
      :sort => ["_doc"]
    }
    uri.query = URI.encode_www_form(params)
    # puts "HTTP REQUEST #{uri.inspect}"
    response = Net::HTTP.get_response(uri)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
    json_response = JSON.parse(response.body)
    raise Error, "No scroll_id returned in response:\n#{response.inspect}" unless json_response['_scroll_id']
    scroll_id = json_response['_scroll_id']
    hits = json_response['hits']['hits']
    save_bulk_data(path, hits, nil, opts)
    file_index = 1
    # scroll requests
    while !hits.empty?
      uri = URI(url + "/_search/scroll")
      params = {
        :scroll_id => scroll_id,
        :scroll => opts[:scroll]
      }
      uri.query = URI.encode_www_form(params)
      # puts "HTTP REQUEST #{uri.inspect}"
      response = Net::HTTP.get_response(uri)
      if !response.is_a?(Net::HTTPSuccess)
        raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
      end
      json_response = JSON.parse(response.body)
      raise Error, "No scroll_id returned in response:\n#{response.inspect}\n#{response.body.to_s}" unless json_response['_scroll_id']
      scroll_id = json_response['_scroll_id']
      hits = json_response['hits']['hits']
      save_bulk_data(path, hits, file_index, opts)
      file_index += 1
    end
  end
  puts "Finished backup of elasticsearch #{url} to directory #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end
.restore(url, backup_dir, opts = {}) ⇒ true
Restore Elasticsearch data from a backup. This POSTs the contents of each file in the backup directory to the _bulk API.
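For example, a minimal usage sketch (placeholder URL and directory), restoring the files written by .backup:

require 'elastic_util'

# POSTs every es_data/**/*.json.data file under the backup directory to /_bulk
ElasticUtil.restore('http://localhost:9200', '/tmp/es_backup')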
# File 'lib/elastic_util.rb', line 177

def self.restore(url, backup_dir, opts={})
  start_time = Time.now
  backup_dir = backup_dir.strip
  path = File.join(backup_dir.strip, DUMP_DIR)
  # validate backup path
  if !Dir.exists?(path)
    raise Error, "backup path '#{backup_dir}' does not exist!"
  end
  # ping it first
  uri = URI(url)
  response = Net::HTTP.get_response(uri)
  http = Net::HTTP.new(uri.host, uri.port)
  http.read_timeout = (60*3)
  http.open_timeout = 5
  response = http.start() {|http| http.get("/") }
  if !response.is_a?(Net::HTTPSuccess)
    raise Error, "Unable to reach elasticsearch at url '#{url}'!\n#{response.inspect}\n#{response.body.to_s}"
  end
  # find files to import
  found_files = Dir[File.join(path, '**', '*.json.data' )]
  if found_files.empty?
    raise Error, "backup path '#{backup_dir}' does not exist!"
  else
    puts "Found #{found_files.size} files to import" unless opts[:quiet]
  end
  # bulk api request for each file
  found_files.each_with_index do |file, i|
    puts "(#{i+1}/#{found_files.size}) bulk importing file #{file}" unless opts[:quiet]
    payload = File.read(file)
    # uri = URI(url)
    http = Net::HTTP.new(uri.host, uri.port)
    response = http.post("/_bulk", payload)
    if !response.is_a?(Net::HTTPSuccess)
      raise Error, "HTTP request failure!\n#{response.inspect}\n#{response.body.to_s}"
    end
  end
  puts "Finished restore of elasticsearch #{url} with backup #{backup_dir} (took #{(Time.now-start_time).round(3)}s)" unless opts[:quiet]
  return true
end
.save_bulk_data(path, hits, file_index = nil, opts = {}) ⇒ Object
:nodoc:
# File 'lib/elastic_util.rb', line 225

def self.save_bulk_data(path, hits, file_index=nil, opts={}) # :nodoc:
  if hits && !hits.empty?
    hits.each do |hit|
      index_name = hit['_index']
      index_type = hit['_type']
      dir_name = File.join(path, index_name)
      FileUtils.mkdir_p(dir_name)
      file_name = File.join(dir_name, index_type) + (file_index ? "_#{file_index}" : "") + ".json.data"
      # prepare record for bulk api injection
      action_json = {'index' => { '_index' => hit['_index'], '_type' => hit['_type'], '_id' => hit['_id'] } }
      source_json = hit['_source']
      if opts[:exclude_fields] && source_json
        opts[:exclude_fields].each do |field|
          source_json.delete(field)
        end
      end
      File.open(file_name, 'a') do |file|
        file.write JSON.generate(action_json) + "\n" + JSON.generate(source_json) + "\n"
      end
    end
  end
end
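For illustration, each hit becomes a pair of newline-terminated lines appended to es_data/<index>/<type>[_<file_index>].json.data: an action line followed by the document source, in _bulk format. A minimal sketch of that pairing, with made-up index, type, id, and document values:

require 'json'

# a hypothetical hit as returned by the scroll API
hit = {
  '_index'  => 'my_index',
  '_type'   => 'my_type',
  '_id'     => 'abc123',
  '_source' => { 'title' => 'hello', 'body' => 'world' }
}

action_json = { 'index' => { '_index' => hit['_index'], '_type' => hit['_type'], '_id' => hit['_id'] } }
puts JSON.generate(action_json)     # => {"index":{"_index":"my_index","_type":"my_type","_id":"abc123"}}
puts JSON.generate(hit['_source'])  # => {"title":"hello","body":"world"}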