Class: DeltaLake::Table
- Inherits: Object
- Defined in: lib/deltalake/table.rb
Constant Summary
- FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed"
Class Method Summary
- .exists?(table_uri, storage_options: nil) ⇒ Boolean
Instance Method Summary
- #_stringify_partition_values(partition_filters) ⇒ Object (private)
- #_table ⇒ Object (private)
- #alter ⇒ Object
- #delete(predicate = nil, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #file_uris(partition_filters: nil) ⇒ Object
- #files(partition_filters: nil) ⇒ Object
- #history(limit: nil) ⇒ Object
- #initialize(table_uri, version: nil, storage_options: nil, without_files: false, log_buffer_size: nil) ⇒ Table (constructor): A new instance of Table.
- #load_as_version(version) ⇒ Object
- #load_cdf(starting_version: 0, ending_version: nil, starting_timestamp: nil, ending_timestamp: nil, columns: nil) ⇒ Object
- #merge(source, predicate, source_alias: nil, target_alias: nil, error_on_type_mismatch: true, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #metadata ⇒ Object
- #optimize ⇒ Object
- #partitions ⇒ Object
- #protocol ⇒ Object
- #repair(dry_run: false, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #restore(target, ignore_missing_files: false, protocol_downgrade_allowed: false, commit_properties: nil) ⇒ Object
- #schema ⇒ Object
- #table_uri ⇒ Object
- #to_polars(eager: true, rechunk: false, columns: nil) ⇒ Object
- #transaction_versions ⇒ Object
- #update_incremental ⇒ Object
- #vacuum(retention_hours: nil, dry_run: true, enforce_retention_duration: true, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(table_uri, version: nil, storage_options: nil, without_files: false, log_buffer_size: nil) ⇒ Table
Returns a new instance of Table.
# File 'lib/deltalake/table.rb', line 5

def initialize(
  table_uri,
  version: nil,
  storage_options: nil,
  without_files: false,
  log_buffer_size: nil
)
  @storage_options = storage_options
  @table =
    RawDeltaTable.new(
      table_uri,
      version,
      storage_options,
      without_files,
      log_buffer_size
    )
end
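A minimal usage sketch, assuming the gem is loaded with require "deltalake" and a table already exists at the (hypothetical) path shown:

require "deltalake"

dt = DeltaLake::Table.new("./tmp/events")                 # open at the latest version
dt_v0 = DeltaLake::Table.new("./tmp/events", version: 0)  # pin to version 0 for time travel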
Class Method Details
.exists?(table_uri, storage_options: nil) ⇒ Boolean
# File 'lib/deltalake/table.rb', line 23

def self.exists?(table_uri, storage_options: nil)
  RawDeltaTable.is_deltatable(table_uri, storage_options)
end
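For example, guarding construction with the class-level check (path is illustrative):

path = "./tmp/events"
dt = DeltaLake::Table.new(path) if DeltaLake::Table.exists?(path)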
Instance Method Details
#_stringify_partition_values(partition_filters) ⇒ Object
private
# File 'lib/deltalake/table.rb', line 266

def _stringify_partition_values(partition_filters)
  if partition_filters.nil?
    return partition_filters
  end

  raise Todo
end
#_table ⇒ Object
private
# File 'lib/deltalake/table.rb', line 261

def _table
  @table
end
#alter ⇒ Object
# File 'lib/deltalake/table.rb', line 137

def alter
  TableAlterer.new(self)
end
#delete(predicate = nil, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 222

def delete(
  predicate = nil,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  metrics =
    @table.delete(
      predicate,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  JSON.parse(metrics).transform_keys(&:to_sym)
end
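A sketch of a predicate delete; the column name is illustrative, and the exact metric keys come from the underlying native implementation:

metrics = dt.delete("status = 'cancelled'")
# metrics is a Hash with symbolized keys describing the delete operation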
#file_uris(partition_filters: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 44

def file_uris(partition_filters: nil)
  @table.file_uris(_stringify_partition_values(partition_filters))
end
#files(partition_filters: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 40

def files(partition_filters: nil)
  @table.files(_stringify_partition_values(partition_filters))
end
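Both #files and #file_uris list the data files in the current snapshot. Note that passing partition_filters currently raises Todo via #_stringify_partition_values, so only the unfiltered form works. A quick sketch:

dt.files      # relative paths of data files in the current table version
dt.file_uris  # fully qualified URIs for the same files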
#history(limit: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 92

def history(limit: nil)
  backwards_enumerate = lambda do |iterable, start_end, &block|
    n = start_end
    iterable.each do |elem|
      block.call(n, elem)
      n -= 1
    end
  end

  commits = @table.history(limit)
  history = []
  backwards_enumerate.(commits, @table.get_latest_version) do |version, commit_info_raw|
    commit = JSON.parse(commit_info_raw)
    commit["version"] = version
    history << commit
  end
  history
end
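Commits are returned newest-first with their version numbers attached. A sketch (the "operation" key is typical of Delta commit info, not guaranteed by this wrapper):

dt.history(limit: 5).each do |commit|
  puts "v#{commit["version"]}: #{commit["operation"]}"
end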
#load_as_version(version) ⇒ Object
# File 'lib/deltalake/table.rb', line 48

def load_as_version(version)
  if version.is_a?(Integer)
    @table.load_version(version)
  elsif version.is_a?(Time)
    @table.load_with_datetime(version.utc.iso8601(9))
  elsif version.is_a?(String)
    @table.load_with_datetime(version)
  else
    raise TypeError, "Invalid datatype provided for version, only Integer, String, and Time are accepted."
  end
end
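The three accepted version forms, side by side (values are illustrative):

dt.load_as_version(3)                       # integer version number
dt.load_as_version(Time.now - 86_400)       # Time, converted to ISO 8601 with nanosecond precision
dt.load_as_version("2024-01-01T00:00:00Z")  # ISO 8601 string, passed through unchanged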
#load_cdf(starting_version: 0, ending_version: nil, starting_timestamp: nil, ending_timestamp: nil, columns: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 60

def load_cdf(
  starting_version: 0,
  ending_version: nil,
  starting_timestamp: nil,
  ending_timestamp: nil,
  columns: nil
)
  @table.load_cdf(
    starting_version,
    ending_version,
    starting_timestamp,
    ending_timestamp,
    columns
  )
end
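A sketch of reading the change data feed between two versions; this assumes the table was written with change data feed enabled, and the return value comes straight from the native load_cdf binding:

changes = dt.load_cdf(starting_version: 0, ending_version: dt.version)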
#merge(source, predicate, source_alias: nil, target_alias: nil, error_on_type_mismatch: true, writer_properties: nil, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 141

def merge(
  source,
  predicate,
  source_alias: nil,
  target_alias: nil,
  error_on_type_mismatch: true,
  writer_properties: nil,
  post_commithook_properties: nil,
  commit_properties: nil
)
  source = Utils.convert_data(source)

  rb_merge_builder =
    @table.create_merge_builder(
      source,
      predicate,
      source_alias,
      target_alias,
      !error_on_type_mismatch,
      writer_properties,
      post_commithook_properties,
      commit_properties
    )
  TableMerger.new(rb_merge_builder, @table)
end
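#merge converts the source data (assumed here to be a Polars DataFrame) and returns a TableMerger preconfigured with the join predicate; matched/not-matched clauses are then added on the merger before executing it (see TableMerger). A sketch with illustrative column names:

source = Polars::DataFrame.new({"id" => [1, 2], "value" => ["a", "b"]})
merger = dt.merge(
  source,
  "target.id = source.id",
  source_alias: "source",
  target_alias: "target"
)
# configure when-matched / when-not-matched behavior on merger, then execute it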
#optimize ⇒ Object
# File 'lib/deltalake/table.rb', line 133

def optimize
  TableOptimizer.new(self)
end
#partitions ⇒ Object
# File 'lib/deltalake/table.rb', line 31

def partitions
  partitions = []
  @table.get_active_partitions.each do |partition|
    next unless partition
    partitions << partition.to_h
  end
  partitions
end
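Each active partition is returned as a Hash of partition column values, e.g. for a table partitioned by year (output shape is illustrative):

dt.partitions  # => [{"year" => "2024"}, {"year" => "2025"}]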
#protocol ⇒ Object
# File 'lib/deltalake/table.rb', line 88

def protocol
  ProtocolVersions.new(*@table.protocol_versions)
end
#repair(dry_run: false, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 238

def repair(
  dry_run: false,
  post_commithook_properties: nil,
  commit_properties: nil
)
  metrics =
    @table.repair(
      dry_run,
      commit_properties,
      post_commithook_properties
    )
  deserialized_metrics = JSON.parse(metrics)
  deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL] = JSON.parse(
    deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL]
  )
  deserialized_metrics.transform_keys(&:to_sym)
end
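The "files_removed" entry is double-encoded JSON in the raw metrics, so #repair parses it before symbolizing the keys. A dry-run sketch:

metrics = dt.repair(dry_run: true)
metrics[:files_removed]  # files referenced by the log but missing from storage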
#restore(target, ignore_missing_files: false, protocol_downgrade_allowed: false, commit_properties: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 167

def restore(
  target,
  ignore_missing_files: false,
  protocol_downgrade_allowed: false,
  commit_properties: nil
)
  if target.is_a?(Time)
    metrics =
      @table.restore(
        target.utc.iso8601(9),
        ignore_missing_files,
        protocol_downgrade_allowed,
        commit_properties
      )
  else
    metrics =
      @table.restore(
        target,
        ignore_missing_files,
        protocol_downgrade_allowed,
        commit_properties
      )
  end
  JSON.parse(metrics)
end
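Restore accepts either a version number or a Time (converted to ISO 8601), for example:

dt.restore(1)                                            # back to version 1
dt.restore(Time.now - 3600, ignore_missing_files: true)  # back to the state one hour ago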
#schema ⇒ Object
# File 'lib/deltalake/table.rb', line 80

def schema
  @table.schema
end
#table_uri ⇒ Object
# File 'lib/deltalake/table.rb', line 76

def table_uri
  @table.table_uri
end
#to_polars(eager: true, rechunk: false, columns: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 193

def to_polars(eager: true, rechunk: false, columns: nil)
  require "polars-df"

  sources = file_uris
  if sources.empty?
    lf = Polars::LazyFrame.new
  else
    delta_keys = [
      "AWS_S3_ALLOW_UNSAFE_RENAME",
      "AWS_S3_LOCKING_PROVIDER",
      "CONDITIONAL_PUT",
      "DELTA_DYNAMO_TABLE_NAME"
    ]
    storage_options = @storage_options&.reject { |k, _| delta_keys.include?(k.to_s.upcase) }
    lf = Polars.scan_parquet(sources, storage_options: storage_options, rechunk: rechunk)

    if columns
      # by_name requires polars-df > 0.15.0
      lf = lf.select(Polars.cs.by_name(*columns))
    end
  end

  eager ? lf.collect : lf
end
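Delta-specific storage options (the delta_keys list above) are stripped before the remaining options are handed to Polars.scan_parquet. A sketch with illustrative column names (column selection needs polars-df > 0.15.0):

df = dt.to_polars(columns: ["id", "value"])  # eager: Polars::DataFrame
lf = dt.to_polars(eager: false)              # lazy: Polars::LazyFrame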
#transaction_versions ⇒ Object
# File 'lib/deltalake/table.rb', line 256

def transaction_versions
  @table.transaction_versions
end
#update_incremental ⇒ Object
# File 'lib/deltalake/table.rb', line 218

def update_incremental
  @table.update_incremental
end
#vacuum(retention_hours: nil, dry_run: true, enforce_retention_duration: true, post_commithook_properties: nil, commit_properties: nil) ⇒ Object
# File 'lib/deltalake/table.rb', line 111

def vacuum(
  retention_hours: nil,
  dry_run: true,
  enforce_retention_duration: true,
  post_commithook_properties: nil,
  commit_properties: nil
)
  if retention_hours
    if retention_hours < 0
      raise ArgumentError, "The retention periods should be positive."
    end
  end

  @table.vacuum(
    dry_run,
    retention_hours,
    enforce_retention_duration,
    commit_properties,
    post_commithook_properties
  )
end
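Because dry_run defaults to true, a plain call only lists removable files; pass dry_run: false to actually delete them. Retention shorter than the table's configured minimum typically also requires enforce_retention_duration: false. A sketch:

dt.vacuum(retention_hours: 168)                  # dry run: list files older than 7 days
dt.vacuum(retention_hours: 168, dry_run: false)  # actually delete them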
#version ⇒ Object
# File 'lib/deltalake/table.rb', line 27

def version
  @table.version
end