Class: Cassandra::Cluster

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/cassandra/cluster.rb,
lib/cassandra/cluster/client.rb,
lib/cassandra/cluster/schema.rb,
lib/cassandra/cluster/options.rb,
lib/cassandra/cluster/metadata.rb,
lib/cassandra/cluster/registry.rb,
lib/cassandra/cluster/connector.rb,
lib/cassandra/cluster/connection_pool.rb,
lib/cassandra/cluster/schema/fetchers.rb,
lib/cassandra/cluster/failed_connection.rb,
lib/cassandra/cluster/control_connection.rb,
lib/cassandra/cluster/schema/cql_type_parser.rb,
lib/cassandra/cluster/schema/fqcn_type_parser.rb,
lib/cassandra/cluster/schema/partitioners/random.rb,
lib/cassandra/cluster/schema/partitioners/murmur3.rb,
lib/cassandra/cluster/schema/partitioners/ordered.rb,
lib/cassandra/cluster/schema/replication_strategies/none.rb,
lib/cassandra/cluster/schema/replication_strategies/simple.rb,
lib/cassandra/cluster/schema/replication_strategies/network_topology.rb

Overview

Cluster represents a Cassandra cluster. It serves as a session factory and a collection of metadata.

Instance Method Summary collapse

Instance Method Details

#close ⇒ self

Synchronously closes all sessions managed by this cluster

Returns:

  • (self)

    this cluster

See Also:



306
307
308
# File 'lib/cassandra/cluster.rb', line 306

# Synchronously closes all sessions managed by this cluster.
#
# Starts the asynchronous close and blocks on the resulting future.
#
# @return whatever the future from {#close_async} resolves to
#   (documented as this cluster)
def close
  closing = close_async
  closing.get
end

#close_async ⇒ Cassandra::Future<Cassandra::Cluster>

Asynchronously closes all sessions managed by this cluster

Returns:



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/cassandra/cluster.rb', line 285

# Asynchronously closes all sessions managed by this cluster.
#
# Closes the control connection and mirrors its outcome onto a new promise:
# on success the promise is fulfilled with this cluster, on failure it is
# broken with the same error. The executor is shut down in both cases.
#
# @return the future of the promise described above
def close_async
  closed = @futures.promise
  shutdown = @control_connection.close_async

  shutdown.on_complete do |outcome|
    if outcome.resolved?
      closed.fulfill(self)
    else
      outcome.on_failure { |error| closed.break(error) }
    end

    # Runs after either branch, so the executor stops even on failure.
    @executor.shutdown
  end

  closed.future
end

#connect(keyspace = nil) ⇒ Cassandra::Session

Synchronously create a new session, optionally scoped to a keyspace

Parameters:

  • keyspace (String) (defaults to: nil)

    optional keyspace to scope the session to

Returns:

Raises:

See Also:



277
278
279
# File 'lib/cassandra/cluster.rb', line 277

# Synchronously create a new session, optionally scoped to a keyspace.
#
# Delegates to {#connect_async} and blocks on the returned future.
#
# @param keyspace [String, nil] optional keyspace to scope the session to
# @return whatever the future from {#connect_async} resolves to
def connect(keyspace = nil)
  pending_session = connect_async(keyspace)
  pending_session.get
end

#connect_async(keyspace = nil) ⇒ Cassandra::Future<Cassandra::Session>

Asynchronously create a new session, optionally scoped to a keyspace

Parameters:

  • keyspace (String) (defaults to: nil)

    optional keyspace to scope session to

Returns:

See Also:



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/cassandra/cluster.rb', line 226

# Asynchronously create a new session, optionally scoped to a keyspace.
#
# A non-nil, non-String keyspace is rejected up front with an errored
# future carrying an ArgumentError. Otherwise a client and a session are
# built and the client is connected. When a keyspace was given, a
# `USE <keyspace>` statement is executed on the session before the promise
# is fulfilled with it.
#
# @param keyspace [String, nil] optional keyspace to scope session to
# @return the future of the promise, fulfilled with the session or broken
#   with the connection / USE error
def connect_async(keyspace = nil)
  acceptable_keyspace = keyspace.nil? || keyspace.is_a?(::String)
  unless acceptable_keyspace
    return @futures.error(::ArgumentError.new("keyspace must be a string, #{keyspace.inspect} given"))
  end

  client = Client.new(@logger, @registry, @schema, @io_reactor, @connector,
                      @profile_manager, @reconnection_policy, @address_resolver,
                      @connection_options, @futures, @timestamp_generator)
  session   = Session.new(client, @execution_options, @futures, @profile_manager)
  connected = @futures.promise

  client.connect.on_complete do |connection|
    # Connection failed: propagate the error and skip the keyspace step.
    unless connection.resolved?
      connection.on_failure { |error| connected.break(error) }
      next
    end

    if keyspace
      switching = session.execute_async("USE #{Util.escape_name(keyspace)}")

      switching.on_success { connected.fulfill(session) }
      switching.on_failure { |error| connected.break(error) }
    else
      connected.fulfill(session)
    end
  end

  connected.future
