Class: Dataflow::Adapters::CsvAdapter

Inherits:
Object
Includes:
SchemaMixin
Defined in:
lib/dataflow/adapters/csv_adapter.rb

Overview

Interface between a data node and CSV files. Records are read with SmarterCSV and written to CSV files; the constructor makes sure the Dataflow::CsvPath directory exists.

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Attribute Summary

Instance Method Summary

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Constructor Details

#initialize(args) ⇒ CsvAdapter

Returns a new instance of CsvAdapter.

# File 'lib/dataflow/adapters/csv_adapter.rb', line 13

def initialize(args)
  # make sure the CsvPath directory exists
  `mkdir -p #{Dataflow::CsvPath}`
  update_settings(args)
end
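
The constructor shells out to `mkdir -p`. As a point of comparison only, the same effect can be had from Ruby's standard library with `FileUtils.mkdir_p`, which avoids spawning a subprocess and interpolating the path into a shell command. This is a minimal sketch, not the adapter's code; `ensure_directory` is a hypothetical helper name.

```ruby
require 'fileutils'

# Hypothetical helper (not part of Dataflow): creates the directory and any
# missing parents. Like `mkdir -p`, it does nothing if the directory exists.
def ensure_directory(path)
  FileUtils.mkdir_p(path)
  path
end
```

Calling it twice with the same path is safe, which matches how the constructor runs on every instantiation.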

Instance Attribute Details

#settings ⇒ Object (readonly)

Returns the value of attribute settings.


# File 'lib/dataflow/adapters/csv_adapter.rb', line 11

def settings
  @settings
end

Instance Method Details

#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) ⇒ Object

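Retrieves all elements from a data node. The where, fields, sort, offset, limit and include_system_id arguments are accepted but not used by this adapter: every row of the CSV file is returned, or an empty array if the file does not exist.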

# File 'lib/dataflow/adapters/csv_adapter.rb', line 34

def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
  SmarterCSV.process(file_path, strings_as_keys: true)
rescue Errno::ENOENT
  # no file yet: the data node is empty
  []
end
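
The read-or-empty pattern used by #all can be sketched without the SmarterCSV gem. The example below uses Ruby's bundled `csv` library as a stand-in, so value conversion and key handling may differ from what `SmarterCSV.process` returns; `read_all_rows` is a hypothetical name.

```ruby
require 'csv'

# Hypothetical stand-in for CsvAdapter#all: reads every row of a CSV file
# into an array of hashes keyed by the header strings.
def read_all_rows(path)
  CSV.read(path, headers: true).map(&:to_h)
rescue Errno::ENOENT
  # A data node that has never been saved has no file yet: treat it as empty.
  []
end
```

Returning an empty array rather than raising lets callers such as #count work on a data node whose file has not been written yet.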

#count(where: {}) ⇒ Object

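Counts the number of records. The where filter is passed on to #all, which does not apply it, so the result is the total number of rows in the file.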

# File 'lib/dataflow/adapters/csv_adapter.rb', line 41

def count(where: {})
  all(where: where).count
end

#create_indexes(*) ⇒ Object

# File 'lib/dataflow/adapters/csv_adapter.rb', line 65

def create_indexes(*); end

#find(where: opts = {}) ⇒ Object

Retrieves a single element from a data node. Not yet supported by the CSV adapter.

Raises:

  • (NotImplementedError)

# File 'lib/dataflow/adapters/csv_adapter.rb', line 29

def find(where: opts = {})
  raise NotImplementedError, '#find is not yet supported on CSV.'
end
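
Callers that work across several adapters may want to handle this error. One detail worth knowing: in Ruby, `NotImplementedError` descends from `ScriptError`, not `StandardError`, so a bare `rescue` does not catch it and the class has to be named explicitly. The sketch below uses a hypothetical stub class and helper, not the real adapter.

```ruby
# Hypothetical stub that mirrors the behaviour of CsvAdapter#find.
class StubCsvAdapter
  def find(where: {})
    raise NotImplementedError, '#find is not yet supported on CSV.'
  end
end

# Hypothetical helper: returns nil when the adapter cannot look up a single
# record. The error class must be named, because a bare `rescue` only
# catches StandardError and its subclasses.
def find_or_nil(adapter, where: {})
  adapter.find(where: where)
rescue NotImplementedError
  nil
end
```

The same handling applies to #remove, which raises the same error class.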

#on_save_finished ⇒ Object

# File 'lib/dataflow/adapters/csv_adapter.rb', line 50

def on_save_finished
  write_single_csv(keys: @schema.keys)
end

#recreate_dataset(dataset: nil) ⇒ Object

# File 'lib/dataflow/adapters/csv_adapter.rb', line 58

def recreate_dataset(dataset: nil)
  # simply delete the file
  delete_file(file_path)
  # and any parts that are still there
  file_parts.each { |part| delete_file(part) }
end
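
The helpers `delete_file` and `file_parts` are not shown on this page. A minimal sketch of the same cleanup, assuming (hypothetically) that parts are stored next to the final file as `<file_path>.part_<id>`:

```ruby
require 'fileutils'

# Hypothetical layout: parts live next to the final file and are named
# "<file_path>.part_<id>". The adapter's real naming scheme may differ.
def remove_dataset_files(file_path)
  # rm_f does not raise if the file is already gone
  FileUtils.rm_f(file_path)
  Dir.glob("#{file_path}.part_*").each { |part| FileUtils.rm_f(part) }
end
```

Using `rm_f` keeps the operation idempotent, so recreating a dataset that was never written is not an error.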

#remove(_opts = {}) ⇒ Object

Raises:

  • (NotImplementedError)

# File 'lib/dataflow/adapters/csv_adapter.rb', line 54

def remove(_opts = {})
  raise NotImplementedError, '#remove is not yet supported on CSV.'
end

#save(records:, part: nil) ⇒ Object

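Saves the given records as a CSV part, using the keys of the current schema as columns. The single final CSV is written by #on_save_finished.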

# File 'lib/dataflow/adapters/csv_adapter.rb', line 46

def save(records:, part: nil)
  write_csv_part(records, keys: @schema.keys, part: part)
end
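
Neither `write_csv_part` (used by #save) nor `write_single_csv` (used by #on_save_finished) is shown on this page. The sketch below illustrates one plausible shape of the part-then-merge flow with Ruby's bundled `csv` library. The method names, the `.part_<id>` naming and the choice to write headers only in the merged file are all assumptions.

```ruby
require 'csv'

# Hypothetical: writes one batch of records (hashes) to its own part file,
# one row per record, without a header row.
def write_part(base_path, records, keys:, part:)
  CSV.open("#{base_path}.part_#{part}", 'w') do |csv|
    records.each { |record| csv << keys.map { |key| record[key] } }
  end
end

# Hypothetical: concatenates all parts under a single header row,
# then removes the part files.
def merge_parts(base_path, keys:)
  parts = Dir.glob("#{base_path}.part_*").sort
  CSV.open(base_path, 'w') do |csv|
    csv << keys
    parts.each { |part| CSV.foreach(part) { |row| csv << row } }
  end
  parts.each { |part| File.delete(part) }
end
```

Writing each batch to a separate part means batches saved in parallel never write to the same file; the merge step runs once, after all batches are done.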

#set_schema(schema) ⇒ Object

# File 'lib/dataflow/adapters/csv_adapter.rb', line 24

def set_schema(schema)
  @schema = schema
end

#update_settings(args) ⇒ Object

# File 'lib/dataflow/adapters/csv_adapter.rb', line 19

def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
  @schema = [] # TODO: pre-fetch the csv's schema
end