Class: Dataflow::Nodes::DataNode
- Inherits: Object
- Includes: EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in: lib/dataflow/nodes/data_node.rb
Overview
Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.
Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.
Direct Known Subclasses
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary
- #add(records:) ⇒ Object
  Adds the given records to the dataset and updates the updated_at time.
- #all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false) {|db_client| ... } ⇒ Object
  Returns all the records from a dataset that match the options.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
  Supports paginating efficiently through the dataset.
- #clear(where: {}) ⇒ Object
  Clears the data that matches the options.
- #count(where: {}) ⇒ Integer
  Counts how many records match the condition, or all records if no condition is given.
- #create_non_unique_indexes(dataset_type: :read) ⇒ Object
  Applies non-unique indexes on the dataset.
- #create_unique_indexes(dataset_type: :read) ⇒ Object
  Applies unique indexes on the dataset.
- #db_indexes(write_dataset: false) ⇒ Object
- #drop_dataset! ⇒ Object
- #dump_dataset(base_folder: './dump') ⇒ String
  Dumps a backup of this dataset to a file.
- #explain_update(depth: 0, verbose: false) ⇒ Object
- #export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
- #find(where: {}) ⇒ Hash
  Finds and returns a record from the dataset, based on the given options.
- #handle_dataset_settings_changed ⇒ Object
  When the dataset properties change, notifies the adapter to handle the new settings.
- #import(connection_opts: {}, keys: nil) ⇒ Object
- #info(write_dataset: false) ⇒ Object
  Retrieves some information about this node and its usage.
- #ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
  Returns a list of ordered (ASC) system ID queries.
- #read_dataset_name ⇒ Object
- #read_dataset_name=(dataset) ⇒ Object
  Use this to select which dataset you want to read from.
- #recreate_dataset(dataset_type: :read) ⇒ Object
  Recreates a dataset.
- #required_by ⇒ Object
- #restore_dataset(filepath:) ⇒ Object
  Restores a dump of this dataset.
- #safely_clear_write_dataset ⇒ Object
  This is not safe if there is parallel processing going on.
- #set_defaults ⇒ Object
  Sets the default parameters before creating the object.
- #swap_read_write_datasets! ⇒ Object
- #update_schema(sch) ⇒ Object
  Update this node's schema.
- #updated? ⇒ Boolean
- #use_symbols? ⇒ Boolean
- #write_dataset_name ⇒ Object
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
#all_dependencies, find, #metadata, #recompute, #valid_for_computation?, #validate!
Instance Method Details
#add(records:) ⇒ Object
Adds the given records to the dataset and updates the updated_at time.
# File 'lib/dataflow/nodes/data_node.rb', line 187

def add(records:)
  raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array)
  records = records.compact
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
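A minimal usage sketch, assuming node is an already-configured DataNode instance (the record fields below are illustrative):

  node.add(records: [{ 'id' => 1, 'value' => 'a' }, nil, { 'id' => 2, 'value' => 'b' }])
  # nil entries are compacted away; calling with an empty array is a no-op.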
#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false) {|db_client| ... } ⇒ Object
Returns all the records from a dataset that match the options.
# File 'lib/dataflow/nodes/data_node.rb', line 147

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, include_system_id: include_system_id, &block)
end
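A usage sketch; all keyword arguments are optional and the field names, filter values, and sort syntax below are assumptions (the exact query/sort format depends on the underlying db adapter):

  records = node.all(
    where:  { 'status' => 'active' },   # hypothetical field/value
    fields: ['id', 'updated_at'],
    sort:   { 'updated_at' => -1 },     # assumed Mongo-style sort direction
    limit:  10
  )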
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
Supports paginating efficiently through the dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 164

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
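A sketch of a pagination loop. The returned hash's key names ('data', 'next_cursor') are an assumption here and may differ per db adapter; process is a hypothetical handler:

  cursor = nil
  loop do
    page    = node.all_paginated(where: { 'status' => 'active' }, cursor: cursor) # hypothetical filter
    records = page['data']         # assumed key
    cursor  = page['next_cursor']  # assumed key
    break if records.blank?
    process(records)
    break if cursor.blank?
  end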
#clear(where: {}) ⇒ Object
Clear the data that matches the options.
# File 'lib/dataflow/nodes/data_node.rb', line 197

def clear(where: {})
  db_adapter.delete(where: where)
end
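For example (hypothetical field and value):

  node.clear(where: { 'status' => 'obsolete' })
  node.clear  # no condition: the empty filter matches every record in the dataset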
#count(where: {}) ⇒ Integer
Counts how many records match the condition, or all records if no condition is given.
# File 'lib/dataflow/nodes/data_node.rb', line 181

def count(where: {})
  db_adapter.count(where: where)
end
#create_non_unique_indexes(dataset_type: :read) ⇒ Object
Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).
# File 'lib/dataflow/nodes/data_node.rb', line 229

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end
#create_unique_indexes(dataset_type: :read) ⇒ Object
Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.
# File 'lib/dataflow/nodes/data_node.rb', line 221

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
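Putting the index helpers together, a sketched refresh of the write buffer might look like the following; the ordering follows the recommendations above (unique indexes before loading data, non-unique ones after), and new_records is a hypothetical array of documents:

  node.recreate_dataset(dataset_type: :write)
  node.create_unique_indexes(dataset_type: :write)      # enforce constraints before loading
  node.add(records: new_records)
  node.create_non_unique_indexes(dataset_type: :write)  # cheaper to build once the data is in
  node.swap_read_write_datasets!                        # only when use_double_buffering is enabled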
#db_indexes(write_dataset: false) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 307

def db_indexes(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  db_adapter.retrieve_dataset_indexes(dataset)
end
#drop_dataset! ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 343

def drop_dataset!
  db_adapter.drop_dataset(write_dataset_name)
  return unless use_double_buffering
  db_adapter.drop_dataset(read_dataset_name)
end
#dump_dataset(base_folder: './dump') ⇒ String
Dump a backup of this dataset to a file.
# File 'lib/dataflow/nodes/data_node.rb', line 352

def dump_dataset(base_folder: './dump')
  read_idx = 0
  read_idx = read_dataset_idx if use_double_buffering

  db_adapter.dump(base_folder: base_folder, read_dataset_idx: read_idx)
end
#explain_update(depth: 0, verbose: false) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 320

def explain_update(depth: 0, verbose: false)
  logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
end
#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 278

def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
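A sketch of exporting a filtered subset to CSV (the default backend). The keys and the query operator are illustrative assumptions; the filter is serialized to JSON and interpreted by the export node:

  node.export(
    keys:  ['id', 'name'],
    where: { 'updated_at' => { '$gte' => '2017-01-01' } }  # assumed Mongo-style operator
  )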
#find(where: {}) ⇒ Hash
Finds and returns a record from the dataset, based on the given options.
# File 'lib/dataflow/nodes/data_node.rb', line 127

def find(where: {})
  db_adapter.find(where: where)
end
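For example (hypothetical field):

  record = node.find(where: { 'id' => 42 })  # single Hash; assumption: nil when nothing matches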
#handle_dataset_settings_changed ⇒ Object
When the dataset properties change, notifies the adapter to handle the new settings.
# File 'lib/dataflow/nodes/data_node.rb', line 107

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end
#import(connection_opts: {}, keys: nil) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 272

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
#info(write_dataset: false) ⇒ Object
Retrieves some information about this node and its usage.
# File 'lib/dataflow/nodes/data_node.rb', line 290

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    db_indexes: db_indexes(write_dataset: write_dataset),
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
Returns a list of ordered (ASC) system ID queries. These can be used to process the dataset in parallel by querying on a sub-section, e.g.:

  queries = node.ordered_system_id_queries(batch_size: batch_size)
  Parallel.each(queries) do |query|
    process(node.all(where: query))
  end
# File 'lib/dataflow/nodes/data_node.rb', line 175

def ordered_system_id_queries(batch_size:, where: {})
  db_adapter.ordered_system_id_queries(batch_size: batch_size, where: where)
end
#read_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 234

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{read_dataset_idx}"
  else
    name
  end
end
#read_dataset_name=(dataset) ⇒ Object
Use this to select which dataset you want to read from. A possible use case is reading from an old dataset name.
# File 'lib/dataflow/nodes/data_node.rb', line 256

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
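A sketch of temporarily pointing reads at the other buffer, assuming double buffering is enabled and the write buffer's name is among the node's valid dataset names (the assignment is silently ignored otherwise):

  node.read_dataset_name                             # => e.g. "my_dataset_buffer_1" (illustrative)
  node.read_dataset_name = node.write_dataset_name
  node.all(limit: 10)                                # now reads from the other buffer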
#recreate_dataset(dataset_type: :read) ⇒ Object
Recreates a dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 210

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end
#required_by ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 324

def required_by
  super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id).map do |node|
    { node: node, type: 'dataset' }
  end
end
#restore_dataset(filepath:) ⇒ Object
Restores a dump of this dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 362

def restore_dataset(filepath:)
  filename = filepath.split('/')[-1]
  read_idx = if filename.count('.') < 2
               # for compatibility reasons: previously we were not
               # exporting the read idx in the filename
               0
             else
               filename.split('.')[1].to_i
             end

  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a single buffer dataset but node '#{name}' is expecting a double buffered one." if read_idx == 0 && use_double_buffering
  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a double buffer dataset but node '#{name}' is expecting a single buffered one." if read_idx > 0 && !use_double_buffering

  if use_double_buffering
    dataset_name = dataset_name_for_buffer(read_idx)
  else
    dataset_name = name
  end

  db_adapter.restore(filepath: filepath, dataset_name: dataset_name)
  self.read_dataset_idx = read_idx
  save

  db_adapter.update_settings(data_node: self)

  true
end
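A sketch of a backup/restore round trip, assuming the String returned by #dump_dataset is the path of the dump file:

  path = node.dump_dataset(base_folder: './dump')
  # ... later, or on another machine with the same node configuration:
  node.restore_dataset(filepath: path)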
#safely_clear_write_dataset ⇒ Object
This is not safe if there is parallel processing going on.
# File 'lib/dataflow/nodes/data_node.rb', line 331

def safely_clear_write_dataset
  # we can only clear the write dataset if we're using double buffering
  return unless use_double_buffering

  # check if there is any node that is currently computing to this dataset
  used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
  return if used_by.present?

  logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
  # TODO: lock the node?
  db_adapter.drop_dataset(write_dataset_name)
end
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
# File 'lib/dataflow/nodes/data_node.rb', line 82

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end
#swap_read_write_datasets! ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 263

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{self.name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
#update_schema(sch) ⇒ Object
Update this node’s schema.
# File 'lib/dataflow/nodes/data_node.rb', line 202

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
#updated? ⇒ Boolean
# File 'lib/dataflow/nodes/data_node.rb', line 316

def updated?
  true
end
#use_symbols? ⇒ Boolean
# File 'lib/dataflow/nodes/data_node.rb', line 312

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end
#write_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 244

def write_dataset_name
  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{write_dataset_idx}"
  else
    name
  end
end