Class: CassandraClient

Inherits:
Object
  • Object
show all
Includes:
Helper
Defined in:
lib/cassandra_client/helper.rb,
lib/cassandra_client/ordered_hash.rb,
lib/cassandra_client/serialization.rb,
lib/cassandra_client/cassandra_client.rb

Defined Under Namespace

Modules: Helper, Serialization Classes: AccessError, OrderedHash

Constant Summary collapse

MAX_INT =
2**31 - 1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(keyspace, host = '127.0.0.1', port = 9160, quorum = 1, serializer = CassandraClient::Serialization::JSON) ⇒ CassandraClient

Instantiate a new CassandraClient and open the connection.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/cassandra_client/cassandra_client.rb', line 10

def initialize(keyspace, host = '127.0.0.1', port = 9160, quorum = 1, serializer = CassandraClient::Serialization::JSON)
  @keyspace = keyspace
  @host = host
  @port = port
  @quorum = quorum
  @serializer = serializer

  @transport = Thrift::BufferedTransport.new(Thrift::Socket.new(@host, @port))
  @transport.open    
  @client = Cassandra::SafeClient.new(
    Cassandra::Client.new(Thrift::BinaryProtocol.new(@transport)), 
    @transport)

  keyspaces = @client.getStringListProperty("tables")
  unless keyspaces.include?(@keyspace)
    raise AccessError, "Keyspace #{@keyspace.inspect} not found. Available: #{keyspaces.inspect}"
  end
      
  @schema = @client.describeTable(@keyspace)
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def client
  @client
end

#hostObject (readonly)

Returns the value of attribute host.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def host
  @host
end

#keyspaceObject (readonly)

Returns the value of attribute keyspace.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def keyspace
  @keyspace
end

#portObject (readonly)

Returns the value of attribute port.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def port
  @port
end

#quorumObject (readonly)

Returns the value of attribute quorum.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def quorum
  @quorum
end

#schemaObject (readonly)

Returns the value of attribute schema.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def schema
  @schema
end

#serializerObject (readonly)

Returns the value of attribute serializer.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def serializer
  @serializer
end

#transportObject (readonly)

Returns the value of attribute transport.



7
8
9
# File 'lib/cassandra_client/cassandra_client.rb', line 7

def transport
  @transport
end

Instance Method Details

#clear_column_family!(column_family) ⇒ Object

Remove all rows in the column family you request.



69
70
71
72
73
# File 'lib/cassandra_client/cassandra_client.rb', line 69

def clear_column_family!(column_family)
  get_key_range(column_family).each do |key| 
    remove(column_family, key)
  end
end

#clear_keyspace!Object

Remove all rows in the keyspace



76
77
78
79
80
# File 'lib/cassandra_client/cassandra_client.rb', line 76

def clear_keyspace!
  @schema.keys.each do |column_family|
    clear_column_family!(column_family)
  end
end

#count(column_family, key_range = ''..'', limit = MAX_INT) ⇒ Object

Count all rows in the column_family you request. Requires the table to be partitioned with OrderPreservingHash.



152
153
154
# File 'lib/cassandra_client/cassandra_client.rb', line 152

def count(column_family, key_range = ''..'', limit = MAX_INT)
  get_key_range(column_family, key_range, limit).size
end

#count_columns(column_family, key, super_column = nil) ⇒ Object

Count the elements at the column_family:key:super_column path you request.



86
87
88
89
90
# File 'lib/cassandra_client/cassandra_client.rb', line 86

def count_columns(column_family, key, super_column = nil)
  @client.get_column_count(@keyspace, key, 
    ColumnParent.new(:column_family => column_family.to_s, :super_column => super_column)
  )
end

#get(column_family, key, super_column = nil, column = nil, limit = 100) ⇒ Object

Return a hash (actually, a CassandraClient::OrderedHash) or a single value representing the element at the column_family:key:super_column:column path you request.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/cassandra_client/cassandra_client.rb', line 108