end

#each_execution_profile {|name, profile| ... } ⇒ Cassandra::Cluster #each_execution_profile ⇒ Hash<String, Cassandra::Execution::Profile> Also known as: execution_profiles

Yield or enumerate each execution profile defined in this cluster

Overloads:



177
178
179
180
181
182
183
184
185
# File 'lib/cassandra/cluster.rb', line 177

# Yield or enumerate each execution profile defined in this cluster.
#
# @yield [name, profile] each entry of the profile manager's profiles
# @return [self] when a block is given
# @return a duplicate of the profiles hash when no block is given
def each_execution_profile(&block)
  profiles = @profile_manager.profiles

  # Hand out a copy so callers cannot add/remove profiles in the
  # profile-manager's own hash.
  return profiles.dup unless block_given?

  profiles.each_pair(&block)
  self
end

#each_host {|host| ... } ⇒ Cassandra::Cluster #each_host ⇒ Array<Cassandra::Host> Also known as: hosts

Yield or enumerate each member of this cluster

Overloads:



112
113
114
115
116
# File 'lib/cassandra/cluster.rb', line 112

# Yield or enumerate each member of this cluster.
#
# Forwards to the registry. If the registry answers with itself, this
# cluster is returned instead; any other result is passed through as is.
#
# @yield [host] forwarded to the registry's each_host
# @return [self] when the registry returns itself
# @return the registry's result otherwise
def each_host(&block)
  outcome = @registry.each_host(&block)
  outcome == @registry ? self : outcome
end

#each_keyspace {|keyspace| ... } ⇒ Cassandra::Cluster #each_keyspace ⇒ Array<Cassandra::Keyspace> Also known as: keyspaces

Yield or enumerate each keyspace defined in this cluster

Overloads:



136
137
138
139
140
# File 'lib/cassandra/cluster.rb', line 136

# Yield or enumerate each keyspace defined in this cluster.
#
# Forwards to the schema. If the schema answers with itself, this cluster
# is returned instead; any other result is passed through as is.
#
# @yield [keyspace] forwarded to the schema's each_keyspace
# @return [self] when the schema returns itself
# @return the schema's result otherwise
def each_keyspace(&block)
  outcome = @schema.each_keyspace(&block)
  outcome == @schema ? self : outcome
end

#execution_profile(name) ⇒ Cassandra::Execution::Profile

Returns execution profile of the given name.

Parameters:

  • name (String)

    Name of profile to retrieve

Returns:



166
167
168
# File 'lib/cassandra/cluster.rb', line 166

# Returns execution profile of the given name.
#
# @param name [String] name of profile to retrieve
# @return the entry stored under +name+ in the profile manager's profiles
def execution_profile(name)
  known_profiles = @profile_manager.profiles
  known_profiles[name]
end

#find_replicas(keyspace, statement) ⇒ Array<Cassandra::Host>

Note:

An empty list is returned when the statement/keyspace information is not enough to determine the replica list.

Return replicas for a given statement and keyspace

Parameters:

  • keyspace (String)

    keyspace name

  • statement (Cassandra::Statement)

    statement for which to find replicas

Returns:



82
# File 'lib/cassandra/cluster.rb', line 82

# Forward #name and #find_replicas to @metadata (def_delegators comes from Forwardable, which this class extends).
def_delegators :@metadata, :name, :find_replicas

#has_host?(address) ⇒ Boolean

Determine if a host by a given address exists

Parameters:

  • address (IPAddr, String)

    ip address

Returns:

  • (Boolean)

    true or false



128
# File 'lib/cassandra/cluster.rb', line 128

# Forward #host and #has_host? to @registry (def_delegators comes from Forwardable, which this class extends).
def_delegators :@registry, :host, :has_host?

#has_keyspace?(name) ⇒ Boolean

