Class: DeltaLake::Table
- Inherits: Object
  - Object
  - DeltaLake::Table
- Defined in:
- lib/deltalake/table.rb
Class Method Summary
- .exists?(table_uri, storage_options: nil) ⇒ Boolean
Instance Method Summary
-
#_stringify_partition_values(partition_filters) ⇒ Object
private.
-
#_table ⇒ Object
private.
- #alter ⇒ Object
- #delete(predicate = nil, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #file_uris(partition_filters: nil) ⇒ Object
- #files(partition_filters: nil) ⇒ Object
- #history(limit: nil) ⇒ Object
-
#initialize(table_uri, version: nil, storage_options: nil, without_files: false, log_buffer_size: nil) ⇒ Table
constructor
A new instance of Table.
- #load_as_version(version) ⇒ Object
- #load_cdf(starting_version: 0, ending_version: nil, starting_timestamp: nil, ending_timestamp: nil, columns: nil) ⇒ Object
- #merge(source, predicate, source_alias: nil, target_alias: nil, error_on_type_mismatch: true, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #metadata ⇒ Object
- #optimize ⇒ Object
- #partitions ⇒ Object
- #protocol ⇒ Object
- #repair(dry_run: false, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #restore(target, ignore_missing_files: false, protocol_downgrade_allowed: false, commit_properties: nil) ⇒ Object
- #schema ⇒ Object
- #table_uri ⇒ Object
- #to_polars(eager: true, rechunk: false, columns: nil) ⇒ Object
- #transaction_versions ⇒ Object
- #update_incremental ⇒ Object
- #vacuum(retention_hours: nil, dry_run: true, enforce_retention_duration: true, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(table_uri, version: nil, storage_options: nil, without_files: false, log_buffer_size: nil) ⇒ Table
Returns a new instance of Table.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/deltalake/table.rb', line 3
#
# Stores the storage options on the instance and builds the underlying
# RawDeltaTable from the given arguments.
#
# NOTE(review): the bare `storage_options` references were missing from the
# extracted listing and are reconstructed from the method signature -- confirm
# against the upstream source.
#
# @param table_uri first positional argument handed to RawDeltaTable.new
# @param version forwarded unchanged (nil by default)
# @param storage_options kept in @storage_options and forwarded unchanged
# @param without_files forwarded unchanged (false by default)
# @param log_buffer_size forwarded unchanged (nil by default)
def initialize(
  table_uri,
  version: nil,
  storage_options: nil,
  without_files: false,
  log_buffer_size: nil
)
  @storage_options = storage_options
  @table =
    RawDeltaTable.new(
      table_uri,
      version,
      storage_options,
      without_files,
      log_buffer_size
    )
end
Class Method Details
.exists?(table_uri, storage_options: nil) ⇒ Boolean
21 22 23 |
# File 'lib/deltalake/table.rb', line 21
#
# Asks RawDeltaTable.is_deltatable about the given URI and returns its answer.
#
# NOTE(review): the second argument was blank in the extracted listing
# (`is_deltatable(table_uri, )`); `storage_options` is restored from the
# signature -- confirm against the upstream source.
#
# @param table_uri forwarded unchanged
# @param storage_options forwarded unchanged (nil by default)
# @return whatever RawDeltaTable.is_deltatable returns
def self.exists?(table_uri, storage_options: nil)
  RawDeltaTable.is_deltatable(table_uri, storage_options)
end
Instance Method Details
#_stringify_partition_values(partition_filters) ⇒ Object
private
260 261 262 263 264 265 266 |
# File 'lib/deltalake/table.rb', line 260
#
# Passes a nil filter set straight through; any non-nil value is not handled
# yet and raises Todo (a constant defined outside this listing).
#
# @param partition_filters filters to convert, or nil
# @return [nil] when partition_filters is nil
# @raise [Todo] for every non-nil argument
def _stringify_partition_values(partition_filters)
  return partition_filters if partition_filters.nil?

  raise Todo
end
#_table ⇒ Object
private
255 256 257 |
# File 'lib/deltalake/table.rb', line 255
#
# Exposes the object held in @table.
#
# @return the current value of @table
def _table
  @table
