Class: CassandraModel::RawConnection

Inherits:
Object
  • Object
show all
Includes:
ConcurrencyHelper
Defined in:
lib/cassandra_model/raw_connection.rb

Constant Summary collapse

# Locks shared by every RawConnection instance; each guards one kind of
# lazily created or cached resource (cluster, session, config, prepared
# statements, batch reactors).
CLUSTER_MUTEX = Mutex.new
SESSION_MUTEX = Mutex.new
CONFIG_MUTEX = Mutex.new
STATEMENT_MUTEX = Mutex.new
REACTOR_MUTEX = Mutex.new

# Baseline settings that #config= merges caller-supplied values over.
# Only the outer hash is frozen; the nested array and hash are not.
DEFAULT_CONFIGURATION = {
  hosts: ['localhost'],
  keyspace: 'default_keyspace',
  keyspace_options: { class: 'SimpleStrategy', replication_factor: 1 },
  port: '9042',
  consistency: :one,
  connection_timeout: 10,
  timeout: 10
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(config_name = nil) ⇒ RawConnection

Returns a new instance of RawConnection.



24
25
26
27
# File 'lib/cassandra_model/raw_connection.rb', line 24

# Sets up an unconnected wrapper: records the configuration name and starts
# with an empty prepared-statement cache. No cluster or session is opened here.
#
# @param config_name [Object, nil] stored verbatim in @config_name
#   (presumably selects a named configuration; confirm against load_config)
def initialize(config_name = nil)
  @statement_cache = {}
  @config_name = config_name
end

Instance Method Details

#cluster ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/cassandra_model/raw_connection.rb', line 37

# Returns the Cassandra cluster handle for this connection.
#
# The block is handed to safe_getset_variable together with CLUSTER_MUTEX and
# the :@cluster slot (helper not visible here; presumably it memoizes the
# block's result under the lock; confirm in ConcurrencyHelper).
#
# Only the connection-related keys of #config are forwarded to the driver;
# everything else (e.g. :keyspace, :port) is left out. The project logger is
# always added.
def cluster
  safe_getset_variable(CLUSTER_MUTEX, :@cluster) do
    forwarded_keys = %i[
      hosts compression consistency
      connection_timeout timeout
      username password
      address_resolution
    ]
    driver_options = config.slice(*forwarded_keys).merge(logger: Logging.logger)
    Cassandra.cluster(driver_options)
  end
end

#config ⇒ Object



33
34
35
# File 'lib/cassandra_model/raw_connection.rb', line 33

# Returns the active configuration hash.
#
# Delegates to safe_getset_variable with CONFIG_MUTEX and the :@config slot,
# supplying load_config as the value producer (both helpers are defined
# elsewhere; presumably the result is cached in @config; confirm).
def config
  safe_getset_variable(CONFIG_MUTEX, :@config) do
    load_config
  end
end

#config=(value) ⇒ Object



29
30
31
# File 'lib/cassandra_model/raw_connection.rb', line 29

# Replaces the configuration with +value+ layered over DEFAULT_CONFIGURATION.
#
# The merge is shallow: a key present in +value+ replaces the default for that
# key wholesale, including nested hashes such as :keyspace_options.
# The assignment happens while holding CONFIG_MUTEX.
#
# @param value [Hash] overrides for the default settings
def config=(value)
  CONFIG_MUTEX.synchronize do
    @config = DEFAULT_CONFIGURATION.merge(value)
  end
end

#counter_batch_reactor ⇒ Object



66
67
68
# File 'lib/cassandra_model/raw_connection.rb', line 66

# Returns the reactor used for counter batches.
#
# Asks the `reactor` helper (defined elsewhere) for the reactor held in
# @counter_reactor, built around SingleTokenCounterBatch.
def counter_batch_reactor
  batch_klass = SingleTokenCounterBatch
  reactor(:@counter_reactor, batch_klass)
end

#keyspace ⇒ Object



54
55
56
# File 'lib/cassandra_model/raw_connection.rb', line 54

# Returns the keyspace named by keyspace_name as reported by the cluster,
# falling back to create_keyspace when the cluster reports none
# (keyspace_name and create_keyspace are defined elsewhere).
def keyspace
  existing = cluster.keyspace(keyspace_name)
  return existing if existing

  create_keyspace
end

#logged_batch_reactor ⇒ Object



62
63
64
# File 'lib/cassandra_model/raw_connection.rb', line 62

# Returns the reactor used for logged batches.
#
# Asks the `reactor` helper (defined elsewhere) for the reactor held in
# @logged_reactor, built around SingleTokenLoggedBatch.
def logged_batch_reactor
  batch_klass = SingleTokenLoggedBatch
  reactor(:@logged_reactor, batch_klass)
end

#session ⇒ Object



50
51
52
# File 'lib/cassandra_model/raw_connection.rb', line 50

# Returns the session connected to the configured keyspace.
#
# The connect call is handed to safe_getset_variable with SESSION_MUTEX and
# the :@session slot (helper not visible here; presumably it caches the
# result in @session under the lock; confirm in ConcurrencyHelper).
def session
  safe_getset_variable(SESSION_MUTEX, :@session) do
    connected_cluster = cluster
    connected_cluster.connect(config[:keyspace])
  end
end

#shutdown ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/cassandra_model/raw_connection.rb', line 76

# Tears the connection down in dependency order: batch reactors first, then
# the session, then the cluster. Each resource is released while holding its
# own mutex and its instance variable is reset to nil afterwards.
#
# Sets @shutdown before doing anything else (read elsewhere in the class).
#
# The session and cluster are closed from `ensure` clauses so that an error
# raised while stopping a reactor (or closing the session) does not leave the
# remaining resources open. The error still propagates to the caller; if a
# later step also raises, that later error is the one the caller sees.
#
# @raise [StandardError] whatever a reactor's stop future, the session close
#   or the cluster close raises
def shutdown
  @shutdown = true
  begin
    REACTOR_MUTEX.synchronize do
      # `stop` returns a future; `get` blocks until the reactor has stopped.
      @unlogged_reactor.stop.get if @unlogged_reactor
      @unlogged_reactor = nil

      @logged_reactor.stop.get if @logged_reactor
      @logged_reactor = nil

      @counter_reactor.stop.get if @counter_reactor
      @counter_reactor = nil
    end
  ensure
    begin
      SESSION_MUTEX.synchronize do
        @session.close if @session
        @session = nil
      end
    ensure
      CLUSTER_MUTEX.synchronize do
        @cluster.close if @cluster
        @cluster = nil
      end
    end
  end
end

#statement(query) ⇒ Object



70
71
72
73
74
# File 'lib/cassandra_model/raw_connection.rb', line 70

# Returns the prepared statement for +query+, preparing it on first use.
#
# A cache hit is served without taking STATEMENT_MUTEX. On a miss the lock is
# taken and the cache is consulted again via `||=`, so the query is only
# prepared if no entry was stored in the meantime.
#
# @param query [Object] the query to prepare; also used as the cache key
# @return [Object] whatever session.prepare returned for this query
def statement(query)
  cached = statement_cache[query]
  return cached if cached

  STATEMENT_MUTEX.synchronize { statement_cache[query] ||= session.prepare(query) }
end

#unlogged_batch_reactor ⇒ Object



58
59
60
# File 'lib/cassandra_model/raw_connection.rb', line 58

# Returns the reactor used for unlogged batches.
#
# Asks the `reactor` helper (defined elsewhere) for the reactor held in
# @unlogged_reactor, built around SingleTokenUnloggedBatch.
def unlogged_batch_reactor
  batch_klass = SingleTokenUnloggedBatch
  reactor(:@unlogged_reactor, batch_klass)
end