Determine if a keyspace by a given name exists

Parameters:

  • name (String)

    keyspace name

Returns:

  • (Boolean)

    true or false



152
# File 'lib/cassandra/cluster.rb', line 152

# Forward #keyspace and #has_keyspace? to @schema (def_delegators comes from Forwardable, which this class extends).
def_delegators :@schema, :keyspace, :has_keyspace?

#host(address) ⇒ Cassandra::Host?

Find a host by its address

Parameters:

  • address (IPAddr, String)

    ip address

Returns:



128
# File 'lib/cassandra/cluster.rb', line 128

# Forward #host and #has_host? to @registry (def_delegators comes from Forwardable, which this class extends).
def_delegators :@registry, :host, :has_host?

#keyspace(name) ⇒ Cassandra::Keyspace?

Find a keyspace by name

Parameters:

  • name (String)

    keyspace name

Returns:



152
# File 'lib/cassandra/cluster.rb', line 152

# Forward #keyspace and #has_keyspace? to @schema (def_delegators comes from Forwardable, which this class extends).
def_delegators :@schema, :keyspace, :has_keyspace?

#name ⇒ String

Return cluster's name

Returns:

  • (String)

    cluster's name



82
# File 'lib/cassandra/cluster.rb', line 82

# Forward #name and #find_replicas to @metadata (def_delegators comes from Forwardable, which this class extends).
def_delegators :@metadata, :name, :find_replicas

#port ⇒ Integer

Returns Cassandra native protocol port.

Returns:

  • (Integer)

    Cassandra native protocol port



155
156
157
# File 'lib/cassandra/cluster.rb', line 155

# Returns Cassandra native protocol port.
#
# @return the +port+ reported by the connection options
#   (documented as an Integer)
def port
  options = @connection_options
  options.port
end

#protocol_version ⇒ Integer

Returns the version of the native protocol used in communication with nodes.

Returns:

  • (Integer)

    the version of the native protocol used in communication with nodes



160
161
162
# File 'lib/cassandra/cluster.rb', line 160

# Returns the version of the native protocol used in communication with nodes.
#
# @return the +protocol_version+ reported by the connection options
#   (documented as an Integer)
def protocol_version
  options = @connection_options
  options.protocol_version
end

#refresh_schema ⇒ nil

Synchronously refresh schema metadata

Returns:

  • (nil)

    nothing

Raises:

See Also:



213
214
215
# File 'lib/cassandra/cluster.rb', line 213

# Synchronously refresh schema metadata.
#
# Delegates to {#refresh_schema_async} and blocks on the returned future.
#
# @return whatever the future from {#refresh_schema_async} resolves to
#   (documented as nil)
def refresh_schema
  refreshing = refresh_schema_async
  refreshing.get
end

#refresh_schema_async ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/cassandra/cluster.rb', line 192

# Asynchronously refresh schema metadata.
#
# Asks the control connection for a schema refresh and mirrors the outcome
# onto a new promise: fulfilled with nil on success, broken with the same
# error on failure.
#
# @return the future of the promise described above
def refresh_schema_async
  refreshed = @futures.promise

  # refresh_maybe_retry is invoked through #send, which also reaches
  # non-public methods of the control connection.
  refresh = @control_connection.send(:refresh_maybe_retry, :schema)

  refresh.on_complete do |outcome|
    if outcome.resolved?
      refreshed.fulfill(nil)
    else
      outcome.on_failure { |error| refreshed.break(error) }
    end
  end

  refreshed.future
end

#register(listener) ⇒ self

Register a cluster state listener. State listener will start receiving notifications about topology and schema changes

Parameters:

Returns:

  • (self)


89
90
91
92
93
# File 'lib/cassandra/cluster.rb', line 89

# Register a cluster state listener.
#
# The listener is added to the registry first and then to the schema.
#
# @param listener the listener object to add
# @return [self]
def register(listener)
  [@registry, @schema].each { |source| source.add_listener(listener) }
  self
end

#unregister(listener) ⇒ self

Unregister a cluster state listener. State listener will stop receiving notifications about topology and schema changes

Parameters:

Returns:

  • (self)


100
101
102
103
104
# File 'lib/cassandra/cluster.rb', line 100

# Unregister a cluster state listener.
#
# The listener is removed from the registry first and then from the schema.
#
# @param listener the listener object to remove
# @return [self]
def unregister(listener)
  [@registry, @schema].each { |source| source.remove_listener(listener) }
  self
end