Class: Searchyll::Indexer

Inherits: Object

Defined in: lib/searchyll/indexer.rb

Constant Summary

BATCH_SIZE = 50

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(configuration) ⇒ Indexer

Returns a new instance of Indexer.



# File 'lib/searchyll/indexer.rb', line 17

def initialize(configuration)
  self.configuration = configuration
  self.uri = URI(configuration.elasticsearch_url)
  self.queue = Queue.new
  self.working = true
  self.timestamp = Time.now
end
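
A rough construction sketch; FakeConfig below is a hypothetical stand-in for Searchyll::Configuration, and any object responding to the same readers the indexer calls should work:

require 'searchyll'

# Hypothetical stand-in exposing only the settings the indexer reads.
FakeConfig = Struct.new(
  :elasticsearch_url, :elasticsearch_index_base_name, :elasticsearch_default_type,
  :elasticsearch_number_of_shards, :elasticsearch_number_of_replicas
)

config  = FakeConfig.new('http://localhost:9200', 'docs', 'doc', 1, 1)
indexer = Searchyll::Indexer.new(config)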

Instance Attribute Details

#configuration ⇒ Object

Returns the value of attribute configuration.



# File 'lib/searchyll/indexer.rb', line 9

def configuration
  @configuration
end

#indexer_thread ⇒ Object

Returns the value of attribute indexer_thread.



# File 'lib/searchyll/indexer.rb', line 10

def indexer_thread
  @indexer_thread
end

#old_indices ⇒ Object

Returns the value of attribute old_indices.



# File 'lib/searchyll/indexer.rb', line 11

def old_indices
  @old_indices
end

#queue ⇒ Object

Returns the value of attribute queue.



# File 'lib/searchyll/indexer.rb', line 12

def queue
  @queue
end

#timestamp ⇒ Object

Returns the value of attribute timestamp.



# File 'lib/searchyll/indexer.rb', line 13

def timestamp
  @timestamp
end

#uri ⇒ Object

Returns the value of attribute uri.



# File 'lib/searchyll/indexer.rb', line 14

def uri
  @uri
end

#working ⇒ Object

Returns the value of attribute working.



# File 'lib/searchyll/indexer.rb', line 15

def working
  @working
end

Instance Method Details

#<<(doc) ⇒ Object

Public: Add new documents for batch indexing.



# File 'lib/searchyll/indexer.rb', line 26

def <<(doc)
  self.queue << doc
end
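
For example, queueing a document hash from a build (the keys shown are illustrative, not a required schema):

indexer << { 'title' => 'Hello world', 'url' => '/hello/', 'body' => 'Post text goes here' }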

#current_batch ⇒ Object

Fetch a batch of documents from the queue. Returns a maximum of BATCH_SIZE documents.



# File 'lib/searchyll/indexer.rb', line 119

def current_batch
  count = 0
  batch = []
  while count < BATCH_SIZE && queue.length > 0
    batch << queue.pop
    count += 1
  end
  batch
end

#elasticsearch_index_name ⇒ Object

A versioned index name based on the time of the indexing run. It is later added to an alias to support hot reindexing.



# File 'lib/searchyll/indexer.rb', line 37

def elasticsearch_index_name
  "#{configuration.elasticsearch_index_base_name}-#{timestamp.strftime('%Y%m%d%H%M%S')}"
end
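
For example, with a base name of "docs" (hypothetical) and an indexing run started at 2024-03-15 12:00:00 local time, the generated name would be:

indexer.elasticsearch_index_name
# => "docs-20240315120000"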

#es_bulk_insert!(http, batch) ⇒ Object

Given a batch (array) of documents, index them into Elasticsearch using its Bulk API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html



# File 'lib/searchyll/indexer.rb', line 109

def es_bulk_insert!(http, batch)
  bulk_insert = http_post("/#{elasticsearch_index_name}/#{configuration.elasticsearch_default_type}/_bulk")
  bulk_insert.body = batch.map do |doc|
    [ { :index => {} }.to_json, doc.to_json ].join("\n")
  end.join("\n") + "\n"
  http.request(bulk_insert)
end
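
The request body is newline-delimited JSON: one empty index action line followed by the document itself, per entry. For a two-document batch the payload looks roughly like this (field names are illustrative):

{"index":{}}
{"title":"First post","url":"/first/"}
{"index":{}}
{"title":"Second post","url":"/second/"}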

#finalize! ⇒ Object

Once all documents have been indexed, finalize the run by refreshing the new index, swapping it into the search alias, and deleting any old indices.



# File 'lib/searchyll/indexer.rb', line 146

def finalize!
  # refresh the index to make it searchable
  refresh = http_post("/#{elasticsearch_index_name}/_refresh")

  # add replication to the new index
  add_replication = http_put("/#{elasticsearch_index_name}/_settings")
  add_replication.body = { index: { number_of_replicas: configuration.elasticsearch_number_of_replicas }}.to_json

  # hot swap the index into the canonical alias
  update_aliases = http_post("/_aliases")
  update_aliases.body = {
    "actions": [
      { "remove": { "index": old_indices.join(','), "alias": configuration.elasticsearch_index_base_name }},
      { "add":    { "index": elasticsearch_index_name, "alias": configuration.elasticsearch_index_base_name }}
    ]
  }.to_json

  # delete old indices
  cleanup_indices = http_delete("/#{old_indices.join(',')}")
  puts %(       Old indices: #{old_indices.join(', ')})

  # run the prepared requests
  http_start do |http|
    http.request(refresh)
    http.request(add_replication)
    http.request(update_aliases)
    if !old_indices.empty?
      http.request(cleanup_indices)
    end
  end
end
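
With a hypothetical base name of "docs" and one previous index, the alias-swap body sent to /_aliases would resemble:

{
  "actions": [
    { "remove": { "index": "docs-20240314110000", "alias": "docs" } },
    { "add":    { "index": "docs-20240315120000", "alias": "docs" } }
  ]
}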

#finish ⇒ Object

Public: Indicate to the indexer that no new documents are being added.



# File 'lib/searchyll/indexer.rb', line 130

def finish
  self.working = false
  indexer_thread.join
  finalize!
end
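
A minimal end-to-end sketch of the indexer lifecycle (configuration and documents are assumed to come from the surrounding Jekyll build):

indexer = Searchyll::Indexer.new(configuration)
indexer.start                            # create the index, spawn the worker thread
documents.each { |doc| indexer << doc }  # queue document hashes for bulk insertion
indexer.finish                           # stop the worker, join it, then finalize!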

#http_delete(path) ⇒ Object



# File 'lib/searchyll/indexer.rb', line 94

def http_delete(path)
  http_request(Net::HTTP::Delete, path)
end

#http_get(path) ⇒ Object



# File 'lib/searchyll/indexer.rb', line 90

def http_get(path)
  http_request(Net::HTTP::Get, path)
end

#http_post(path) ⇒ Object



# File 'lib/searchyll/indexer.rb', line 86

def http_post(path)
  http_request(Net::HTTP::Post, path)
end

#http_put(path) ⇒ Object



# File 'lib/searchyll/indexer.rb', line 82

def http_put(path)
  http_request(Net::HTTP::Put, path)
end

#http_request(klass, path) ⇒ Object



# File 'lib/searchyll/indexer.rb', line 98

def http_request(klass, path)
  req = klass.new(path)
  req.content_type = 'application/json'
  req['Accept']    = 'application/json'
  req.basic_auth(uri.user, uri.password)
  req
end
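
For example, a prepared request carries JSON content-type headers and any credentials embedded in the Elasticsearch URL (the path here is hypothetical):

req = indexer.http_put('/example-index/_settings')
req.content_type   # => "application/json"
req['Accept']      # => "application/json"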

#http_start {|http| ... } ⇒ Object

Prepare an HTTP connection to the configured Elasticsearch host and yield it to the given block.

Yields:

  • (http)


# File 'lib/searchyll/indexer.rb', line 42

def http_start(&block)
  http = Net::HTTP.start(
    uri.hostname, uri.port,
    :use_ssl => (uri.scheme == 'https')
  )
  yield(http)
end
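
For instance, a one-off request against the configured cluster can reuse the helpers (the /_cluster/health endpoint is standard Elasticsearch, used here only for illustration):

indexer.http_start do |http|
  response = http.request(indexer.http_get('/_cluster/health'))
  puts response.code, response.body
end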

#prepare_index ⇒ Object

Prepare our indexing run by creating a new index.



# File 'lib/searchyll/indexer.rb', line 51

def prepare_index
  create_index = http_put("/#{elasticsearch_index_name}")
  create_index.body = {
    index: {
      number_of_shards:   configuration.elasticsearch_number_of_shards,
      number_of_replicas: 0,
      refresh_interval:   -1
    }
  }.to_json # TODO: index settings

  http_start do |http|
    resp = http.request(create_index)
  end

  # todo: mapping?
end
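
The create-index request body disables replication and automatic refreshing so the bulk load runs quickly; replication is added back and the index refreshed in finalize!. With one shard configured (an example value), the JSON sent is roughly:

{
  "index": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": -1
  }
}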

#start ⇒ Object

Public: Start the indexer: create the new index and spawn a background thread that consumes queued documents and bulk-inserts them.



# File 'lib/searchyll/indexer.rb', line 69

def start
  prepare_index

  self.indexer_thread = Thread.new do
    http_start do |http|
      loop do
        break unless working?
        es_bulk_insert!(http, current_batch)
      end
    end
  end
end

#working? ⇒ Boolean

Whether the batch indexing thread should keep running; true while the indexer is still accepting documents or the queue is non-empty.

Returns:

  • (Boolean)


# File 'lib/searchyll/indexer.rb', line 31

def working?
  working || queue.length > 0
end