def get(column_family, key, super_column = nil, column = nil, limit = 100)
  # You have got to be kidding
  if is_super(column_family)
    if column
      load(@client.get_column(@keyspace, key,  ColumnPath.new(:column_family => column_family.to_s, :super_column => super_column, :column => column)).value)
    elsif super_column
      columns_to_hash(@client.get_super_column(@keyspace, key,  SuperColumnPath.new(:column_family => column_family.to_s, :super_column => super_column)).columns)
    else
      # FIXME bug
      columns_to_hash(@client.get_slice_super(@keyspace, key, column_family.to_s, '', '', -1, limit))
    end
  else
    if super_column
      load(@client.get_column(@keyspace, key, ColumnPath.new(:column_family => column_family.to_s, :column => super_column)).value)
    elsif is_sorted_by_time(column_family)
      result = columns_to_hash(@client.get_columns_since(@keyspace, key, ColumnParent.new(:column_family => column_family.to_s), 0))

      # FIXME Hack until get_slice on a time-sorted column family works again
      result = OrderedHash[*flatten_once(result.to_a[0, limit])]
      result
    else
      columns_to_hash(@client.get_slice(@keyspace, key, ColumnParent.new(:column_family => column_family.to_s), '', '', -1, limit))
    end 
  end
rescue NotFoundException
  is_super(column_family) && !column ? OrderedHash.new : nil
end

#get_columns(column_family, key, super_columns, columns = nil) ⇒ Object

Return a list of single values for the elements at the column_family:key:super_column:column path you request.



94
95
96
97
98
99
100
101
102
103
# File 'lib/cassandra_client/cassandra_client.rb', line 94

def get_columns(column_family, key, super_columns, columns = nil)
  super_columns, columns = columns, super_columns unless columns
  result = if is_super(column_family) && !super_columns 
    columns_to_hash(@client.get_slice_super_by_names(@keyspace, key, column_family.to_s, columns))
  else
    columns_to_hash(@client.get_slice_by_names(@keyspace, key, 
      ColumnParent.new(:column_family => column_family.to_s, :super_column => super_columns), columns))
  end    
  columns.map { |name| result[name] }
end

#get_key_range(column_family, key_range = ''..'', limit = 100) ⇒ Object

Return a list of keys in the column_family you request. Requires the table to be partitioned with OrderPreservingHash.



146
147
148
# File 'lib/cassandra_client/cassandra_client.rb', line 146

def get_key_range(column_family, key_range = ''..'', limit = 100)      
  @client.get_key_range(@keyspace, column_family.to_s, key_range.begin, key_range.end, limit)
end

#insert(column_family, key, hash, timestamp = now) ⇒ Object

Insert a row for a key. Pass a flat hash for a regular column family, and a nested hash for a super column family.



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/cassandra_client/cassandra_client.rb', line 41

def insert(column_family, key, hash, timestamp = now)
  if is_super(column_family) 
    mutation = BatchMutationSuper.new(
      :key => key, 
      :cfmap => {column_family.to_s => hash_to_super_columns(hash, timestamp)})
    @client.batch_insert_super_column(@keyspace, mutation, @quorum)    
  else
    mutation = BatchMutation.new(
      :key => key, 
      :cfmap => {column_family.to_s => hash_to_columns(hash, timestamp)})
    @client.batch_insert(@keyspace, mutation, @quorum)
  end
end

#inspectObject



31
32
33
34
35
# File 'lib/cassandra_client/cassandra_client.rb', line 31

def inspect
  "#<CassandraClient:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{
    schema.map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')
  }}, @host=#{host.inspect}, @port=#{port}, @quorum=#{quorum}, @serializer=#{serializer.name}>"
end

#remove(column_family, key, super_column = nil, column = nil, timestamp = now) ⇒ Object

Remove the element at the column_family:key:super_column:column path you request.



61
62
63
64
65
66
# File 'lib/cassandra_client/cassandra_client.rb', line 61

def remove(column_family, key, super_column = nil, column = nil, timestamp = now)
  super_column, column = column, super_column unless is_super(column_family)
  @client.remove(@keyspace, key,
    ColumnPathOrParent.new(:column_family => column_family.to_s, :super_column => super_column, :column => column), 
    timestamp, @quorum)
end