Class: Searchyll::Indexer

Inherits: Object
Defined in: lib/searchyll/indexer.rb
Constant Summary

- BATCH_SIZE = 50
Instance Attribute Summary

- #configuration ⇒ Object
  Returns the value of attribute configuration.
- #indexer_thread ⇒ Object
  Returns the value of attribute indexer_thread.
- #old_indices ⇒ Object
  Returns the value of attribute old_indices.
- #queue ⇒ Object
  Returns the value of attribute queue.
- #timestamp ⇒ Object
  Returns the value of attribute timestamp.
- #uri ⇒ Object
  Returns the value of attribute uri.
- #working ⇒ Object
  Returns the value of attribute working.
Instance Method Summary

- #<<(doc) ⇒ Object
  Public: Add new documents for batch indexing.
- #current_batch ⇒ Object
  Fetch a batch of documents from the queue.
- #elasticsearch_index_name ⇒ Object
  A versioned index name, based on the time of the indexing run.
- #es_bulk_insert!(http, batch) ⇒ Object
  Given a batch (array) of documents, index them into Elasticsearch using its Bulk API.
- #finalize! ⇒ Object
  Once documents are done being indexed, finalize the process by adding the new index into an alias for searching.
- #finish ⇒ Object
  Public: Indicate to the indexer that no new documents are being added.
- #http_delete(path) ⇒ Object
- #http_get(path) ⇒ Object
- #http_post(path) ⇒ Object
- #http_put(path) ⇒ Object
- #http_request(klass, path) ⇒ Object
- #http_start {|http| ... } ⇒ Object
  Prepare an HTTP connection.
- #initialize(configuration) ⇒ Indexer (constructor)
  A new instance of Indexer.
- #prepare_index ⇒ Object
  Prepare our indexing run by creating a new index.
- #start ⇒ Object
  Public: start the indexer and wait for documents to index.
- #working? ⇒ Boolean
  Whether the batch indexing thread should keep working.
Constructor Details
#initialize(configuration) ⇒ Indexer
Returns a new instance of Indexer.
# File 'lib/searchyll/indexer.rb', line 17

def initialize(configuration)
  self.configuration = configuration
  self.uri           = URI(configuration.elasticsearch_url)
  self.queue         = Queue.new
  self.working       = true
  self.timestamp     = Time.now
end

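A minimal usage sketch of the intended lifecycle, based on the public methods documented below; the configuration object is assumed to respond to the accessors this class reads (elasticsearch_url, elasticsearch_index_base_name, and so on), and the documents variable is illustrative:

indexer = Searchyll::Indexer.new(configuration)
indexer.start                 # create the versioned index and spawn the indexing thread

documents.each do |doc|       # any collection of Hash-like documents
  indexer << doc              # queued, then bulk-inserted in batches of BATCH_SIZE
end

indexer.finish                # drain the queue, join the thread, finalize the alias swap
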
Instance Attribute Details
#configuration ⇒ Object
Returns the value of attribute configuration.
# File 'lib/searchyll/indexer.rb', line 9

def configuration
  @configuration
end

#indexer_thread ⇒ Object
Returns the value of attribute indexer_thread.
# File 'lib/searchyll/indexer.rb', line 10

def indexer_thread
  @indexer_thread
end

#old_indices ⇒ Object
Returns the value of attribute old_indices.
# File 'lib/searchyll/indexer.rb', line 11

def old_indices
  @old_indices
end

#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/searchyll/indexer.rb', line 12

def queue
  @queue
end

#timestamp ⇒ Object
Returns the value of attribute timestamp.
# File 'lib/searchyll/indexer.rb', line 13

def timestamp
  @timestamp
end

#uri ⇒ Object
Returns the value of attribute uri.
# File 'lib/searchyll/indexer.rb', line 14

def uri
  @uri
end

#working ⇒ Object
Returns the value of attribute working.
# File 'lib/searchyll/indexer.rb', line 15

def working
  @working
end

Instance Method Details
#<<(doc) ⇒ Object
Public: Add new documents for batch indexing.
# File 'lib/searchyll/indexer.rb', line 26

def <<(doc)
  self.queue << doc
end

#current_batch ⇒ Object
Fetch a batch of documents from the queue. Returns a maximum of BATCH_SIZE documents.
# File 'lib/searchyll/indexer.rb', line 119

def current_batch
  count = 0
  batch = []
  while count < BATCH_SIZE && queue.length > 0
    batch << queue.pop
    count += 1
  end
  batch
end

#elasticsearch_index_name ⇒ Object
A versioned index name based on the time of the indexing run; it is later added to an alias to allow hot reindexing.

# File 'lib/searchyll/indexer.rb', line 37

def elasticsearch_index_name
  "#{configuration.elasticsearch_index_base_name}-#{timestamp.strftime('%Y%m%d%H%M%S')}"
end

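For example, with an illustrative base name of "jekyll" and an indexing run whose timestamp is 2024-05-01 12:30:45, the method returns:

jekyll-20240501123045
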
#es_bulk_insert!(http, batch) ⇒ Object
Given a batch (array) of documents, index them into Elasticsearch using its Bulk API. www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

# File 'lib/searchyll/indexer.rb', line 109

def es_bulk_insert!(http, batch)
  bulk_insert = http_post("/#{elasticsearch_index_name}/#{configuration.elasticsearch_default_type}/_bulk")
  bulk_insert.body = batch.map do |doc|
    [ { :index => {} }.to_json, doc.to_json ].join("\n")
  end.join("\n") + "\n"
  http.request(bulk_insert)
end

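The body assembled above is newline-delimited JSON: each document is preceded by an empty index action, and the payload ends with a trailing newline. With two illustrative documents, the request body looks like:

{"index":{}}
{"title":"Hello world","url":"/hello-world/"}
{"index":{}}
{"title":"About","url":"/about/"}
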
#finalize! ⇒ Object
Once documents are done being indexed, finalize the process by adding the new index into an alias for searching.
# File 'lib/searchyll/indexer.rb', line 146

def finalize!
  # refresh the index to make it searchable
  refresh = http_post("/#{elasticsearch_index_name}/_refresh")

  # add replication to the new index
  add_replication = http_put("/#{elasticsearch_index_name}/_settings")
  add_replication.body = {
    index: { number_of_replicas: configuration.elasticsearch_number_of_replicas }
  }.to_json

  # hot swap the index into the canonical alias
  update_aliases = http_post("/_aliases")
  update_aliases.body = {
    "actions": [
      { "remove": { "index": old_indices.join(','), "alias": configuration.elasticsearch_index_base_name }},
      { "add": { "index": elasticsearch_index_name, "alias": configuration.elasticsearch_index_base_name }}
    ]
  }.to_json

  # delete old indices
  cleanup_indices = http_delete("/#{old_indices.join(',')}")
  puts %( Old indices: #{old_indices.join(', ')})

  # run the prepared requests
  http_start do |http|
    http.request(refresh)
    http.request(add_replication)
    http.request(update_aliases)
    if !old_indices.empty?
      http.request(cleanup_indices)
    end
  end
end

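Because the remove and add actions travel in a single POST to /_aliases, the swap is atomic: searches against the base-name alias switch from the old indices to the new index in one step. With an illustrative base name of "jekyll", the request body serializes to:

{
  "actions": [
    { "remove": { "index": "jekyll-20240401000000", "alias": "jekyll" } },
    { "add":    { "index": "jekyll-20240501123045", "alias": "jekyll" } }
  ]
}
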
#finish ⇒ Object
Public: Indicate to the indexer that no new documents are being added.
# File 'lib/searchyll/indexer.rb', line 130

def finish
  self.working = false
  indexer_thread.join
  finalize!
end

#http_delete(path) ⇒ Object
# File 'lib/searchyll/indexer.rb', line 94

def http_delete(path)
  http_request(Net::HTTP::Delete, path)
end

#http_get(path) ⇒ Object
# File 'lib/searchyll/indexer.rb', line 90

def http_get(path)
  http_request(Net::HTTP::Get, path)
end

#http_post(path) ⇒ Object
# File 'lib/searchyll/indexer.rb', line 86

def http_post(path)
  http_request(Net::HTTP::Post, path)
end

#http_put(path) ⇒ Object
# File 'lib/searchyll/indexer.rb', line 82

def http_put(path)
  http_request(Net::HTTP::Put, path)
end

#http_request(klass, path) ⇒ Object
# File 'lib/searchyll/indexer.rb', line 98

def http_request(klass, path)
  req = klass.new(path)
  req.content_type = 'application/json'
  req['Accept'] = 'application/json'
  req.basic_auth(uri.user, uri.password)
  req
end

#http_start {|http| ... } ⇒ Object
Prepare an HTTP connection.

# File 'lib/searchyll/indexer.rb', line 42

def http_start(&block)
  http = Net::HTTP.start(
    uri.hostname, uri.port,
    :use_ssl => (uri.scheme == 'https')
  )
  yield(http)
end

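A minimal sketch of how the other instance methods combine http_start with the request builders; the path here is illustrative:

http_start do |http|
  refresh = http_post("/#{elasticsearch_index_name}/_refresh")
  response = http.request(refresh)
  puts response.code            # e.g. "200" on success
end
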
#prepare_index ⇒ Object
Prepare our indexing run by creating a new index.
# File 'lib/searchyll/indexer.rb', line 51

def prepare_index
  create_index = http_put("/#{elasticsearch_index_name}")
  create_index.body = {
    index: {
      number_of_shards: configuration.elasticsearch_number_of_shards,
      number_of_replicas: 0,
      refresh_interval: -1
    }
  }.to_json
  # TODO: index settings

  http_start do |http|
    resp = http.request(create_index)
  end

  # todo: mapping?
end

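The settings body above disables replication and automatic refresh while the bulk load runs; #finalize! later refreshes the index and restores the replica count. With an illustrative shard count of 1, the body serializes to:

{"index":{"number_of_shards":1,"number_of_replicas":0,"refresh_interval":-1}}
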
#start ⇒ Object
Public: start the indexer and wait for documents to index.
# File 'lib/searchyll/indexer.rb', line 69

def start
  prepare_index
  self.indexer_thread = Thread.new do
    http_start do |http|
      loop do
        break unless working?
        es_bulk_insert!(http, current_batch)
      end
    end
  end
end

#working? ⇒ Boolean
Whether the batch indexing thread should keep working: true while new documents may still be added or documents remain in the queue.

# File 'lib/searchyll/indexer.rb', line 31

def working?
  working || queue.length > 0
end