Class: DeltaLake::Table

Inherits:
Object
Defined in:
lib/deltalake/table.rb

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(table_uri, version: nil, storage_options: nil, without_files: false, log_buffer_size: nil) ⇒ Table

Returns a new instance of Table.



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/deltalake/table.rb', line 3

# Opens the Delta table located at +table_uri+ through the native RawDeltaTable.
#
# @param table_uri location of the table, forwarded to RawDeltaTable
# @param version version to load (nil is forwarded unchanged)
# @param storage_options backend options; also remembered for #to_polars
# @param without_files forwarded to RawDeltaTable
# @param log_buffer_size forwarded to RawDeltaTable
def initialize(
  table_uri,
  version: nil,
  storage_options: nil,
  without_files: false,
  log_buffer_size: nil
)
  # Kept on the Ruby side because #to_polars hands these options to Polars.
  @storage_options = storage_options

  raw_table_args = [table_uri, version, storage_options, without_files, log_buffer_size]
  @table = RawDeltaTable.new(*raw_table_args)
end

Class Method Details

.exists?(table_uri, storage_options: nil) ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/deltalake/table.rb', line 21

# Reports whether +table_uri+ refers to a Delta table.
#
# @param table_uri location to probe
# @param storage_options backend options forwarded to the native check
# @return [Boolean] whatever RawDeltaTable.is_deltatable reports
def self.exists?(table_uri, storage_options: nil)
  probe_options = storage_options
  RawDeltaTable.is_deltatable(table_uri, probe_options)
end

Instance Method Details

#_stringify_partition_values(partition_filters) ⇒ Object

private

Raises:

  • (Todo)



260
261
262
263
264
265
266
# File 'lib/deltalake/table.rb', line 260

# Prepares partition filters for the native layer.
#
# A nil filter list means "no filtering" and is handed back unchanged.
# Converting an actual filter list is not implemented yet, so any non-nil
# argument raises Todo.
def _stringify_partition_values(partition_filters)
  return partition_filters if partition_filters.nil?

  raise Todo
end

#_table ⇒ Object

private



255
256
257
# File 'lib/deltalake/table.rb', line 255

# Returns the underlying native table handle (the RawDeltaTable built in the
# constructor). NOTE(review): presumably consumed by helper objects that
# receive this Table, such as TableAlterer or TableOptimizer — confirm.
def _table
  native_handle = @table
  native_handle
end

#alter ⇒ Object



135
136
137
# File 'lib/deltalake/table.rb', line 135

# Returns a TableAlterer bound to this table.
def alter
  alterer = TableAlterer.new(self)
  alterer
end

#delete(predicate = nil, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/deltalake/table.rb', line 220

