Class: CassandraModel::RawConnection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ConcurrencyHelper
Defined in:
lib/cassandra_model/raw_connection.rb,
lib/cassandra_model/mock_connection.rb

Overview

noinspection RubyTooManyInstanceVariablesInspection

Constant Summary collapse

# Baseline settings that caller-supplied configuration is merged over
# (see #config=). Only the outer hash is frozen.
DEFAULT_CONFIGURATION = {
  hosts: ['localhost'],
  keyspace: 'default_keyspace',
  keyspace_options: { class: 'SimpleStrategy', replication_factor: 1 },
  # NOTE: the port is kept as a String here, not an Integer.
  port: '9042',
  consistency: :one,
  connection_timeout: 10,
  timeout: 10
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(config_name = nil) ⇒ RawConnection

Returns a new instance of RawConnection.



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/cassandra_model/raw_connection.rb', line 24

# Sets up the bookkeeping for a connection without opening anything yet.
#
# @param config_name [Object, nil] stored as-is in +@config_name+;
#   how it is interpreted is up to code outside this method.
def initialize(config_name = nil)
  @config_name = config_name

  # One lock per group of state: config, cluster, session, and the
  # batch reactors (all three reactors share @reactor_mutex).
  @config_mutex = Mutex.new
  @cluster_mutex = Mutex.new
  @session_mutex = Mutex.new
  @reactor_mutex = Mutex.new

  # Thread-safe map used by #statement, keyed by query.
  @statement_cache = Concurrent::Map.new

  # Both are wrapped in Concurrent::Delay, so neither block runs here.
  @executor = Concurrent::Delay.new do
    Concurrent::CachedThreadPool.new
  end
  @futures_factory = Concurrent::Delay.new do
    Cassandra::Future::Factory.new(executor)
  end
end

Instance Method Details

#cluster ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/cassandra_model/raw_connection.rb', line 45

# Builds the driver cluster object for this connection.
#
# The work is handed to +safe_getset_variable+ together with the cluster
# mutex and the +@cluster+ slot. The block picks the driver-relevant keys
# out of the configuration, adds the port, logger and futures factory, and
# passes the result to +Cassandra.cluster+.
#
# @return [Object] whatever +safe_getset_variable+ yields for +@cluster+
# @raise [ArgumentError] if the configured port cannot be read as an Integer
def cluster
  safe_getset_variable(@cluster_mutex, :@cluster) do
    settings = config
    connection_configuration = settings.slice(:hosts,
                                              :compression,
                                              :consistency,
                                              :connection_timeout, :timeout,
                                              :username, :password,
                                              :address_resolution)

    # Forward the configured port so a non-default port is honoured.
    # DEFAULT_CONFIGURATION stores it as a String, so normalise it to an
    # Integer before it reaches the driver. An absent port leaves the
    # driver's own default in place.
    port = settings[:port]
    connection_configuration[:port] = Integer(port) if port

    connection_configuration[:logger] = Logging.logger
    connection_configuration[:futures_factory] = futures_factory
    Cassandra.cluster(connection_configuration)
  end
end

#config ⇒ Object



41
42
43
# File 'lib/cassandra_model/raw_connection.rb', line 41

# Returns the connection configuration.
#
# Delegates to +safe_getset_variable+ with the config mutex and the
# +@config+ slot; the block supplies +load_config+ as the value source.
#
# @return [Object] whatever +safe_getset_variable+ yields for +@config+
def config
  safe_getset_variable(@config_mutex, :@config) do
    load_config
  end
end

#config=(value) ⇒ Object



37
38
39
# File 'lib/cassandra_model/raw_connection.rb', line 37

# Replaces the stored configuration with +value+ layered over
# DEFAULT_CONFIGURATION (keys in +value+ win; the merge is shallow).
#
# @param value [Hash] overrides for the default settings
def config=(value)
  @config_mutex.synchronize do
    merged_settings = DEFAULT_CONFIGURATION.merge(value)
    @config = merged_settings
  end
end

#counter_batch_reactor ⇒ Object



75
76
77
# File 'lib/cassandra_model/raw_connection.rb', line 75

# Reactor for counter batches.
#
# Asks +reactor+ for the instance held in +@counter_reactor+, passing
# SingleTokenCounterBatch as the batch type.
#
# @return [Object] the value returned by +reactor+
def counter_batch_reactor
  slot = :@counter_reactor
  reactor(slot, SingleTokenCounterBatch)
end

#keyspace ⇒ Object



63
64
65
# File 'lib/cassandra_model/raw_connection.rb', line 63

# Looks up this connection's keyspace on the cluster and falls back to
# +create_keyspace+ when the lookup returns nothing.
#
# @return [Object] the cluster's keyspace entry, or the result of
#   +create_keyspace+
def keyspace
  existing = cluster.keyspace(keyspace_name)
  return existing if existing

  create_keyspace
end

#logged_batch_reactor ⇒ Object



71
72
73
# File 'lib/cassandra_model/raw_connection.rb', line 71

# Reactor for logged batches.
#
# Asks +reactor+ for the instance held in +@logged_reactor+, passing
# SingleTokenLoggedBatch as the batch type.
#
# @return [Object] the value returned by +reactor+
def logged_batch_reactor
  slot = :@logged_reactor
  reactor(slot, SingleTokenLoggedBatch)
end

#session ⇒ Object



59
60
61
# File 'lib/cassandra_model/raw_connection.rb', line 59

# Returns the session for this connection.
#
# Delegates to +safe_getset_variable+ with the session mutex and the
# +@session+ slot; the block connects the cluster to the keyspace named by
# +config[:keyspace]+.
#
# @return [Object] whatever +safe_getset_variable+ yields for +@session+
def session
  safe_getset_variable(@session_mutex, :@session) do
    connection = cluster
    connection.connect(config[:keyspace])
  end
end

#shutdown ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/cassandra_model/raw_connection.rb', line 83

# Tears the connection down: stops the batch reactors, then closes the
# session, then closes the cluster, clearing each instance variable.
#
# Every step is attempted even when an earlier one fails, so a reactor
# that cannot be stopped no longer leaves the session and cluster open.
# The first StandardError encountered is re-raised once all steps have run.
#
# @return [nil]
# @raise [StandardError] the first error raised by a stop/close call
def shutdown
  @shutdown = true
  first_failure = nil

  # Runs +teardown+ on the object held in +ivar+ (if any), remembers the
  # first failure instead of aborting, and always clears the variable.
  dispose = lambda do |ivar, teardown|
    begin
      resource = instance_variable_get(ivar)
      teardown.call(resource) if resource
    rescue StandardError => e
      first_failure ||= e
    ensure
      instance_variable_set(ivar, nil)
    end
  end

  # stop returns a future-like object; get waits on it before moving on.
  stop_reactor = ->(reactor) { reactor.stop.get }
  close_resource = ->(resource) { resource.close }

  # Same order as before: reactors first, then session, then cluster.
  @reactor_mutex.synchronize do
    [:@unlogged_reactor, :@logged_reactor, :@counter_reactor].each do |ivar|
      dispose.call(ivar, stop_reactor)
    end
  end
  @session_mutex.synchronize { dispose.call(:@session, close_resource) }
  @cluster_mutex.synchronize { dispose.call(:@cluster, close_resource) }

  raise first_failure if first_failure
end

#statement(query) ⇒ Object



79
80
81
# File 'lib/cassandra_model/raw_connection.rb', line 79

# Returns the prepared statement for +query+, going through the statement
# cache: on a miss the query is prepared on the session and stored under
# the query itself as the key.
#
# @param query [Object] the query to prepare (used as the cache key)
# @return [Object] the cached or newly prepared statement
def statement(query)
  statement_cache.fetch_or_store(query) do
    session.prepare(query)
  end
end

#unlogged_batch_reactor ⇒ Object



67
68
69
# File 'lib/cassandra_model/raw_connection.rb', line 67

# Reactor for unlogged batches.
#
# Asks +reactor+ for the instance held in +@unlogged_reactor+, passing
# SingleTokenUnloggedBatch as the batch type.
#
# @return [Object] the value returned by +reactor+
def unlogged_batch_reactor
  slot = :@unlogged_reactor
  reactor(slot, SingleTokenUnloggedBatch)
end