Class: ETL

Inherits:
Object
Includes:
ActiveSupport::Callbacks, TeguGears
Defined in:
lib/etl/etl.rb

Overview

State machine with useful callbacks for the three stages of moving data (extract, transform, and load), with some support for retrying failed stages. Raise errors liberally when things go wrong: the data is staged, so the process can usually be restarted once the issue has been addressed.

Direct Known Subclasses

CSV::ET, XML::ET

Constant Summary

VALID_STATES =
[:before_extract, :extract, :after_extract, :before_transform, :transform, :after_transform, :before_load, :load, :after_load, :complete].freeze
VALID_CALLBACKS =
[:before_extract, :after_extract, :before_transform, :after_transform, :before_load, :after_load, :complete].freeze
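
The relationship between the two constants can be checked directly. The sketch below copies both arrays from the summary above and computes their difference; STAGE_STATES is a name introduced here for illustration and is not part of the library. It suggests that the only states without a callback are the three stage methods themselves.

```ruby
# Both arrays are copied from the constant summary above.
VALID_STATES = [:before_extract, :extract, :after_extract,
                :before_transform, :transform, :after_transform,
                :before_load, :load, :after_load, :complete].freeze
VALID_CALLBACKS = [:before_extract, :after_extract,
                   :before_transform, :after_transform,
                   :before_load, :after_load, :complete].freeze

# STAGE_STATES is a hypothetical name: the states that are not callbacks.
STAGE_STATES = VALID_STATES - VALID_CALLBACKS

puts STAGE_STATES.inspect
```

Every callback name is also a valid state, so a callback can double as a restart point.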

Constructor Details

#initialize ⇒ ETL

Returns a new instance of ETL.



# File 'lib/etl/etl.rb', line 110

def initialize
  @state = :before_extract
end

Instance Attribute Details

#block ⇒ Object (readonly)

An optional block to use during processing.



# File 'lib/etl/etl.rb', line 135

def block
  @block
end

#data ⇒ Object (readonly)

The data being worked on, after it has successfully completed an extract, transform, or load process.



# File 'lib/etl/etl.rb', line 119

def data
  @data
end

#options ⇒ Object (readonly)

The options to process with. All your code has access to these options, so things like:

:filename => '…', :destination => '…', :converters => :all

would all be useful. Your extract, transform, and load methods, plus your callbacks, can then pull out the information they need to get the job done.



# File 'lib/etl/etl.rb', line 132

def options
  @options
end
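
As a rough illustration of how a stage might read the options hash, the sketch below uses a stand-in class rather than ETL itself. OptionsDemo and the :text and :separator keys are assumptions made for this example; only the idea of passing one hash that every stage can consult comes from the description above.

```ruby
# Hypothetical stand-in for an ETL subclass; it is not the library class.
class OptionsDemo
  attr_reader :options, :raw

  def initialize(options = {})
    @options = options
  end

  # Each stage reads only the keys it needs and ignores the rest.
  def extract
    text      = options.fetch(:text)            # required; raises KeyError if absent
    separator = options.fetch(:separator, ",")  # optional, with a default
    @raw = text.lines.map { |line| line.chomp.split(separator) }
  end
end

demo = OptionsDemo.new(:text => "a;1\nb;2\n",
                       :separator => ";",
                       :destination => "unused by extract")
demo.extract
puts demo.raw.inspect
```

Using fetch for required keys makes a missing option fail early, which fits the advice to raise errors liberally.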

#raw ⇒ Object (readonly)

The data generated by a stage that did not complete.



# File 'lib/etl/etl.rb', line 122

def raw
  @raw
end

#state ⇒ Object (readonly)

The current state of the ETL process.



# File 'lib/etl/etl.rb', line 115

def state
  @state
end

Class Method Details

.call ⇒ Object



# File 'lib/etl/etl.rb', line 27

def process(options={}, &block)
  etl = new
  etl.process(options, &block)
  etl
end

.logger ⇒ Object

Sets up a logger for the class. Respects inheritance, so a different logger is created for each ETL subclass. Uses the standard log levels: DEBUG < INFO < WARN < ERROR < FATAL.



# File 'lib/etl/etl.rb', line 32

def logger
  
  logger_name = (self.to_s + "_logger").to_sym
  
  # Find and return the cached logger, if it's setup
  logger = read_inheritable_attribute(logger_name)
  return logger if logger
  
  # Create a logger.  Will configure it here and save it in a moment.
  logger = Log4r::Logger.new(self.to_s)
  
  # Set my default output format
  format = Log4r::PatternFormatter.new(:pattern => "[%l] %d :: %m")
  
  # Setup a console logger with our formatting
  console = Log4r::StderrOutputter.new 'console'
  console.level = Log4r::WARN
  console.formatter = format
  
  # Setup a logger to a file with our formatting
  logfile = Log4r::FileOutputter.new('logfile', 
                           :filename => self.logger_filename, 
                           :trunc => false,
                           :level => Log4r::DEBUG)
  logfile.formatter = format
  
  # Tell the logger about both outputs.
  logger.add('console','logfile')
  
  # Store the logger as an inheritable class attribute
  write_inheritable_attribute(logger_name, logger)
  
  # Return the logger
  logger
end
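
The per-class caching idea can be sketched without Log4r or the inheritable-attribute helpers. The example below is an assumption-laden substitute, not the library's code: it uses Ruby's standard Logger, caches the logger in a class-level instance variable, and writes to an in-memory StringIO so it runs anywhere. LoggingBase, CsvJob, and log_device are hypothetical names.

```ruby
require 'logger'
require 'stringio'

# Hypothetical base class standing in for ETL.
class LoggingBase
  # One logger per class, cached in a class-level instance variable.
  # Class-level instance variables are not shared with subclasses, so each
  # subclass builds and caches its own logger on first use.
  def self.logger
    @logger ||= begin
      log = Logger.new(log_device)
      log.progname = name        # the class name, e.g. "CsvJob"
      log.level = Logger::DEBUG
      log
    end
  end

  # In-memory log target so the sketch needs no files on disk.
  def self.log_device
    @log_device ||= StringIO.new
  end
end

class CsvJob < LoggingBase; end

LoggingBase.logger.info("base message")
CsvJob.logger.warn("subclass message")
```

Repeated calls on the same class return the cached object, while the base class and the subclass each hold a separate logger.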

.logger_filename ⇒ Object



# File 'lib/etl/etl.rb', line 102

def logger_filename
  File.join(self.logger_root, "#{self.to_s}.log")
end

.logger_root ⇒ Object

Resolves the directory where logs are written, in three steps.

First, tries to get the cached @@logger_root.

Second, sets the global @@logger_root unless it is cached. Sets it to the best possible place to locate the logs:

  1) where log will be from RAILS_ROOT/vendor/gems/etl
  2) where log will be in a Rails model
  3) where log will be in a Rails lib
  4) in the local directory where ETL is being subclassed

Third, uses the subclass's stored logger_root, ignoring all the rest if this is found.



# File 'lib/etl/etl.rb', line 77

def logger_root
  @@logger_root ||= case
  when File.exist?(File.dirname(__FILE__) + "/../../../../../log")
    File.expand_path(File.dirname(__FILE__) + "/../../../../../log")
  when File.exist?(File.dirname(__FILE__) + "/../../log")
    File.expand_path(File.dirname(__FILE__) + '/../../log')
  when File.exist?(File.dirname(__FILE__) + "/../log")
    File.expand_path(File.dirname(__FILE__) + '/../log')
  when File.exist?(File.dirname(__FILE__) + "/log")
    File.expand_path(File.dirname(__FILE__) + '/log')
  else
    File.expand_path(File.dirname(__FILE__))
  end
  logger_root = read_inheritable_attribute(:logger_root) || @@logger_root
end
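
The "first location that exists" lookup can also be written as a search over a list of candidates. The sketch below is one possible restatement under assumptions: first_existing_dir is a hypothetical helper, it tests with File.directory? where the listing uses File.exist?, and it runs against a temporary directory so that it is self-contained.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper: return the first candidate that is an existing
# directory, or the fallback when none of them exists.
def first_existing_dir(candidates, fallback)
  found = candidates.find { |path| File.directory?(path) }
  File.expand_path(found || fallback)
end

resolved = nil
expected = nil
Dir.mktmpdir do |base|
  FileUtils.mkdir_p(File.join(base, "app", "log"))
  candidates = [
    File.join(base, "vendor", "log"),  # does not exist, so it is skipped
    File.join(base, "app", "log"),     # exists, so it wins
    File.join(base, "log")             # never reached
  ]
  expected = File.expand_path(File.join(base, "app", "log"))
  resolved = first_existing_dir(candidates, base)
end

puts resolved == expected
```

Keeping the candidates in an array makes the search order explicit and easy to extend.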

.logger_root=(value) ⇒ Object

Sets the logger root for the subclass, and sets it globally if it is called on ETL itself. So ETL.logger_root = "some location" sets the logger root for all subclasses. This is useful if a lot of ETL is being done and it needs to be logged in a non-standard place.



# File 'lib/etl/etl.rb', line 97

def logger_root=(value)
  write_inheritable_attribute(:logger_root, value)
  @@logger_root = value if self == ETL
end
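
The override semantics (a shared default plus an optional per-subclass value) can be sketched with plain Ruby. This is an illustrative substitute rather than the library's code: it stores the per-class value in a class-level instance variable where the listing uses write_inheritable_attribute, and RootedJob, CsvRootedJob, XmlRootedJob, and log_root are hypothetical names.

```ruby
# Hypothetical base class standing in for ETL.
class RootedJob
  @@default_root = "log"   # shared by RootedJob and every subclass

  # A subclass's own value wins; otherwise fall back to the shared default.
  def self.log_root
    @log_root || @@default_root
  end

  # Setting on a subclass affects only that subclass.
  # Setting on RootedJob itself also changes the shared default.
  def self.log_root=(value)
    @log_root = value
    @@default_root = value if self == RootedJob
  end
end

class CsvRootedJob < RootedJob; end
class XmlRootedJob < RootedJob; end

CsvRootedJob.log_root = "/tmp/csv_logs"
before_global_change = XmlRootedJob.log_root

RootedJob.log_root = "/var/log/etl"
after_global_change = XmlRootedJob.log_root
```

After both assignments, the subclass with its own root keeps it, and the subclass without one follows the new shared default.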

.process(options = {}, &block) ⇒ Object



# File 'lib/etl/etl.rb', line 22

def process(options={}, &block)
  etl = new
  etl.process(options, &block)
  etl
end

Instance Method Details

#process(options = {}, &block) ⇒ Object

Working towards a universal workflow driver here. The signature is just a hash and a block. That should work for about anything.



# File 'lib/etl/etl.rb', line 139

def process(options={}, &block)
  # Only set up the options the first time; on later calls we are
  # restarting the process.
  @options = options unless @options
  @block = block

  self.class.logger.info "Processing #{self.class.to_s}"
  self.class.logger.info "To re-run this process, run: #{self.show_command}"
  self.class.logger.info "Note: Also pass the same block to #{self.class.to_s}" if block

  etl_callback(:before_extract)

  if @state == :extract
    extract 
    @state = :after_extract
  end

  etl_callback(:after_extract)

  # To be sure this is after all after_extract callbacks
  process_raw_data
  
  etl_callback(:before_transform)

  if @state == :transform
    transform
    @state = :after_transform
  end

  etl_callback(:after_transform)
  
  # To be sure this is after all after_transform callbacks
  process_raw_data
  
  etl_callback(:before_load)

  if @state == :load
    load
    @state = :after_load
  end

  etl_callback(:after_load)
  @state
end
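
The restart behaviour described in the overview can be illustrated with a much smaller state machine. The sketch below is hypothetical and leaves out callbacks, logging, and raw-data handling; MiniEtl, its STAGES constant, and the flaky source are assumptions made for the example. It shows only that a stage which raises leaves the state unchanged, so a second call to process resumes from the failed stage.

```ruby
# Hypothetical miniature of the workflow; it is not the library's ETL class.
class MiniEtl
  STAGES = [:extract, :transform, :load].freeze
  attr_reader :state, :data

  def initialize(source)
    @source = source
    @state = :extract
  end

  # Run every remaining stage in order. A stage that raises leaves @state
  # pointing at itself, so calling process again resumes from that stage.
  def process
    while STAGES.include?(@state)
      @data = send(@state)
      @state = STAGES[STAGES.index(@state) + 1] || :complete
    end
    @state
  end

  private

  def extract
    @source.call
  end

  def transform
    data.map(&:upcase)
  end

  def load
    data.freeze
  end
end

# A source that fails on the first attempt and succeeds afterwards.
attempts = 0
flaky_source = lambda do
  attempts += 1
  raise IOError, "source unavailable" if attempts == 1
  ["a", "b"]
end

etl = MiniEtl.new(flaky_source)
begin
  etl.process
rescue IOError
  # The state is still :extract; address the problem and call process again.
end
state_after_failure = etl.state
final_state = etl.process
```

The second call does not need new arguments, which mirrors how the listing keeps the original options on a restart.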

#reverse_to(state) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/etl/etl.rb', line 184

def reverse_to(state)
  raise ArgumentError, "State must be one of #{VALID_STATES.inspect}" unless VALID_STATES.include?(state)
  loc = VALID_STATES.index(state)
  possible_states = VALID_STATES[0..loc]
  raise "Cannot reverse to a state that hasn't been achieved yet." unless possible_states.include?(state)
  @state = state
end
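
In the listing above, possible_states is sliced up to the index of the requested state, so it always contains that state and the second guard cannot fire. The sketch below shows one way the check could compare against the current state instead. It is an assumption about the intent, not the library's behaviour; rewind_target and STATES are hypothetical names, with STATES copied from VALID_STATES.

```ruby
# Copied from VALID_STATES in the constant summary.
STATES = [:before_extract, :extract, :after_extract,
          :before_transform, :transform, :after_transform,
          :before_load, :load, :after_load, :complete].freeze

# Hypothetical helper: return the target state if it has already been
# reached from the current state, otherwise raise.
def rewind_target(current, target)
  unless STATES.include?(target)
    raise ArgumentError, "State must be one of #{STATES.inspect}"
  end
  reached = STATES[0..STATES.index(current)]
  unless reached.include?(target)
    raise "Cannot reverse to a state that hasn't been achieved yet."
  end
  target
end

allowed = rewind_target(:transform, :extract)
```

Moving back from :transform to :extract is allowed here, while asking for :load from :extract raises a RuntimeError.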