# Deletes rows matching +predicate+ (all rows when nil is forwarded) and
# returns the operation metrics.
#
# @param predicate filter expression forwarded to the native delete
# @param writer_properties forwarded unchanged
# @param post_commithook_properties forwarded unchanged
# @param commit_properties forwarded unchanged
# @return [Hash{Symbol => Object}] decoded metrics with top-level keys symbolized
def delete(
  predicate = nil,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  raw_metrics = @table.delete(predicate, writer_properties, post_commithook_properties, commit_properties)
  decoded = JSON.parse(raw_metrics)
  decoded.transform_keys { |key| key.to_sym }
end

#file_uris(partition_filters: nil) ⇒ Object



42
43
44
# File 'lib/deltalake/table.rb', line 42

# Lists the URIs of the table's data files, optionally restricted by
# partition filters (normalized through _stringify_partition_values first).
def file_uris(partition_filters: nil)
  native_filters = _stringify_partition_values(partition_filters)
  @table.file_uris(native_filters)
end

#files(partition_filters: nil) ⇒ Object



38
39
40
# File 'lib/deltalake/table.rb', line 38

# Lists the table's data files as reported by the native table, optionally
# restricted by partition filters (normalized through
# _stringify_partition_values first).
def files(partition_filters: nil)
  native_filters = _stringify_partition_values(partition_filters)
  @table.files(native_filters)
end

#history(limit: nil) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/deltalake/table.rb', line 90

# Returns the table's commit history as an Array of Hashes.
#
# Each raw commit string from the native layer is JSON-decoded and given a
# "version" key. Versions count down from the latest table version, so the
# first commit returned is tagged with the latest version.
# NOTE(review): this numbering assumes @table.history yields commits
# newest-first — confirm against the native binding.
#
# @param limit forwarded to the native history call
def history(limit: nil)
  raw_commits = @table.history(limit)
  latest_version = @table.get_latest_version

  raw_commits.each_with_index.map do |commit_info_raw, offset|
    JSON.parse(commit_info_raw).tap { |commit| commit["version"] = latest_version - offset }
  end
end

#load_as_version(version) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/deltalake/table.rb', line 46

# Reloads the table at a specific version or point in time.
#
# @param version [Integer, Time, String] an Integer version number; a Time,
#   sent as an ISO 8601 UTC string with nanosecond precision; or a String,
#   passed through unchanged to the native datetime loader
# @raise [TypeError] for any other type
def load_as_version(version)
  case version
  when Integer
    @table.load_version(version)
  when Time
    # getutc converts a copy; Time#utc would convert the caller's Time in
    # place (and raise on a frozen Time).
    @table.load_with_datetime(version.getutc.iso8601(9))
  when String
    @table.load_with_datetime(version)
  else
    raise TypeError, "Invalid datatype provided for version, only Integer, String, and Time are accepted."
  end
end

#load_cdf(starting_version: 0, ending_version: nil, starting_timestamp: nil, ending_timestamp: nil, columns: nil) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/deltalake/table.rb', line 58

# Loads the table's change data feed through the native table.
#
# All arguments are forwarded positionally, in declaration order; the
# starting version defaults to 0 and every other bound defaults to nil.
def load_cdf(
  starting_version: 0,
  ending_version: nil,
  starting_timestamp: nil,
  ending_timestamp: nil,
  columns: nil
)
  cdf_args = [starting_version, ending_version, starting_timestamp, ending_timestamp, columns]
  @table.load_cdf(*cdf_args)
end

#merge(source, predicate, source_alias: nil, target_alias: nil, error_on_type_mismatch: true, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/deltalake/table.rb', line 139

# Starts a merge of +source+ into this table and returns a TableMerger
# wrapping the native merge builder.
#
# @param source data converted through Utils.convert_data before use
# @param predicate merge condition forwarded to the native builder
# @param error_on_type_mismatch the builder receives its negation
#   (presumably a "safe cast" flag in the native layer — confirm)
def merge(
  source,
  predicate,
  source_alias: nil,
  target_alias: nil,
  error_on_type_mismatch: true,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  converted_source = Utils.convert_data(source)
  tolerate_type_mismatch = !error_on_type_mismatch

  merge_builder = @table.create_merge_builder(
    converted_source,
    predicate,
    source_alias,
    target_alias,
    tolerate_type_mismatch,
    writer_properties,
    post_commithook_properties,
    commit_properties
  )
  TableMerger.new(merge_builder, @table)
end

#metadata ⇒ Object



82
83
84
# File 'lib/deltalake/table.rb', line 82

# Returns a Metadata object wrapping the native table handle.
def metadata
  Metadata.new(@table)
end

#optimize ⇒ Object



131
132
133
# File 'lib/deltalake/table.rb', line 131

# Returns a TableOptimizer bound to this table.
def optimize
  optimizer = TableOptimizer.new(self)
  optimizer
end

#partitions ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/deltalake/table.rb', line 29

# Returns the table's active partitions, each converted with #to_h.
# Falsy (nil/false) entries from the native layer are skipped.
def partitions
  @table.get_active_partitions.select { |partition| partition }.map(&:to_h)
end

#protocol ⇒ Object



86
87
88
# File 'lib/deltalake/table.rb', line 86

# Returns a ProtocolVersions built from the values the native table reports
# in protocol_versions (splatted in order into the constructor).
def protocol
  versions = @table.protocol_versions
  ProtocolVersions.new(*versions)
end

#repair(dry_run: false, post_commithook_properties: nil, commit_properties: nil) ⇒ Object



236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/deltalake/table.rb', line 236

# Runs the native repair operation and returns its metrics.
#
# @param dry_run forwarded unchanged (defaults to false)
# @return [Hash{Symbol => Object}] decoded metrics with top-level keys symbolized
def repair(
  dry_run: false,
  post_commithook_properties: nil,
  commit_properties: nil
)
  # NOTE(review): commit_properties goes before post_commithook_properties
  # here, the reverse of #delete; presumably this mirrors the native
  # binding's signature — confirm before reordering.
  raw_metrics = @table.repair(dry_run, commit_properties, post_commithook_properties)
  decoded = JSON.parse(raw_metrics)
  decoded.transform_keys { |key| key.to_sym }
end

#restore(target, ignore_missing_files: false, protocol_downgrade_allowed: false, commit_properties: nil) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/deltalake/table.rb', line 165

# Restores the table to +target+ and returns the decoded operation metrics.
#
# @param target a version forwarded as-is, or a Time, which is sent as an
#   ISO 8601 UTC string with nanosecond precision
# @return [Hash] decoded metrics (keys stay Strings, unlike #delete/#repair)
def restore(
  target,
  ignore_missing_files: false,
  protocol_downgrade_allowed: false,
  commit_properties: nil
)
  # getutc converts a copy; Time#utc would convert the caller's Time in place
  # (and raise on a frozen Time).
  restore_target = target.is_a?(Time) ? target.getutc.iso8601(9) : target

  metrics =
    @table.restore(
      restore_target,
      ignore_missing_files,
      protocol_downgrade_allowed,
      commit_properties
    )
  JSON.parse(metrics)
end

#schema ⇒ Object



78
79
80
# File 'lib/deltalake/table.rb', line 78

# Returns the schema object reported by the native table.
def schema
  table_schema = @table.schema
  table_schema
end

#table_uri ⇒ Object



74
75
76
# File 'lib/deltalake/table.rb', line 74

# Returns the table URI as reported by the native table.
def table_uri
  uri = @table.table_uri
  uri
end

#to_polars(eager: true, rechunk: false, columns: nil) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/deltalake/table.rb', line 191

# Reads the table's data files into Polars.
#
# @param eager when true, collect into a DataFrame; otherwise return the LazyFrame
# @param rechunk forwarded to Polars.scan_parquet
# @param columns optional column names to select (ignored for a table with no files)
def to_polars(eager: true, rechunk: false, columns: nil)
  # polars-df is loaded on demand only, so it is needed just by callers of this method.
  require "polars-df"

  sources = file_uris
  lazy_frame =
    if sources.empty?
      Polars::LazyFrame.new
    else
      # These storage options are removed before the options reach
      # Polars.scan_parquet (presumably because they are delta-rs specific —
      # confirm). Keys are compared case-insensitively.
      delta_only_keys = %w[
        AWS_S3_ALLOW_UNSAFE_RENAME
        AWS_S3_LOCKING_PROVIDER
        CONDITIONAL_PUT
        DELTA_DYNAMO_TABLE_NAME
      ]
      scan_options = @storage_options&.reject { |key, _| delta_only_keys.include?(key.to_s.upcase) }
      scanned = Polars.scan_parquet(sources, storage_options: scan_options, rechunk: rechunk)

      # by_name requires polars-df > 0.15.0
      columns ? scanned.select(Polars.cs.by_name(*columns)) : scanned
    end

  eager ? lazy_frame.collect : lazy_frame
end

#transaction_versions ⇒ Object



250
251
252
# File 'lib/deltalake/table.rb', line 250

# Returns the transaction versions reported by the native table.
def transaction_versions
  versions = @table.transaction_versions
  versions
end

#update_incremental ⇒ Object



216
217
218
# File 'lib/deltalake/table.rb', line 216

# Delegates to the native table's update_incremental and returns its result.
def update_incremental
  outcome = @table.update_incremental
  outcome
end

#vacuum(retention_hours: nil, dry_run: true, enforce_retention_duration: true, post_commithook_properties: nil, commit_properties: nil) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/deltalake/table.rb', line 109

# Runs the native vacuum operation and returns its result unchanged.
#
# @param retention_hours optional retention window; a negative value is rejected
#   (zero is accepted)
# @param dry_run forwarded unchanged (defaults to true)
# @raise [ArgumentError] when retention_hours is negative
def vacuum(
  retention_hours: nil,
  dry_run: true,
  enforce_retention_duration: true,
  post_commithook_properties: nil,
  commit_properties: nil
)
  raise ArgumentError, "The retention periods should be positive." if retention_hours && retention_hours < 0

  # NOTE(review): commit_properties precedes post_commithook_properties in the
  # native call, presumably matching the binding's signature — confirm.
  native_args = [
    dry_run,
    retention_hours,
    enforce_retention_duration,
    commit_properties,
    post_commithook_properties
  ]
  @table.vacuum(*native_args)
end

#version ⇒ Object



25
26
27
# File 'lib/deltalake/table.rb', line 25

# Returns the version reported by the underlying native table.
def version
  current_version = @table.version
  current_version
end