end
#alter ⇒ Object
135 136 137 |
# File 'lib/deltalake/table.rb', line 135
#
# Wraps this table in a new TableAlterer.
#
# @return [TableAlterer] built with self as its only argument
def alter
  TableAlterer.new(self)
end
#delete(predicate = nil, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/deltalake/table.rb', line 220
#
# Calls delete on the raw table and returns its metrics as a Hash with
# symbol keys.
#
# @param predicate forwarded unchanged (nil by default)
# @param writer_properties forwarded unchanged
# @param post_commithook_properties forwarded unchanged
# @param commit_properties forwarded unchanged
# @return [Hash] the parsed JSON metrics, top-level keys converted to symbols
def delete(
  predicate = nil,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  metrics =
    @table.delete(
      predicate,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  # The raw call hands back a JSON document; only top-level keys are symbolized.
  JSON.parse(metrics).transform_keys(&:to_sym)
end
#file_uris(partition_filters: nil) ⇒ Object
42 43 44 |
# File 'lib/deltalake/table.rb', line 42
#
# Returns the raw table's file_uris for the given partition filters.
#
# @param partition_filters run through _stringify_partition_values before
#   being handed to the raw table (nil by default)
# @return whatever @table.file_uris returns
def file_uris(partition_filters: nil)
  @table.file_uris(_stringify_partition_values(partition_filters))
end
#files(partition_filters: nil) ⇒ Object
38 39 40 |
# File 'lib/deltalake/table.rb', line 38
#
# Returns the raw table's files for the given partition filters.
#
# @param partition_filters run through _stringify_partition_values before
#   being handed to the raw table (nil by default)
# @return whatever @table.files returns
def files(partition_filters: nil)
  @table.files(_stringify_partition_values(partition_filters))
end
#history(limit: nil) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/deltalake/table.rb', line 90
#
# Returns the commit history as an array of parsed JSON hashes, each tagged
# with a "version" entry.
#
# Version numbers are assigned by counting down from
# @table.get_latest_version, one step per entry, in the order the raw table
# yields them.
# NOTE(review): this presumes the raw history is ordered newest-first and
# starts at the latest version -- confirm against the raw API.
#
# @param limit forwarded unchanged to @table.history (nil by default)
# @return [Array<Hash>] one hash per raw history entry
def history(limit: nil)
  commits = @table.history(limit)
  version = @table.get_latest_version

  entries = []
  commits.each do |commit_info_raw|
    commit = JSON.parse(commit_info_raw)
    commit["version"] = version
    entries << commit
    version -= 1
  end
  entries
end
#load_as_version(version) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/deltalake/table.rb', line 46
#
# Loads the table at the requested version, dispatching on the argument's
# class:
# * Integer -> @table.load_version
# * Time    -> @table.load_with_datetime with the UTC ISO 8601 string
#              (nine fractional digits)
# * String  -> @table.load_with_datetime with the string as given
#
# @param version [Integer, Time, String]
# @raise [TypeError] for any other class
def load_as_version(version)
  case version
  when Integer
    @table.load_version(version)
  when Time
    # getutc returns a converted copy; Time#utc would switch the caller's
    # object to UTC in place.
    @table.load_with_datetime(version.getutc.iso8601(9))
  when String
    @table.load_with_datetime(version)
  else
    raise TypeError, "Invalid datatype provided for version, only Integer, String, and Time are accepted."
  end
end
#load_cdf(starting_version: 0, ending_version: nil, starting_timestamp: nil, ending_timestamp: nil, columns: nil) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/deltalake/table.rb', line 58
#
# Forwards all five options, positionally and in declaration order, to
# @table.load_cdf and returns its result.
#
# NOTE(review): the third and fourth arguments were blank in the extracted
# listing (`ending_version, , , columns`); `starting_timestamp` and
# `ending_timestamp` are restored from the signature -- confirm against the
# upstream source.
#
# @param starting_version forwarded unchanged (0 by default)
# @param ending_version forwarded unchanged
# @param starting_timestamp forwarded unchanged
# @param ending_timestamp forwarded unchanged
# @param columns forwarded unchanged
# @return whatever @table.load_cdf returns
def load_cdf(
  starting_version: 0,
  ending_version: nil,
  starting_timestamp: nil,
  ending_timestamp: nil,
  columns: nil
)
  @table.load_cdf(
    starting_version,
    ending_version,
    starting_timestamp,
    ending_timestamp,
    columns
  )
end
#merge(source, predicate, source_alias: nil, target_alias: nil, error_on_type_mismatch: true, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/deltalake/table.rb', line 139
#
# Converts the source with Utils.convert_data, asks the raw table for a merge
# builder, and wraps that builder in a TableMerger.
#
# @param source data to merge; converted via Utils.convert_data first
# @param predicate forwarded unchanged
# @param source_alias forwarded unchanged
# @param target_alias forwarded unchanged
# @param error_on_type_mismatch its negation is what the raw builder receives
# @param writer_properties forwarded unchanged
# @param post_commithook_properties forwarded unchanged
# @param commit_properties forwarded unchanged
# @return [TableMerger] built from the raw merge builder and @table
def merge(
  source,
  predicate,
  source_alias: nil,
  target_alias: nil,
  error_on_type_mismatch: true,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  source = Utils.convert_data(source)

  rb_merge_builder =
    @table.create_merge_builder(
      source,
      predicate,
      source_alias,
      target_alias,
      # The raw call takes the inverse flag.
      # NOTE(review): presumably a "safe cast" switch -- confirm.
      !error_on_type_mismatch,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  TableMerger.new(rb_merge_builder, @table)
end
#optimize ⇒ Object
131 132 133 |
# File 'lib/deltalake/table.rb', line 131
#
# Wraps this table in a new TableOptimizer.
#
# @return [TableOptimizer] built with self as its only argument
def optimize
  TableOptimizer.new(self)
end
#partitions ⇒ Object
29 30 31 32 33 34 35 36 |
# File 'lib/deltalake/table.rb', line 29
#
# Collects the raw table's active partitions, converting each one with to_h
# and skipping nil/false entries.
#
# @return [Array] one to_h result per truthy entry, in iteration order
def partitions
  active = []
  @table.get_active_partitions.each do |partition|
    next unless partition

    active << partition.to_h
  end
  active
end
#protocol ⇒ Object
86 87 88 |
# File 'lib/deltalake/table.rb', line 86
#
# Builds a ProtocolVersions from the values returned by
# @table.protocol_versions, splatted as positional arguments.
#
# @return [ProtocolVersions]
def protocol
  ProtocolVersions.new(*@table.protocol_versions)
end
#repair(dry_run: false, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/deltalake/table.rb', line 236
#
# Calls repair on the raw table and returns its metrics as a Hash with
# symbol keys.
#
# @param dry_run forwarded unchanged (false by default)
# @param post_commithook_properties forwarded unchanged
# @param commit_properties forwarded unchanged
# @return [Hash] the parsed JSON metrics, top-level keys converted to symbols
def repair(
  dry_run: false,
  post_commithook_properties: nil,
  commit_properties: nil
)
  # NOTE(review): commit_properties is passed before
  # post_commithook_properties here, the reverse of the keyword order above;
  # kept as written -- confirm it matches the raw method's parameter order.
  metrics =
    @table.repair(
      dry_run,
      commit_properties,
      post_commithook_properties
    )
  JSON.parse(metrics).transform_keys(&:to_sym)
end
#restore(target, ignore_missing_files: false, protocol_downgrade_allowed: false, commit_properties: nil) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/deltalake/table.rb', line 165
#
# Calls restore on the raw table and returns the parsed JSON metrics.
#
# A Time target is sent as its UTC ISO 8601 string (nine fractional digits);
# any other target is forwarded as given.
#
# @param target Time, or a value the raw table accepts directly
# @param ignore_missing_files forwarded unchanged (false by default)
# @param protocol_downgrade_allowed forwarded unchanged (false by default)
# @param commit_properties forwarded unchanged
# @return the result of JSON.parse on the raw metrics (keys stay strings)
def restore(
  target,
  ignore_missing_files: false,
  protocol_downgrade_allowed: false,
  commit_properties: nil
)
  # getutc returns a converted copy; Time#utc would switch the caller's
  # object to UTC in place.
  target = target.getutc.iso8601(9) if target.is_a?(Time)

  metrics =
    @table.restore(
      target,
      ignore_missing_files,
      protocol_downgrade_allowed,
      commit_properties
    )
  JSON.parse(metrics)
end
#schema ⇒ Object
78 79 80 |
# File 'lib/deltalake/table.rb', line 78
#
# Delegates to the raw table.
#
# @return whatever @table.schema returns
def schema
  @table.schema
end
#table_uri ⇒ Object
74 75 76 |
# File 'lib/deltalake/table.rb', line 74
#
# Delegates to the raw table.
#
# @return whatever @table.table_uri returns
def table_uri
  @table.table_uri
end
#to_polars(eager: true, rechunk: false, columns: nil) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/deltalake/table.rb', line 191
#
# Builds a Polars frame over the table's current file URIs.
#
# With no files, an empty Polars::LazyFrame is used and `columns` and
# `rechunk` are not consulted. Otherwise the files are scanned with
# Polars.scan_parquet, using @storage_options minus the keys listed in
# delta_keys (compared case-insensitively by name).
#
# NOTE(review): the local `storage_options` was missing from the extracted
# listing (`= @storage_options&.reject`, `storage_options: ,`) and is
# reconstructed here -- confirm against the upstream source.
#
# @param eager when truthy the lazy frame is collected before returning
# @param rechunk forwarded to Polars.scan_parquet
# @param columns when given, restricts the scan to these column names
# @return the collected frame when eager, otherwise the lazy frame
def to_polars(eager: true, rechunk: false, columns: nil)
  # Loaded on first use rather than at file load.
  require "polars-df"

  sources = file_uris
  if sources.empty?
    lf = Polars::LazyFrame.new
  else
    # Keys stripped from the options before they reach Polars.
    delta_keys = [
      "AWS_S3_ALLOW_UNSAFE_RENAME",
      "AWS_S3_LOCKING_PROVIDER",
      "CONDITIONAL_PUT",
      "DELTA_DYNAMO_TABLE_NAME"
    ]
    storage_options = @storage_options&.reject { |k, _| delta_keys.include?(k.to_s.upcase) }
    lf = Polars.scan_parquet(sources, storage_options: storage_options, rechunk: rechunk)
    if columns
      # by_name requires polars-df > 0.15.0
      lf = lf.select(Polars.cs.by_name(*columns))
    end
  end
  eager ? lf.collect : lf
end
#transaction_versions ⇒ Object
250 251 252 |
# File 'lib/deltalake/table.rb', line 250
#
# Delegates to the raw table.
#
# @return whatever @table.transaction_versions returns
def transaction_versions
  @table.transaction_versions
end
#update_incremental ⇒ Object
216 217 218 |
# File 'lib/deltalake/table.rb', line 216
#
# Delegates to the raw table.
#
# @return whatever @table.update_incremental returns
def update_incremental
  @table.update_incremental
end
#vacuum(retention_hours: nil, dry_run: true, enforce_retention_duration: true, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/deltalake/table.rb', line 109
#
# Validates the retention period, then calls vacuum on the raw table and
# returns its result.
#
# @param retention_hours nil, or a number that must not be below zero
# @param dry_run forwarded unchanged (true by default)
# @param enforce_retention_duration forwarded unchanged (true by default)
# @param post_commithook_properties forwarded unchanged
# @param commit_properties forwarded unchanged
# @return whatever @table.vacuum returns
# @raise [ArgumentError] when retention_hours is given and negative
def vacuum(
  retention_hours: nil,
  dry_run: true,
  enforce_retention_duration: true,
  post_commithook_properties: nil,
  commit_properties: nil
)
  # Zero passes this check; only values below zero are rejected.
  if retention_hours && retention_hours < 0
    raise ArgumentError, "The retention periods should be positive."
  end

  # The raw call takes commit_properties before post_commithook_properties.
  @table.vacuum(
    dry_run,
    retention_hours,
    enforce_retention_duration,
    commit_properties,
    post_commithook_properties
  )
end
#version ⇒ Object
25 26 27 |
# File 'lib/deltalake/table.rb', line 25
#
# Delegates to the raw table.
#
# @return whatever @table.version returns
def version
  @table.version
end