Module: ElasticQueue::Persistence::ClassMethods

Defined in:
lib/elastic_queue/persistence.rb

Instance Method Summary

Instance Method Details

#add_mappings ⇒ Object



70
71
72
73
74
75
# File 'lib/elastic_queue/persistence.rb', line 70

# Registers the queue mapping of every model class on the search index.
#
# Walks +model_classes+ and, for each class whose +queue_mapping+ is present,
# issues a +put_mapping+ call using the underscored class name as the type.
# Classes without a mapping are skipped.
#
# @return [Object] whatever +model_classes.each+ returns
def add_mappings
  model_classes.each do |model_class|
    type_mapping = model_class.queue_mapping
    next unless type_mapping.present?

    search_client.indices.put_mapping(
      index: index_name,
      type: model_class.to_s.underscore,
      body: type_mapping
    )
  end
end

#bulk_index(scopes: {}, batch_size: 10_000) ⇒ Object

You can pass scopes into bulk_index to be used when fetching records.

For example, bulk_index(scopes: { some_model: [:scope1, :scope2], some_other_model: [:scope3] }) will fetch SomeModel.scope1.scope2 and SomeOtherModel.scope3, and index only those records.


31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/elastic_queue/persistence.rb', line 31

# Indexes the records of every model class in batches via the bulk API.
#
# The index is created first when it does not exist yet. For each class in
# +model_classes+ the records are fetched through +scoped_class+ (so the
# optional +scopes+ narrow what gets indexed) and sent one batch at a time.
#
# NOTE(review): an earlier draft sketched eager-loading associations before
# batching (+includes(associations_for_index(model))+); it is not applied here.
#
# @param scopes [Hash] scope names keyed by underscored model name, e.g.
#   +{ some_model: [:scope1, :scope2] }+
# @param batch_size [Integer] number of records passed to +find_in_batches+
# @return [Object] whatever +model_classes.each+ returns
def bulk_index(scopes: {}, batch_size: 10_000)
  create_index unless index_exists?
  model_classes.each do |model_class|
    type_name = model_class.to_s.underscore
    records = scoped_class(model_class, scopes)
    records.find_in_batches(batch_size: batch_size) do |group|
      # One bulk "index" action per record, with the document nested under :data.
      actions = group.map do |record|
        { index: { _index: index_name, _id: record.id, _type: type_name, data: record.indexed_for_queue } }
      end
      search_client.bulk body: actions
    end
  end
end

#create_index ⇒ Object



15
16
17
18
# File 'lib/elastic_queue/persistence.rb', line 15

# Creates the search index with +default_index_settings+ as its body, then
# installs the model mappings through +add_mappings+.
#
# @return [Object] the result of +add_mappings+
def create_index
  indices = search_client.indices
  indices.create(index: index_name, body: default_index_settings)
  add_mappings
end

#default_index_settings ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/elastic_queue/persistence.rb', line 54

# Builds the settings body used when the index is created.
#
# The only thing configured is the +default+ analyzer: a custom analyzer with
# the whitespace tokenizer and a lowercase filter.
#
# @return [Hash] a freshly built settings hash on every call
def default_index_settings
  default_analyzer = { type: :custom, tokenizer: :whitespace, filter: [:lowercase] }
  { settings: { analysis: { analyzer: { default: default_analyzer } } } }
end

#delete_index ⇒ Object



20
21
22
# File 'lib/elastic_queue/persistence.rb', line 20

# Deletes the index named by +index_name+ through the search client.
#
# @return [Object] the client's response to the delete call
def delete_index
  indices = search_client.indices
  indices.delete(index: index_name)
end

#index_exists? ⇒ Boolean

Returns:

  • (Boolean)


6
7
8
# File 'lib/elastic_queue/persistence.rb', line 6

# Asks the search client whether the index named by +index_name+ exists.
#
# @return [Boolean] the client's answer to the +exists+ call
def index_exists?
  indices = search_client.indices
  indices.exists(index: index_name)
end

#index_model(instance) ⇒ Object

TODO: move these to an instance?



78
79
80
# File 'lib/elastic_queue/persistence.rb', line 78

# Indexes a single model instance.
#
# The document id is the instance's +id+, the type is the underscored name of
# its class, and the body is whatever +indexed_for_queue+ returns.
#
# @param instance [Object] record responding to +id+ and +indexed_for_queue+
# @return [Object] the client's response to the index call
def index_model(instance)
  search_client.index(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore,
    body: instance.indexed_for_queue
  )
end

#refresh_index ⇒ Object

Not currently used, but handy for debugging.



25
26
27
# File 'lib/elastic_queue/persistence.rb', line 25

# Issues a refresh request for the index named by +index_name+.
#
# @return [Object] the client's response to the refresh call
def refresh_index
  indices = search_client.indices
  indices.refresh(index: index_name)
end

#remove_model(instance) ⇒ Object



87
88
89
90
91
92
93
# File 'lib/elastic_queue/persistence.rb', line 87

# Deletes the document for a model instance from the index.
#
# The document is addressed by the instance's +id+ and the underscored name of
# its class. A NotFound error from the client is swallowed on purpose, so
# removing a document that is already gone is not treated as a failure.
#
# @param instance [Object] record responding to +id+
# @return [Object, nil] the client's response, or +nil+ when the document was
#   not found
def remove_model(instance)
  search_client.delete(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore
  )
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  # Already absent: report success by returning quietly.
  nil
end

#reset_index ⇒ Object



10
11
12
13
# File 'lib/elastic_queue/persistence.rb', line 10

# Rebuilds the index from scratch: deletes it when it already exists, then
# creates it again.
#
# @return [Object] the result of +create_index+
def reset_index
  already_there = index_exists?
  delete_index if already_there
  create_index
end

#scoped_class(klass, scopes) ⇒ Object



46
47
48
49
50
51
52
# File 'lib/elastic_queue/persistence.rb', line 46

# Applies the scopes requested for +klass+ and returns the narrowed result.
#
# Scopes are looked up in +scopes+ under the underscored, symbolized class
# name (e.g. +SomeModel+ is looked up as +:some_model+). They are sent in
# order, each one to the result of the previous one.
#
# @param klass [Class] model class to scope
# @param scopes [Hash] scope names keyed by underscored model name; the value
#   may be a list of scope names or a single scope name
# @return [Object] +klass+ itself when nothing is registered for it, otherwise
#   the result of chaining every requested scope
def scoped_class(klass, scopes)
  requested = scopes[klass.to_s.underscore.to_sym]
  return klass unless requested

  # Array() accepts a lone scope name as well as a list, so
  # `{ some_model: :scope1 }` no longer fails on a missing #each.
  Array(requested).reduce(klass) { |relation, scope| relation.send(scope) }
end

#upsert_model(instance) ⇒ Object



82
83
84
85
# File 'lib/elastic_queue/persistence.rb', line 82

# Updates the document for a model instance, creating it when missing.
#
# The request body carries the instance's +indexed_for_queue+ data as +doc+
# with +doc_as_upsert+ enabled. The call also passes +refresh: true+ and
# +retry_on_conflict: 20+ to the client.
#
# @param instance [Object] record responding to +id+ and +indexed_for_queue+
# @return [Object] the client's response to the update call
def upsert_model(instance)
  payload = { doc: instance.indexed_for_queue, doc_as_upsert: true }
  search_client.update(
    index: index_name,
    id: instance.id,
    type: instance.class.to_s.underscore,
    body: payload,
    refresh: true,
    retry_on_conflict: 20
  )
end