Module: Esse::Index::ClassMethods
- Included in:
- Esse::Index
- Defined in:
- lib/esse/index/base.rb,
lib/esse/index/type.rb,
lib/esse/index/search.rb,
lib/esse/index/actions.rb,
lib/esse/index/aliases.rb,
lib/esse/index/indices.rb,
lib/esse/index/plugins.rb,
lib/esse/index/mappings.rb,
lib/esse/index/settings.rb,
lib/esse/index/documents.rb,
lib/esse/index/attributes.rb,
lib/esse/index/descendants.rb,
lib/esse/index/inheritance.rb
Constant Summary collapse
- CREATE_INDEX_RESERVED_KEYWORDS =
{ alias: true, }.freeze
- INDEX_SIMPLIFIED_SETTINGS =
Elasticsearch supports passing index.* related settings directly in the body of the request. We move them under the index key to make this more explicit and to be the source of truth when merging settings. So the settings `{ number_of_shards: 1 }` will be transformed to `{ index: { number_of_shards: 1 } }`.
%i[ number_of_shards number_of_replicas refresh_interval mapping ].freeze
- TEMPLATE_DIRS =
[ '%<dirname>s/templates', '%<dirname>s' ].freeze
Instance Attribute Summary collapse
-
#abstract_class ⇒ Object
Set this to true if this is an abstract class.
-
#plugins ⇒ Object
readonly
Returns the value of attribute plugins.
- #repo_hash ⇒ Object
Instance Method Summary collapse
- #abstract_class? ⇒ Boolean
-
#aliases(**options) ⇒ Object
Get the aliases for the index.
-
#bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **options) ⇒ Array<Esse::Import::RequestBody>
Performs multiple indexing or delete operations in a single API call.
- #bulk_wait_interval ⇒ Object
- #bulk_wait_interval=(value) ⇒ Object
-
#close(suffix: nil, **options) ⇒ Object
Close an index (keep the data on disk, but deny operations with the index).
-
#cluster ⇒ Esse::Cluster
An instance of cluster based on its cluster_id.
-
#cluster_id ⇒ Symbol
Reads the @cluster_id instance variable or :default.
-
#cluster_id=(source) ⇒ Symbol
Sets the cluster_id associated with the Index class.
-
#count(type: nil, suffix: nil, **options) ⇒ Integer
Gets the number of matches for a search query.
-
#create_index(suffix: nil, body: nil, settings: nil, **options) ⇒ Hash
Creates index and applies mappings and settings.
-
#delete(doc = nil, suffix: nil, **options) ⇒ Object
Removes a JSON document from the specified index.
-
#delete_by_query(suffix: nil, **options) ⇒ Hash
Delete documents by query.
-
#delete_index(suffix: nil, **options) ⇒ Hash
Deletes an existing index.
-
#descendants ⇒ Object
:nodoc:.
-
#exist?(doc = nil, suffix: nil, **options) ⇒ Boolean
Check if a JSON document exists.
-
#get(doc = nil, suffix: nil, **options) ⇒ Hash
Retrieves the specified JSON document from an index.
-
#import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options) ⇒ Numeric
Resolve collection and index data.
-
#index(doc = nil, suffix: nil, **options) ⇒ Hash
Adds a JSON document to the specified index and makes it searchable.
- #index_directory ⇒ Object
-
#index_exist?(suffix: nil) ⇒ Boolean
Checks the index existence.
- #index_name(suffix: nil) ⇒ Object
- #index_name=(value) ⇒ Object
- #index_name? ⇒ Boolean
- #index_prefix ⇒ Object
- #index_prefix=(value) ⇒ Object
- #index_suffix ⇒ Object
- #index_suffix=(value) ⇒ Object
-
#indices_pointing_to_alias(**options) ⇒ Object
Returns the list of real index names for the virtual index name (alias).
- #inherited(subclass) ⇒ Object
- #inspect ⇒ Object
- #mapping_single_type=(value) ⇒ Object
- #mapping_single_type? ⇒ Boolean
-
#mappings(hash = {}, &block) ⇒ Object
This method is only used to define mapping.
- #mappings_hash ⇒ Object
-
#mget(ids:, suffix: nil, **options) ⇒ Hash
Retrieves multiple JSON documents by ID from an index.
-
#open(suffix: nil, **options) ⇒ Object
Open a previously closed index.
- #plugin(plugin, **kwargs, &block) ⇒ Object
-
#refresh(suffix: nil, **options) ⇒ Object
Performs the refresh operation in one or more indices.
-
#reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options) ⇒ Object
Copies documents from a source to a destination.
- #repo(name = nil) ⇒ Object
- #repo?(name = nil) ⇒ Boolean
- #repository(repo_name, *_args, **kwargs, &block) ⇒ Object
-
#reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options) ⇒ Hash
Deletes, creates and imports data to the index.
- #search(*args, &block) ⇒ Object
-
#settings(hash = {}, &block) ⇒ Object
Defines the /_settings definition for each index.
- #settings_hash(settings: nil) ⇒ Object
- #template_dirs ⇒ Object
- #uname ⇒ Object
-
#update(doc = nil, suffix: nil, **options) ⇒ Hash
Updates a document using the specified script.
-
#update_aliases(suffix:, **options) ⇒ Hash
Replaces all existing aliases with the suffixed index (or indices) given in the suffix argument.
-
#update_by_query(suffix: nil, **options) ⇒ Hash
Update documents by query.
-
#update_mapping(suffix: nil, **options) ⇒ Object
Updates index mappings.
-
#update_settings(suffix: nil, settings: nil, **options) ⇒ Object
Updates index settings.
Instance Attribute Details
#abstract_class ⇒ Object
Set this to true if this is an abstract class
7 8 9 |
# File 'lib/esse/index/inheritance.rb', line 7 def abstract_class @abstract_class end |
#plugins ⇒ Object (readonly)
Returns the value of attribute plugins.
6 7 8 |
# File 'lib/esse/index/plugins.rb', line 6 def plugins @plugins end |
#repo_hash ⇒ Object
8 9 10 |
# File 'lib/esse/index/type.rb', line 8 def repo_hash @repo_hash ||= {} end |
Instance Method Details
#abstract_class? ⇒ Boolean
9 10 11 12 13 |
# File 'lib/esse/index/inheritance.rb', line 9 def abstract_class? return @abstract_class == true if defined?(@abstract_class) !index_name? end |
#aliases(**options) ⇒ Object
Get the aliases for the index.
7 8 9 10 11 12 13 14 15 |
# File 'lib/esse/index/aliases.rb', line 7 def aliases(**) response = cluster.api.aliases(**, index: index_name, name: '*') idx_name = response.keys.find { |idx| idx.start_with?(index_name) } return [] unless idx_name response.dig(idx_name, 'aliases')&.keys || [] rescue Esse::Transport::NotFoundError [] end |
#bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **options) ⇒ Array<Esse::Import::RequestBody>
Performs multiple indexing or delete operations in a single API call. This reduces overhead and can greatly increase indexing speed.
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/esse/index/documents.rb', line 201 def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **) definition = { index: index_name(suffix: suffix), type: type, }.merge() cluster.may_update_type!(definition) to_index = [] to_create = [] to_update = [] to_delete = [] Esse::ArrayUtils.wrap(index).each do |doc| if doc.is_a?(Hash) to_index << doc elsif Esse.document?(doc) && !doc.ignore_on_index? hash = doc.to_bulk hash[:_type] ||= type if type hash = request_params_for(:index, doc, bulk: true).merge(hash) if request_params_for?(:index) to_index << hash end end Esse::ArrayUtils.wrap(create).each do |doc| if doc.is_a?(Hash) to_create << doc elsif Esse.document?(doc) && !doc.ignore_on_index? hash = doc.to_bulk hash[:_type] ||= type if type hash = request_params_for(:create, doc, bulk: true).merge(hash) if request_params_for?(:create) to_create << hash end end Esse::ArrayUtils.wrap(update).each do |doc| if doc.is_a?(Hash) to_update << doc elsif Esse.document?(doc) && !doc.ignore_on_index? hash = doc.to_bulk(operation: :update) hash[:_type] ||= type if type hash = request_params_for(:update, doc, bulk: true).merge(hash) if request_params_for?(:update) to_update << hash end end Esse::ArrayUtils.wrap(delete).each do |doc| if doc.is_a?(Hash) to_delete << doc elsif Esse.document?(doc) && !doc.ignore_on_delete? 
hash = doc.to_bulk(data: false) hash[:_type] ||= type if type hash = request_params_for(:delete, doc, bulk: true).merge(hash) if request_params_for?(:delete) to_delete << hash end end # @TODO Wrap the return in a some other Stats object with more information Esse::Import::Bulk.new( create: to_create, delete: to_delete, index: to_index, update: to_update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats if bulk_wait_interval > 0 event_payload[:wait_interval] = bulk_wait_interval sleep(bulk_wait_interval) else event_payload[:wait_interval] = 0.0 end end end end |
#bulk_wait_interval ⇒ Object
67 68 69 |
# File 'lib/esse/index/attributes.rb', line 67 def bulk_wait_interval @bulk_wait_interval || Esse.config.bulk_wait_interval end |
#bulk_wait_interval=(value) ⇒ Object
71 72 73 |
# File 'lib/esse/index/attributes.rb', line 71 def bulk_wait_interval=(value) @bulk_wait_interval = value.to_f end |
#close(suffix: nil, **options) ⇒ Object
Close an index (keep the data on disk, but deny operations with the index).
174 175 176 |
# File 'lib/esse/index/indices.rb', line 174 def close(suffix: nil, **) cluster.api.close(index: index_name(suffix: suffix), **) end |
#cluster ⇒ Esse::Cluster
Returns an instance of cluster based on its cluster_id.
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/esse/index/base.rb', line 54 def cluster unless Esse.config.cluster_ids.include?(cluster_id) raise NotImplementedError, <<~MSG There is no cluster configured for this index. Use `Esse.config.cluster(cluster_id) { ... }' define the elasticsearch client connection. MSG end Esse.synchronize { Esse.config.cluster(cluster_id) } end |
#cluster_id ⇒ Symbol
Returns the @cluster_id instance variable, or :default when it is not set.
49 50 51 |
# File 'lib/esse/index/base.rb', line 49 def cluster_id @cluster_id || Config::DEFAULT_CLUSTER_ID end |
#cluster_id=(source) ⇒ Symbol
Sets the cluster_id associated with the Index class. This can be used directly on Esse::Index to set the :default es cluster to be used by subclasses, or to override the es cluster used for specific indices:
Esse::Index.cluster_id = :v1
ArtistIndex = Class.new(Esse::Index)
ArtistIndex.cluster_id = :v2
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/esse/index/base.rb', line 15 def cluster_id=(source) if source.nil? @cluster_id = nil return end valid_ids = Esse.config.cluster_ids new_id = \ case source when Esse::Cluster source.id when String, Symbol id = source.to_sym id if valid_ids.include?(id) end msg = <<~MSG We could not resolve the index cluster using the argument %<arg>p. \n It must be previously defined in the `Esse.config.cluster(%<arg>p) { ... }' settings. \n Here is the list of cluster ids we have configured: %<ids>s\n You can ignore this cluster id entirely. That way the :default id will be used.\n Example: \n class UsersIndex < Esse::Index\n end\n MSG unless new_id raise ArgumentError.new, format(msg, arg: source, ids: valid_ids.map(&:inspect).join(', ')) end @cluster_id = new_id end |
#count(type: nil, suffix: nil, **options) ⇒ Integer
Gets the number of matches for a search query.
UsersIndex.count # 999
UsersIndex.count(body: { ... }) # 32
98 99 100 101 102 103 104 105 |
# File 'lib/esse/index/documents.rb', line 98 def count(type: nil, suffix: nil, **) params = { index: index_name(suffix: suffix), type: type, } cluster.may_update_type!(params) cluster.api.count(**, **params)['count'] end |
#create_index(suffix: nil, body: nil, settings: nil, **options) ⇒ Hash
Creates index and applies mappings and settings.
UsersIndex.create_index # creates index named `<cluster.index_prefix>users<index_suffix>`
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/esse/index/indices.rb', line 29 def create_index(suffix: nil, body: nil, settings: nil, **) = CREATE_INDEX_RESERVED_KEYWORDS.merge() name = build_real_index_name(suffix) definition = body || [settings_hash(settings: settings), mappings_hash].reduce(&:merge) index_alias = .delete(:alias) if index_alias && name != index_name definition[:aliases] = { index_name => {} } end retried = false begin cluster.api.create_index(index: name, body: definition, **) rescue Esse::Transport::BadRequestError => e if retried == false && e..include?('exists with the same name') && index_alias == :force cluster.api.delete_index(index: index_name) retried = true retry end raise end end |
#delete(doc = nil, suffix: nil, **options) ⇒ Object
Removes a JSON document from the specified index.
UsersIndex.delete(id: 1) # true
UsersIndex.delete(id: 'missing') # false
120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/esse/index/documents.rb', line 120 def delete(doc = nil, suffix: nil, **) if document?(doc) = request_params_for(:delete, doc).merge() if request_params_for?(:delete) [:id] = doc.id [:type] = doc.type if doc.type? [:routing] = doc.routing if doc.routing? end require_kwargs!(, :id) [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.delete(**) end |
#delete_by_query(suffix: nil, **options) ⇒ Hash
Delete documents by query
362 363 364 365 366 367 368 |
# File 'lib/esse/index/documents.rb', line 362 def delete_by_query(suffix: nil, **) definition = { index: index_name(suffix: suffix), }.merge() cluster.may_update_type!(definition) cluster.api.delete_by_query(**definition) end |
#delete_index(suffix: nil, **options) ⇒ Hash
Deletes an existing index.
UsersIndex.delete_index # deletes `<cluster.index_prefix>users<index_suffix>` index
156 157 158 159 160 |
# File 'lib/esse/index/indices.rb', line 156 def delete_index(suffix: nil, **) index = suffix ? index_name(suffix: suffix) : indices_pointing_to_alias.first index ||= index_name cluster.api.delete_index(**, index: index) end |
#descendants ⇒ Object
:nodoc:
6 7 8 9 10 11 12 |
# File 'lib/esse/index/descendants.rb', line 6 def descendants # :nodoc: descendants = [] ObjectSpace.each_object(singleton_class) do |k| descendants.unshift k unless k == self end descendants.uniq end |
#exist?(doc = nil, suffix: nil, **options) ⇒ Boolean
Check if a JSON document exists
UsersIndex.exist?(id: 1) # true
UsersIndex.exist?(id: 'missing') # false
74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/esse/index/documents.rb', line 74 def exist?(doc = nil, suffix: nil, **) if document?(doc) [:id] = doc.id [:type] = doc.type if doc.type? [:routing] = doc.routing if doc.routing? end require_kwargs!(, :id) [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.exist?(**) end |
#get(doc = nil, suffix: nil, **options) ⇒ Hash
Retrieves the specified JSON document from an index.
UsersIndex.get(id: 1) # { '_id' => 1, ... }
UsersIndex.get(id: 'missing') # raise Esse::Transport::NotFoundError
20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/esse/index/documents.rb', line 20 def get(doc = nil, suffix: nil, **) if document?(doc) [:id] = doc.id [:type] = doc.type if doc.type? [:routing] = doc.routing if doc.routing? end require_kwargs!(, :id) [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.get(**) end |
#import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options) ⇒ Numeric
Resolve collection and index data
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/esse/index/documents.rb', line 286 def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **) repo_types = repo_hash.keys if repo_types.empty? count = 0 if .key?(:eager_include_document_attributes) warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' eager_load_lazy_attributes = .delete(:eager_include_document_attributes) end if .key?(:lazy_update_document_attributes) warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.' update_lazy_attributes = .delete(:lazy_update_document_attributes) end repo_hash.slice(*repo_types).each do |repo_name, repo| # Elasticsearch 6.x and older have multiple types per index. # This gem supports multiple types per index for backward compatibility, but we recommend to update # your elasticsearch to a at least 7.x version and use a single type per index. # # Note that the repository name will be used as the document type. # mapping_default_type bulk_kwargs = { suffix: suffix, type: repo_name, ** } cluster.may_update_type!(bulk_kwargs) context ||= {} context[:eager_load_lazy_attributes] = eager_load_lazy_attributes context[:preload_lazy_attributes] = preload_lazy_attributes repo.each_serialized_batch(**context) do |batch| bulk(**bulk_kwargs, index: batch) if update_lazy_attributes != false attrs = repo.lazy_document_attribute_names(update_lazy_attributes) attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes) update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo| filtered_docs = batch.reject do |doc| doc.ignore_on_index? || doc.mutations.key?(attr_name) end next if filtered_docs.empty? repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value| memo[doc.doc_header][attr_name] = value end end if update_attrs.any? 
bulk_update = update_attrs.map do |header, values| header.merge(data: {doc: values}) end bulk(**bulk_kwargs, update: bulk_update) end end count += batch.size end end count end |
#index(doc = nil, suffix: nil, **options) ⇒ Hash
Adds a JSON document to the specified index and makes it searchable. If the document already exists, updates the document and increments its version.
UsersIndex::User.index(id: 1, body: { name: 'name' }) # { '_id' => 1, ...}
173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/esse/index/documents.rb', line 173 def index(doc = nil, suffix: nil, **) if document?(doc) = request_params_for(:index, doc).merge() if request_params_for?(:index) [:id] = doc.id [:body] = doc.mutated_source [:type] = doc.type if doc.type? [:routing] = doc.routing if doc.routing? end require_kwargs!(, :id, :body) [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.index(**) end |
#index_directory ⇒ Object
54 55 56 57 58 59 |
# File 'lib/esse/index/attributes.rb', line 54 def index_directory return unless uname return if uname == 'Esse::Index' Esse.config.indices_directory.join(uname).to_s end |
#index_exist?(suffix: nil) ⇒ Boolean
Checks the index existence. Returns true or false.
UsersIndex.index_exist? #=> true
145 146 147 |
# File 'lib/esse/index/indices.rb', line 145 def index_exist?(suffix: nil) cluster.api.index_exist?(index: index_name(suffix: suffix)) end |
#index_name(suffix: nil) ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/esse/index/attributes.rb', line 15 def index_name(suffix: nil) iname = index_prefixed_name(@index_name || normalized_name) suffix = Hstring.new(suffix).underscore.presence return iname if !iname || !suffix [iname, suffix].join('_') end |
#index_name=(value) ⇒ Object
11 12 13 |
# File 'lib/esse/index/attributes.rb', line 11 def index_name=(value) @index_name = Hstring.new(value.to_s).underscore.presence end |
#index_name? ⇒ Boolean
23 24 25 |
# File 'lib/esse/index/attributes.rb', line 23 def index_name? !index_name.nil? end |
#index_prefix ⇒ Object
27 28 29 30 31 |
# File 'lib/esse/index/attributes.rb', line 27 def index_prefix return @index_prefix if defined? @index_prefix cluster.index_prefix end |
#index_prefix=(value) ⇒ Object
33 34 35 36 37 38 39 40 |
# File 'lib/esse/index/attributes.rb', line 33 def index_prefix=(value) if value == false @index_prefix = nil return end @index_prefix = Hstring.new(value.to_s).underscore.presence end |
#index_suffix ⇒ Object
46 47 48 |
# File 'lib/esse/index/attributes.rb', line 46 def index_suffix @index_suffix end |
#index_suffix=(value) ⇒ Object
42 43 44 |
# File 'lib/esse/index/attributes.rb', line 42 def index_suffix=(value) @index_suffix = Hstring.new(value.to_s).underscore.presence end |
#indices_pointing_to_alias(**options) ⇒ Object
Return list of real index names for the virtual index name(alias)
18 19 20 21 22 |
# File 'lib/esse/index/aliases.rb', line 18 def indices_pointing_to_alias(**) cluster.api.aliases(**, name: index_name).keys rescue Esse::Transport::NotFoundError [] end |
#inherited(subclass) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/esse/index/inheritance.rb', line 15 def inherited(subclass) super inherited_instance_variables.each do |variable_name, should_duplicate| if (variable_value = instance_variable_get(variable_name)) && should_duplicate value = case variable_value when Hash h = {} variable_value.each { |k, v| h[k] = v.dup } h else variable_value.dup end end subclass.instance_variable_set(variable_name, value) end end |
#inspect ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/esse/index/base.rb', line 65 def inspect if self == Index super elsif abstract_class? "#{super}(abstract)" elsif index_name? "#{super}(Index: #{index_name})" else "#{super}(Index is not defined)" end end |
#mapping_single_type=(value) ⇒ Object
75 76 77 |
# File 'lib/esse/index/attributes.rb', line 75 def mapping_single_type=(value) @mapping_single_type = !!value end |
#mapping_single_type? ⇒ Boolean
79 80 81 82 83 |
# File 'lib/esse/index/attributes.rb', line 79 def mapping_single_type? return @mapping_single_type if defined? @mapping_single_type @mapping_single_type = cluster.engine.mapping_single_type? end |
#mappings(hash = {}, &block) ⇒ Object
This method is only used to define mapping
11 12 13 14 15 16 |
# File 'lib/esse/index/mappings.rb', line 11 def mappings(hash = {}, &block) @mapping = Esse::IndexMapping.new(body: hash, paths: template_dirs, globals: -> { cluster.mappings }) return unless block @mapping.define_singleton_method(:to_h, &block) end |
#mappings_hash ⇒ Object
18 19 20 21 |
# File 'lib/esse/index/mappings.rb', line 18 def mappings_hash hash = mapping.body { Esse::MAPPING_ROOT_KEY => (hash.key?(Esse::MAPPING_ROOT_KEY) ? hash[Esse::MAPPING_ROOT_KEY] : hash) } end |
#mget(ids:, suffix: nil, **options) ⇒ Hash
Retrieves multiple JSON documents by ID from an index.
UsersIndex.mget(ids: [1, 2, 3])
UsersIndex.mget(ids: [Esse::HashDocument.new(id: 1), Esse::HashDocument.new(id: 2)])
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/esse/index/documents.rb', line 43 def mget(ids:, suffix: nil, **) [:body] = { docs: ids.map do |doc| if document?(doc) datum = { _id: doc.id } datum[:_type] = doc.type if doc.type? datum[:routing] = doc.routing if doc.routing? datum elsif doc.is_a?(Hash) doc else { _id: doc } end end, } [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.mget(**) end |
#open(suffix: nil, **options) ⇒ Object
Open a previously closed index
166 167 168 |
# File 'lib/esse/index/indices.rb', line 166 def open(suffix: nil, **) cluster.api.open(index: index_name(suffix: suffix), **) end |
#plugin(plugin, **kwargs, &block) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/esse/index/plugins.rb', line 8 def plugin(plugin, **kwargs, &block) mod = plugin.is_a?(Module) ? plugin : load_plugin_module(plugin) unless @plugins.include?(mod) @plugins << mod mod.apply(self, **kwargs, &block) if mod.respond_to?(:apply) extend(mod::IndexClassMethods) if mod.const_defined?(:IndexClassMethods, false) if mod.const_defined?(:RepositoryClassMethods, false) repo_hash.each_value.each { |repo| repository_plugin_extend(repo, mod::RepositoryClassMethods) } end end mod.configure(self, **kwargs, &block) if mod.respond_to?(:configure) end |
#refresh(suffix: nil, **options) ⇒ Object
The refresh operation can adversely affect indexing throughput when used too frequently.
Performs the refresh operation in one or more indices.
183 184 185 |
# File 'lib/esse/index/indices.rb', line 183 def refresh(suffix: nil, **) cluster.api.refresh(index: index_name(suffix: suffix), **) end |
#reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options) ⇒ Object
Copies documents from a source to a destination.
To avoid an HTTP timeout, the request is sent with `wait_for_completion: false` and the task is polled until it is completed.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/esse/index/indices.rb', line 121 def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **) resp = cluster.api.reindex(**, body: body, scroll: scroll, wait_for_completion: false) return resp unless wait_for_completion task_id = resp['task'] task = nil begin while (task = cluster.api.task(id: task_id))['completed'] == false sleep poll_interval.to_i end rescue Interrupt => e cluster.api.cancel_task(id: task_id) raise e end task end |
#repo(name = nil) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/esse/index/type.rb', line 12 def repo(name = nil) if name.nil? && repo_hash.size == 1 name = repo_hash.keys.first elsif name.nil? && repo_hash.size > 1 raise ArgumentError, "You can only call `repo' with a name when there is only one type defined." end name ||= DEFAULT_REPO_NAME repo_hash.fetch(name.to_s) rescue KeyError raise ArgumentError, <<~MSG No repo named "#{name}" found. Use the `repository' method to define one: repository :#{name} do # collection ... # document ... end MSG end |
#repo?(name = nil) ⇒ Boolean
32 33 34 35 36 |
# File 'lib/esse/index/type.rb', line 32 def repo?(name = nil) return repo_hash.size > 0 if name.nil? repo_hash.key?(name.to_s) end |
#repository(repo_name, *_args, **kwargs, &block) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/esse/index/type.rb', line 38 def repository(repo_name, *_args, **kwargs, &block) repo_class = Class.new(Esse::Repository) kwargs[:const] = true unless kwargs.key?(:const) # TODO Change this to false to avoid collisions with application classes kwargs[:lazy_evaluate] ||= false if kwargs[:const] const_set(Hstring.new(repo_name).camelize.demodulize.to_s, repo_class) end index = self repo_class.send(:define_singleton_method, :index) { index } repo_class.send(:define_singleton_method, :repo_name) { repo_name.to_s } plugins.each do |mod| next unless mod.const_defined?(:RepositoryClassMethods, false) repository_plugin_extend(repo_class, mod::RepositoryClassMethods) end if kwargs[:lazy_evaluate] elsif block repo_class.class_eval(&block) end self.repo_hash = repo_hash.merge(repo_class.repo_name => repo_class) repo_class end |
#reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options) ⇒ Hash
Deletes, creates and imports data to the index. Performs zero-downtime index resetting.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/esse/index/indices.rb', line 62 def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **) cluster.throw_error_when_readonly! suffix ||= Esse. suffix = Esse. while index_exist?(suffix: suffix) syncronous_import = true syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false optimized_creation = optimize && syncronous_import && (import || reindex) if optimized_creation definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge) number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas) || 1 refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval) || '1s' new_number_of_replicas = ((definition[Esse::SETTING_ROOT_KEY] ||= {})[:index] ||= {})[:number_of_replicas] = 0 new_refresh_interval = ((definition[Esse::SETTING_ROOT_KEY] ||= {})[:index] ||= {})[:refresh_interval] = '-1' create_index(**, suffix: suffix, alias: false, body: definition) else create_index(**, suffix: suffix, alias: false, settings: settings) end if index_exist? && aliases.none? cluster.api.delete_index(index: index_name) end if import import_kwargs = import.is_a?(Hash) ? import : {} import_kwargs[:refresh] ||= refresh unless refresh.nil? import(**, **import_kwargs, suffix: suffix) elsif reindex && (source_indexes = indices_pointing_to_alias).any? reindex_kwargs = reindex.is_a?(Hash) ? reindex : {} reindex_kwargs[:refresh] ||= refresh unless refresh.nil? 
source_indexes.each do |from| reindex(**reindex_kwargs, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }) end end if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval settings ||= {} settings[:index] ||= {} settings[:index][:number_of_replicas] = number_of_replicas settings[:index][:refresh_interval] = refresh_interval update_settings(suffix: suffix, settings: settings) refresh(suffix: suffix) end update_aliases(suffix: suffix) if syncronous_import true end |
#search(*args, &block) ⇒ Object
8 9 10 11 |
# File 'lib/esse/index/search.rb', line 8 def search(*args, &block) kwargs = (args) cluster.search(self, **kwargs, &block) end |
#settings(hash = {}, &block) ⇒ Object
Defines the /_settings definition for each index.
hash: The body of the request includes the updated settings. block: Overwrite default :to_h from IndexSetting instance
Example:
class UserIndex < Esse::Index
settings {
index: { number_of_replicas: 4 }
}
end
class UserIndex < Esse::Index
settings do
# do something to load settings..
end
end
58 59 60 61 62 63 |
# File 'lib/esse/index/settings.rb', line 58 def settings(hash = {}, &block) @setting = Esse::IndexSetting.new(body: hash, paths: template_dirs, globals: -> { cluster.settings }) return unless block @setting.define_singleton_method(:to_h, &block) end |
#settings_hash(settings: nil) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/esse/index/settings.rb', line 17 def settings_hash(settings: nil) hash = setting.body values = (hash.key?(Esse::SETTING_ROOT_KEY) ? hash[Esse::SETTING_ROOT_KEY] : hash) values = HashUtils.explode_keys(values) if settings.is_a?(Hash) values = HashUtils.deep_merge(values, HashUtils.explode_keys(settings)) end INDEX_SIMPLIFIED_SETTINGS.each do |key| next unless values.key?(key) value = values.delete(key) next if value.nil? (values[:index] ||= {}).merge!(key => value) end if values[:index].is_a?(Hash) INDEX_SIMPLIFIED_SETTINGS.each { |key| values[:index].delete(key) if values[:index][key].nil? } values.delete(:index) if values[:index].empty? end { Esse::SETTING_ROOT_KEY => values } end |
#template_dirs ⇒ Object
61 62 63 64 65 |
# File 'lib/esse/index/attributes.rb', line 61 def template_dirs return [] unless index_directory TEMPLATE_DIRS.map { |term| format(term, dirname: index_directory) } end |
#uname ⇒ Object
50 51 52 |
# File 'lib/esse/index/attributes.rb', line 50 def uname Hstring.new(name).underscore.presence end |
#update(doc = nil, suffix: nil, **options) ⇒ Hash
Updates a document using the specified script.
UsersIndex.update(id: 1, body: { doc: { ... } }) # { '_id' => 1, ...}
146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/esse/index/documents.rb', line 146 def update(doc = nil, suffix: nil, **) if document?(doc) = request_params_for(:update, doc).merge() if request_params_for?(:update) [:id] = doc.id [:body] = { doc: doc.mutated_source } [:type] = doc.type if doc.type? [:routing] = doc.routing if doc.routing? end require_kwargs!(, :id, :body) [:index] = index_name(suffix: suffix) cluster.may_update_type!() cluster.api.update(**) end |
#update_aliases(suffix:, **options) ⇒ Hash
Replaces all existing aliases with the suffixed index (or indices) given in the suffix argument.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/esse/index/aliases.rb', line 30 def update_aliases(suffix:, **) cluster.throw_error_when_readonly! raise(ArgumentError, 'index suffix cannot be nil') if suffix.nil? [:body] = { actions: [ *indices_pointing_to_alias.map do |index| { remove: { index: index, alias: index_name } } end, *Array(suffix).map do |value| { add: { index: build_real_index_name(value), alias: index_name } } end, ], } cluster.api.update_aliases(**) end |
#update_by_query(suffix: nil, **options) ⇒ Hash
Update documents by query
348 349 350 351 352 353 354 |
# File 'lib/esse/index/documents.rb', line 348 def update_by_query(suffix: nil, **) definition = { index: index_name(suffix: suffix), }.merge() cluster.may_update_type!(definition) cluster.api.update_by_query(**definition) end |
#update_mapping(suffix: nil, **options) ⇒ Object
Updates index mappings
191 192 193 194 195 196 197 198 199 200 |
# File 'lib/esse/index/indices.rb', line 191 def update_mapping(suffix: nil, **) body = mappings_hash.fetch(Esse::MAPPING_ROOT_KEY) if (type = [:type]) # Elasticsearch <= 5.x should submit request with type both in the path and in the body # Elasticsearch 6.x should submit request with type in the path but not in the body # Elasticsearch >= 7.x does not support type in the mapping body = body[type.to_s] || body[type.to_sym] || body end cluster.api.update_mapping(index: index_name(suffix: suffix), body: body, **) end |
#update_settings(suffix: nil, settings: nil, **options) ⇒ Object
Updates index settings
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/esse/index/indices.rb', line 206 def update_settings(suffix: nil, settings: nil, **) response = nil settings = HashUtils.deep_transform_keys(settings_hash(settings: settings).fetch(Esse::SETTING_ROOT_KEY), &:to_sym) if [:body] body = HashUtils.deep_transform_keys(.delete(:body), &:to_sym) settings = HashUtils.deep_merge(settings, body) end settings.delete(:number_of_shards) # Can't change number of shards for an index settings[:index]&.delete(:number_of_shards) analysis = settings.delete(:analysis) if settings.any? response = cluster.api.update_settings(index: index_name(suffix: suffix), body: settings, **) end if analysis # It is also possible to define new analyzers for the index. But it is required to close the # index first and open it after the changes are made. close(suffix: suffix) begin response = cluster.api.update_settings(index: index_name(suffix: suffix), body: { analysis: analysis }, **) ensure self.open(suffix: suffix) end end response end |