Class: BigRecordDriver::HbaseServer

Inherits:
BigRecordServer
Defined in:
lib/big_record_driver/hbase_driver/server.rb

Instance Method Summary

Methods inherited from BigRecordServer

#method_missing, #respond_to?

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class BigRecordDriver::BigRecordServer.

Instance Method Details

#add_column(table_name, column_descriptor) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/big_record_driver/hbase_driver/server.rb', line 229

# Adds a column family to an existing table.
#
# The table is disabled, altered with @admin.addColumn and then re-enabled.
# Re-enabling happens in an ensure clause so that a failure while building
# the descriptor or adding the column does not leave the table disabled.
#
# @param table_name [#to_s] table to alter
# @param column_descriptor descriptor handed to generate_column_descriptor
# @raise [BigRecordDriver::TableNotFound] if the table does not exist
# @return the value returned by @admin.enableTable
def add_column(table_name, column_descriptor)
  safe_exec do
    table_name = table_name.to_s
    raise BigRecordDriver::TableNotFound, table_name unless @admin.tableExists(table_name)

    @admin.disableTable(table_name)
    begin
      cdesc = generate_column_descriptor(column_descriptor)
      @admin.addColumn(table_name, cdesc)
    ensure
      # Always bring the table back online, even if the schema change raised.
      reenabled = @admin.enableTable(table_name)
    end
    reenabled
  end
end

#configure(config = {}) ⇒ Object

Establishes the connection to HBase using the given configuration parameters.



23
24
25
26
27
28
29
30
# File 'lib/big_record_driver/hbase_driver/server.rb', line 23

# Establishes the connection with HBase using the given configuration.
#
# Missing (nil/false) :zookeeper_quorum and :zookeeper_client_port entries
# default to 'localhost' and '2181'. The caller's hash is copied rather than
# modified in place, so shared or frozen configuration hashes can be passed.
#
# @param config [Hash] connection settings
# @return whatever init_connection returns
def configure(config = {})
  settings = config.dup
  settings[:zookeeper_quorum]      ||= 'localhost'
  settings[:zookeeper_client_port] ||= '2181'

  @config = settings

  init_connection
end

#create_table(table_name, column_descriptors) ⇒ Object

Create a table



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/big_record_driver/hbase_driver/server.rb', line 194

# Creates a table with one column family per entry of column_descriptors.
#
# Each entry is turned into a family descriptor by generate_column_descriptor
# and added, in order, to a new HTableDescriptor, which is passed to
# @admin.createTable.
#
# @param table_name [#to_s] name of the new table
# @param column_descriptors [Enumerable] column family descriptions
# @raise [BigRecordDriver::TableAlreadyExists] if the table already exists
# @return the value returned by @admin.createTable
def create_table(table_name, column_descriptors)
  safe_exec do
    name = table_name.to_s
    raise BigRecordDriver::TableAlreadyExists, name if table_exists?(name)

    descriptor = HTableDescriptor.new(name)
    column_descriptors.each { |cd| descriptor.addFamily(generate_column_descriptor(cd)) }
    @admin.createTable(descriptor)
  end
end

#delete(table_name, row, timestamp = nil) ⇒ Object

Delete a whole row.



186
187
188
189
190
191
# File 'lib/big_record_driver/hbase_driver/server.rb', line 186

# Deletes a whole row via the table's deleteAll.
#
# The row key is converted with #to_bytes. When timestamp is given it is
# forwarded to deleteAll as a second argument (see the HBase client docs for
# its exact semantics).
#
# @return the value returned by deleteAll
def delete(table_name, row, timestamp = nil)
  safe_exec do
    htable = connect_table(table_name)
    row_key = row.to_bytes
    if timestamp
      htable.deleteAll(row_key, timestamp)
    else
      htable.deleteAll(row_key)
    end
  end
end

#drop_table(table_name) ⇒ Object

Delete a table



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/big_record_driver/hbase_driver/server.rb', line 213

