Class: CassandraModel::RawConnection
- Inherits:
-
Object
- Object
- CassandraModel::RawConnection
- Extended by:
- Forwardable
- Includes:
- ConcurrencyHelper
- Defined in:
- lib/cassandra_model/raw_connection.rb,
lib/cassandra_model/mock_connection.rb
Overview
noinspection RubyTooManyInstanceVariablesInspection
Constant Summary collapse
- DEFAULT_CONFIGURATION =
{ hosts: %w(localhost), keyspace: 'default_keyspace', keyspace_options: { class: 'SimpleStrategy', replication_factor: 1 }, port: '9042', consistency: :one, connection_timeout: 10, timeout: 10 }.freeze
Instance Method Summary collapse
- #cluster ⇒ Object
- #config ⇒ Object
- #config=(value) ⇒ Object
- #counter_batch_reactor ⇒ Object
-
#initialize(config_name = nil) ⇒ RawConnection
constructor
A new instance of RawConnection.
- #keyspace ⇒ Object
- #logged_batch_reactor ⇒ Object
- #session ⇒ Object
- #shutdown ⇒ Object
- #statement(query) ⇒ Object
- #unlogged_batch_reactor ⇒ Object
Constructor Details
#initialize(config_name = nil) ⇒ RawConnection
Returns a new instance of RawConnection.
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/cassandra_model/raw_connection.rb', line 24 def initialize(config_name = nil) @config_name = config_name @statement_cache = Concurrent::Map.new @cluster_mutex = Mutex.new @session_mutex = Mutex.new @config_mutex = Mutex.new @reactor_mutex = Mutex.new @executor = Concurrent::Delay.new { Concurrent::CachedThreadPool.new } @futures_factory = Concurrent::Delay.new { Cassandra::Future::Factory.new(executor) } end |
Instance Method Details
#cluster ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/cassandra_model/raw_connection.rb', line 45 def cluster safe_getset_variable(@cluster_mutex, :@cluster) do connection_configuration = config.slice(:hosts, :compression, :consistency, :connection_timeout, :timeout, :username, :password, :address_resolution) connection_configuration[:logger] = Logging.logger connection_configuration[:futures_factory] = futures_factory Cassandra.cluster(connection_configuration) end end |
#config ⇒ Object
41 42 43 |
# File 'lib/cassandra_model/raw_connection.rb', line 41 def config safe_getset_variable(@config_mutex, :@config) { load_config } end |
#config=(value) ⇒ Object
37 38 39 |
# File 'lib/cassandra_model/raw_connection.rb', line 37 def config=(value) @config_mutex.synchronize { @config = DEFAULT_CONFIGURATION.merge(value) } end |
#counter_batch_reactor ⇒ Object
75 76 77 |
# File 'lib/cassandra_model/raw_connection.rb', line 75 def counter_batch_reactor reactor(:@counter_reactor, SingleTokenCounterBatch) end |
#keyspace ⇒ Object
63 64 65 |
# File 'lib/cassandra_model/raw_connection.rb', line 63 def keyspace cluster.keyspace(keyspace_name) || create_keyspace end |
#logged_batch_reactor ⇒ Object
71 72 73 |
# File 'lib/cassandra_model/raw_connection.rb', line 71 def logged_batch_reactor reactor(:@logged_reactor, SingleTokenLoggedBatch) end |
#session ⇒ Object
59 60 61 |
# File 'lib/cassandra_model/raw_connection.rb', line 59 def session safe_getset_variable(@session_mutex, :@session) { cluster.connect(config[:keyspace]) } end |
#shutdown ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/cassandra_model/raw_connection.rb', line 83 def shutdown @shutdown = true @reactor_mutex.synchronize do @unlogged_reactor.stop.get if @unlogged_reactor @unlogged_reactor = nil @logged_reactor.stop.get if @logged_reactor @logged_reactor = nil @counter_reactor.stop.get if @counter_reactor @counter_reactor = nil end @session_mutex.synchronize do @session.close if @session @session = nil end @cluster_mutex.synchronize do @cluster.close if @cluster @cluster = nil end end |
#statement(query) ⇒ Object
79 80 81 |
# File 'lib/cassandra_model/raw_connection.rb', line 79 def statement(query) statement_cache.fetch_or_store(query) { session.prepare(query) } end |
#unlogged_batch_reactor ⇒ Object
67 68 69 |
# File 'lib/cassandra_model/raw_connection.rb', line 67 def unlogged_batch_reactor reactor(:@unlogged_reactor, SingleTokenUnloggedBatch) end |