# Deletes a table: disables it, deletes it through @admin, and drops any
# cached connection to it from @tables.
#
# @param table_name [#to_s] table to drop
# @raise [BigRecordDriver::TableNotFound] if the table does not exist
# @return the cached connection removed from @tables, or nil if none was cached
def drop_table(table_name)
  safe_exec do
    name = table_name.to_s
    raise BigRecordDriver::TableNotFound, name unless @admin.tableExists(name)

    @admin.disableTable(name)
    @admin.deleteTable(name)

    # Forget the cached connection; Hash#delete is a no-op for absent keys.
    @tables.delete(name)
  end
end

#get(table_name, row, column, options = {}) ⇒ Object

Returns a column of a row. Example:

get('entities', 'b9cef848-a4e0-11dc-a7ba-0018f3137ea8', 'attribute:travel_rank')
=> "--- 0.90124565\n"

Valid options:

:timestamp  => integer corresponding to the time when the record was saved in HBase
:versions   => number of versions to retrieve, starting at the specified timestamp (or the latest); :num_versions is accepted as an alias


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/big_record_driver/hbase_driver/server.rb', line 59

# Returns one column of a row.
#
# Options (the hash is only read, never modified):
#   :timestamp    - forwarded to table.get to select the version(s) to read
#   :versions     - number of versions to fetch (default 1); :num_versions
#                   is accepted as an alias
#
# @return [Object, nil] when a single version is requested: the value passed
#   through to_ruby_string, or nil if the row is nil or nothing was found
# @return [Array] when several versions are requested: each value passed
#   through to_ruby_string, or [] if nothing was found
# @raise [ArgumentError] if the requested number of versions is < 1
def get(table_name, row, column, options={})
  safe_exec do
    return nil unless row
    table = connect_table(table_name)

    # Retrieve only the last version by default. Read into locals instead of
    # writing defaults back into the caller's options hash.
    versions  = options[:versions] || options[:num_versions] || 1
    timestamp = options[:timestamp]

    # validate the arguments
    raise ArgumentError, "versions must be >= 1" unless versions >= 1

    # get the raw data from hbase
    raw_data =
      if timestamp
        table.get(row, column, timestamp, versions)
      elsif versions == 1
        table.get(row, column)
      else
        table.get(row, column, versions)
      end

    # Return either a single value or an array, depending on the number of
    # versions that have been requested.
    if versions == 1
      # The timestamped call returns a collection; unwrap it, and treat an
      # empty collection the same as "not found".
      raw_data = raw_data[0] if timestamp && raw_data
      return nil unless raw_data
      to_ruby_string(raw_data)
    else
      return [] unless raw_data
      raw_data.map { |raw_data_version| to_ruby_string(raw_data_version) }
    end
  end
end

#get_columns(table_name, row, columns, options = {}) ⇒ Object

Returns the last version of the given columns of the given row. The column names work like regular expressions (e.g. 'attribute:' matches all columns of the attribute family). Example:

get_columns('entities', 'b9cef848-a4e0-11dc-a7ba-0018f3137ea8', ['attribute:'])
=> {"attribute:name" => "--- Oahu\n", "attribute:travel_rank" => "--- 0.90124565\n", etc...}


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/big_record_driver/hbase_driver/server.rb', line 105

# Returns the latest version of the requested columns of one row.
#
# The result is a Hash mapping each returned column name to its value (passed
# through to_ruby_string), plus the row key under "id". Returns nil when row
# is nil or when getRow yields nothing (nil or an empty result).
#
# @param table_name [#to_s] table to read from
# @param row row key
# @param columns [Array<String>] column names / family prefixes to fetch
# @param options [Hash] :timestamp is forwarded to getRow when present
def get_columns(table_name, row, columns, options={})
  safe_exec do
    return nil unless row
    htable = connect_table(table_name.to_s)

    # getRow takes a Java String[]; copy the requested names into one.
    wanted = Java::String[columns.size].new
    columns.each_with_index { |name, idx| wanted[idx] = Java::String.new(name) }

    ts = options[:timestamp]
    cells = ts ? htable.getRow(row, wanted, ts) : htable.getRow(row, wanted)

    if cells && !cells.isEmpty
      row_hash = {}
      cells.entrySet.each do |cell|
        row_hash[Java::String.new(cell.getKey).to_s] = to_ruby_string(cell.getValue)
      end
      row_hash["id"] = row
      row_hash
    end
  end
end

#get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil) ⇒ Object

Get consecutive rows. Example: fetch 100 records starting at the specified row, returning all the columns in the column family 'attribute:':

get_consecutive_rows('entities', 'b9cef848-a4e0-11dc-a7ba-0018f3137ea8', 100, ['attribute:'])


140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/big_record_driver/hbase_driver/server.rb', line 140

# Scans consecutive rows of a table starting at start_row.
#
# Returns an Array of Hashes, one per row that has at least one cell, each
# mapping column name => to_ruby_string(value) plus the row key under 'id'.
# The scanner is always closed, even if reading a row raises.
#
# @param table_name [#to_s] table to scan
# @param start_row [#to_s, nil] first row key; nil scans from ""
# @param limit [Integer, nil] maximum number of rows read from the scanner
#   (rows without cells still count towards it); nil means no limit
# @param columns [Array<String>] column names / family prefixes to fetch
# @param stop_row optional row key at which the scanner stops
# @return [Array<Hash>]
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  safe_exec do
    table_name = table_name.to_s
    table = connect_table(table_name)

    java_cols = Java::String[columns.size].new
    columns.each_with_index do |col, i|
      java_cols[i] = Java::String.new(col)
    end

    start_row = (start_row || "").to_s

    # We cannot default stop_row like start_row because a default stop row
    # would have to be the biggest value possible.
    scanner =
      if stop_row
        table.getScanner(java_cols, start_row, stop_row, HConstants::LATEST_TIMESTAMP)
      else
        table.getScanner(java_cols, start_row)
      end

    result = []
    begin
      row_count = 0
      loop do
        # Check the limit before fetching, so no row is pulled from the
        # scanner only to be thrown away.
        break if limit && row_count == limit

        row_result = scanner.next
        break if row_result.nil?
        row_count += 1

        values = {}
        row_result.entrySet.each do |entry|
          values[Java::String.new(entry.getKey).to_s] = to_ruby_string(entry.getValue)
        end
        next if values.empty?

        # TODO: is this really supposed to be hard coded?
        values['id'] = Java::String.new(row_result.getRow).to_s
        result << values
      end
    ensure
      # Release the scanner even when reading a row raised.
      scanner.close
    end
    result
  end
end

#modify_column(table_name, column_descriptor) ⇒ Object



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/big_record_driver/hbase_driver/server.rb', line 264

# Replaces the definition of an existing column family.
#
# The table is disabled, the family named column_descriptor.name is modified
# with the descriptor built by generate_column_descriptor, and the table is
# re-enabled. Re-enabling happens in an ensure clause so a failed
# modification does not leave the table disabled.
#
# @param table_name [#to_s] table to alter
# @param column_descriptor descriptor responding to #name
# @raise [BigRecordDriver::TableNotFound] if the table does not exist
# @return the value returned by @admin.enableTable
def modify_column(table_name, column_descriptor)
  safe_exec do
    table_name = table_name.to_s
    raise BigRecordDriver::TableNotFound, table_name unless @admin.tableExists(table_name)

    @admin.disableTable(table_name)
    begin
      cdesc = generate_column_descriptor(column_descriptor)
      @admin.modifyColumn(table_name, column_descriptor.name, cdesc)
    ensure
      # Always bring the table back online, even if the schema change raised.
      reenabled = @admin.enableTable(table_name)
    end
    reenabled
  end
end

#ping ⇒ Object



292
293
294
295
296
# File 'lib/big_record_driver/hbase_driver/server.rb', line 292

# Health check that asks the HBase admin whether the master is running.
#
# @return the value of @admin.isMasterRunning
def ping
  safe_exec { @admin.isMasterRunning }
end

#remove_column(table_name, column_name) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/big_record_driver/hbase_driver/server.rb', line 246

# Removes a column family from an existing table.
#
# A trailing ":" is appended to column_name if it is missing. The table is
# disabled, the column deleted, and the table re-enabled; re-enabling happens
# in an ensure clause so a failed deletion does not leave the table disabled.
#
# @param table_name [#to_s] table to alter
# @param column_name [#to_s] family name, with or without the trailing ":"
# @raise [BigRecordDriver::TableNotFound] if the table does not exist
# @return the value returned by @admin.enableTable
def remove_column(table_name, column_name)
  safe_exec do
    table_name = table_name.to_s
    # Build a new string instead of appending with <<: String#to_s returns the
    # receiver itself, so << would mutate the caller's argument (and raise on
    # a frozen string).
    family = column_name.to_s
    family = "#{family}:" unless family.end_with?(":")

    raise BigRecordDriver::TableNotFound, table_name unless @admin.tableExists(table_name)

    @admin.disableTable(table_name)
    begin
      @admin.deleteColumn(table_name, family)
    ensure
      # Always bring the table back online, even if the deletion raised.
      reenabled = @admin.enableTable(table_name)
    end
    reenabled
  end
end

#table_exists?(table_name) ⇒ Boolean

Returns:

  • (Boolean)


298
299
300
301
302
# File 'lib/big_record_driver/hbase_driver/server.rb', line 298

# Reports whether a table with the given name exists.
#
# @param table_name [#to_s] table name
# @return [Boolean] the value of @admin.tableExists
def table_exists?(table_name)
  safe_exec { @admin.tableExists(table_name.to_s) }
end

#table_names ⇒ Object



304
305
306
307
308
# File 'lib/big_record_driver/hbase_driver/server.rb', line 304

# Lists the names of all tables known to the HBase admin.
#
# Each descriptor's getName value is wrapped in a Java::String and converted
# to a Ruby String.
#
# @return [Array<String>]
def table_names
  safe_exec do
    @admin.listTables.map do |descriptor|
      Java::String.new(descriptor.getName).to_s
    end
  end
end

#truncate_table(table_name) ⇒ Object



282
283
284
285
286
287
288
289
290
# File 'lib/big_record_driver/hbase_driver/server.rb', line 282

# Empties a table by dropping it and recreating it from its previous
# descriptor.
#
# The descriptor is read before drop_table runs, so the new table is created
# with the same definition.
#
# @param table_name [#to_s] table to truncate
# @return the value returned by @admin.createTable
def truncate_table(table_name)
  safe_exec do
    name = table_name.to_s
    # Capture the schema while the table still exists.
    descriptor = connect_table(name).getTableDescriptor
    drop_table(name)
    @admin.createTable(descriptor)
  end
end

#update(table_name, row, values, timestamp = nil) ⇒ Object

Atomic row insertion/update. Example:

update('entities', 'b9cef848-a4e0-11dc-a7ba-0018f3137ea8', {'attribute:name' => "--- Oahu\n",
                                                            'attribute:travel_rank' => "--- 0.90124565\n"})
=> 'b9cef848-a4e0-11dc-a7ba-0018f3137ea8'


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/big_record_driver/hbase_driver/server.rb', line 36

# Inserts or updates several cells of one row in a single commit.
#
# Every value is converted with #to_bytes and put into one BatchUpdate (built
# with timestamp when given), which is passed to the table's commit.
#
# @return row, or nil (without contacting HBase) when row is nil
def update(table_name, row, values, timestamp=nil)
  safe_exec do
    return nil unless row

    htable = connect_table(table_name)
    batch = if timestamp
              BatchUpdate.new(row, timestamp)
            else
              BatchUpdate.new(row)
            end
    values.each { |column, value| batch.put(column, value.to_bytes) }
    htable.commit(batch)

    row
  end
end