Class: ETL
- Inherits:
- Object
- Includes:
- ActiveSupport::Callbacks, TeguGears
- Defined in:
- lib/etl/etl.rb
Overview
A state machine with callbacks for extracting, transforming, and loading data (ETL), with some support for retrying failed stages of the process. Raise errors liberally when something goes wrong: the data is staged at each step, so the process can usually be restarted once the issue has been addressed.
Direct Known Subclasses
Constant Summary
- VALID_STATES =
[:before_extract, :extract, :after_extract, :before_transform, :transform, :after_transform, :before_load, :load, :after_load, :complete].freeze
- VALID_CALLBACKS =
[:before_extract, :after_extract, :before_transform, :after_transform, :before_load, :after_load, :complete].freeze
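The two constants are related: every state except the three work stages doubles as a callback name. A quick check of that, with the arrays copied from the listing above into a hypothetical ConstantsSketch module (not part of the library) so that it runs standalone:

```ruby
# Standalone copy of the documented constants. ConstantsSketch and
# WORK_STAGES are hypothetical names used only for this illustration.
module ConstantsSketch
  VALID_STATES = [:before_extract, :extract, :after_extract,
                  :before_transform, :transform, :after_transform,
                  :before_load, :load, :after_load, :complete].freeze

  VALID_CALLBACKS = [:before_extract, :after_extract,
                     :before_transform, :after_transform,
                     :before_load, :after_load, :complete].freeze

  # The states that do the actual work and have no callback of their own.
  WORK_STAGES = (VALID_STATES - VALID_CALLBACKS).freeze
end

p ConstantsSketch::WORK_STAGES  # prints [:extract, :transform, :load]
```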
Instance Attribute Summary
-
#block ⇒ Object
readonly
An optional block to process with.
-
#data ⇒ Object
readonly
The data being worked on, after it has successfully completed an extract, transform, or load process.
-
#options ⇒ Object
readonly
The options to process with.
-
#raw ⇒ Object
readonly
The data generated on a process that didn’t complete.
-
#state ⇒ Object
readonly
The state of the transform process.
Class Method Summary
- .call ⇒ Object
-
.logger ⇒ Object
Sets up a logger for the class.
- .logger_filename ⇒ Object
-
.logger_root ⇒ Object
First, tries to get the cached @@logger_root. Second, sets the global @@logger_root unless it is cached.
-
.logger_root=(value) ⇒ Object
Sets the logger root for the subclass, and sets it globally if this is set on ETL.
- .process(options = {}, &block) ⇒ Object
Instance Method Summary
-
#initialize ⇒ ETL
constructor
A new instance of ETL.
-
#process(options = {}, &block) ⇒ Object
Working towards a universal workflow driver here.
- #reverse_to(state) ⇒ Object
Constructor Details
#initialize ⇒ ETL
Returns a new instance of ETL.
    # File 'lib/etl/etl.rb', line 109
    def initialize
      @state = :before_extract
    end
Instance Attribute Details
#block ⇒ Object (readonly)
An optional block to process with.
    # File 'lib/etl/etl.rb', line 134
    def block
      @block
    end
#data ⇒ Object (readonly)
The data being worked on, after it has successfully completed an extract, transform, or load process.
    # File 'lib/etl/etl.rb', line 118
    def data
      @data
    end
#options ⇒ Object (readonly)
The options to process with. All of your code has access to these options, so entries such as
    :filename => '…', :destination => '…', :converters => :all
are all useful. Your extract, transform, and load methods, plus your callbacks, can then pull out the information they need to get the job done.
    # File 'lib/etl/etl.rb', line 131
    def options
      @options
    end
#raw ⇒ Object (readonly)
The data generated on a process that didn’t complete.
    # File 'lib/etl/etl.rb', line 121
    def raw
      @raw
    end
#state ⇒ Object (readonly)
The state of the transform process.
    # File 'lib/etl/etl.rb', line 114
    def state
      @state
    end
Class Method Details
.call ⇒ Object
    # File 'lib/etl/etl.rb', line 27
    def process(options={}, &block)
      etl = new
      etl.process(options, &block)
      etl
    end
.logger ⇒ Object
Sets up a logger for the class. Respects inheritance, so a different logger is created for each ETL subclass. Uses the standard log levels: DEBUG < INFO < WARN < ERROR < FATAL.
    # File 'lib/etl/etl.rb', line 32
    def logger
      logger_name = (self.to_s + "_logger").to_sym

      # Find and return the cached logger, if it's set up
      logger = read_inheritable_attribute(logger_name)
      return logger if logger

      # Create a logger. Will configure it here and save it in a moment.
      logger = Log4r::Logger.new(self.to_s)

      # Set my default output format
      format = Log4r::PatternFormatter.new(:pattern => "[%l] %d :: %m")

      # Set up a console logger with our formatting
      console = Log4r::StderrOutputter.new 'console'
      console.level = Log4r::WARN
      console.formatter = format

      # Set up a logger to a file with our formatting
      logfile = Log4r::FileOutputter.new('logfile',
                  :filename => File.join(self.logger_root, self.logger_filename),
                  :trunc => false,
                  :level => Log4r::DEBUG)
      logfile.formatter = format

      # Tell the logger about both outputs.
      logger.add('console','logfile')

      # Store the logger as an inheritable class attribute
      write_inheritable_attribute(logger_name, logger)

      # Return the logger
      logger
    end
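The "one logger per subclass" behaviour can be tried in isolation. The following is a minimal sketch, assuming Ruby's standard-library Logger in place of Log4r and a class-level instance variable in place of read_inheritable_attribute; SketchBase and SketchChild are hypothetical names, not part of the library.

```ruby
require 'logger'

# Hypothetical stand-in classes. The real ETL.logger uses Log4r and
# inheritable attributes; this sketch swaps in the stdlib Logger.
class SketchBase
  def self.logger
    # @logger lives on the class object itself, so each subclass builds
    # and caches its own logger the first time it is asked for one.
    @logger ||= Logger.new($stderr).tap do |log|
      log.progname = name          # label messages with the class name
      log.level = Logger::WARN     # same console threshold as the listing
    end
  end
end

class SketchChild < SketchBase; end

puts SketchBase.logger.equal?(SketchBase.logger)   # cached: same object
puts SketchBase.logger.equal?(SketchChild.logger)  # separate per subclass
```

The first line prints true and the second prints false, because the cache is held per class rather than shared down the hierarchy.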
.logger_filename ⇒ Object
    # File 'lib/etl/etl.rb', line 101
    def logger_filename
      File.join(self.logger_root, "#{self.to_s}.log")
    end
.logger_root ⇒ Object
First, tries to get the cached @@logger_root. Second, sets the global @@logger_root unless it is cached, choosing the best possible place to locate the logs:
1) where log will be from RAILS_ROOT/vendor/gems/etl
2) where log will be in a Rails model
3) where log will be in a Rails lib
4) in the local directory where ETL is being subclassed
Third, uses the subclass's stored logger_root, ignoring all the rest if this is found.
    # File 'lib/etl/etl.rb', line 76
    def logger_root
      @@logger_root ||= case
        when File.exist?(File.dirname(__FILE__) + "/../../../../../log")
          File.expand_path(File.dirname(__FILE__) + "/../../../../../log")
        when File.exist?(File.dirname(__FILE__) + "/../../log")
          File.expand_path(File.dirname(__FILE__) + '/../../log')
        when File.exist?(File.dirname(__FILE__) + "/../log")
          File.expand_path(File.dirname(__FILE__) + '/../log')
        when File.exist?(File.dirname(__FILE__) + "/log")
          File.expand_path(File.dirname(__FILE__) + '/log')
        else
          File.expand_path('.')
      end
      logger_root = read_inheritable_attribute(:logger_root) || @@logger_root
    end
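The directory search in the listing amounts to "take the first candidate that exists, otherwise fall back to the current directory". A small sketch of that pattern follows; first_existing_dir is a hypothetical helper written for this illustration, not a method of ETL.

```ruby
# Hypothetical helper, not part of ETL: returns the first candidate
# path that exists, or the fallback, expanded to an absolute path.
def first_existing_dir(candidates, fallback = '.')
  found = candidates.find { |path| File.exist?(path) }
  File.expand_path(found || fallback)
end

here = File.dirname(__FILE__)
candidates = [
  here + "/../../../../../log",  # five levels up, as in the first branch
  here + "/../../log",
  here + "/../log",
  here + "/log"
]

# Prints an absolute path; which one depends on where this file lives.
puts first_existing_dir(candidates)
```

Keeping the candidates in an array makes the search order explicit and easy to extend, at the cost of differing in shape from the case expression the library uses.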
.logger_root=(value) ⇒ Object
Sets the logger root for the subclass, and sets it globally if this is set on ETL. So, ETL.logger_root = "some location" sets the logger root for all subclasses. This is useful when a lot of ETL is being done and it needs to be logged in a non-standard place.
    # File 'lib/etl/etl.rb', line 96
    def logger_root=(value)
      write_inheritable_attribute(:logger_root, value)
      @@logger_root = value if self == ETL
    end
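The interplay between the per-subclass setting and the global one can be sketched without the library. In this hypothetical stand-in (RootBase, RootChildA, RootChildB, and log_root are invented names), a class-level instance variable plays the role of the inheritable attribute and a class variable plays the role of @@logger_root; the paths are placeholders.

```ruby
# Hypothetical stand-in, not the library code.
class RootBase
  @@shared_root = File.expand_path('.')

  def self.log_root=(value)
    @log_root = value                          # per-class setting
    @@shared_root = value if self == RootBase  # global only when set on the base
  end

  def self.log_root
    # A class's own setting wins; otherwise use the shared root.
    @log_root || @@shared_root
  end
end

class RootChildA < RootBase; end
class RootChildB < RootBase; end

RootBase.log_root = "/tmp/etl_logs"   # applies to every subclass
RootChildA.log_root = "/tmp/child_a"  # applies to RootChildA only

puts RootChildA.log_root  # prints /tmp/child_a
puts RootChildB.log_root  # prints /tmp/etl_logs
```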
.process(options = {}, &block) ⇒ Object
    # File 'lib/etl/etl.rb', line 22
    def process(options={}, &block)
      etl = new
      etl.process(options, &block)
      etl
    end
Instance Method Details
#process(options = {}, &block) ⇒ Object
Working towards a universal workflow driver here. The signature is just a hash and a block. That should work for about anything.
    # File 'lib/etl/etl.rb', line 138
    def process(options={}, &block)
      # Only set up the options the first time; on later calls we are
      # restarting the process.
      @options = options unless @options
      @block = block

      self.class.logger.info "Processing #{self.class.to_s}"
      self.class.logger.info "To re-run this process, run: #{self.show_command}"
      self.class.logger.info "Note: Also pass the same block to #{self.class.to_s}" if block

      etl_callback(:before_extract)

      if @state == :extract
        extract
        @state = :after_extract
      end

      etl_callback(:after_extract)

      # To be sure this is after all after_extract callbacks
      process_raw_data

      etl_callback(:before_transform)

      if @state == :transform
        transform
        @state = :after_transform
      end

      etl_callback(:after_transform)

      # To be sure this is after all after_transform callbacks
      process_raw_data

      etl_callback(:before_load)

      if @state == :load
        load
        @state = :after_load
      end

      etl_callback(:after_load)
      @state
    end
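The restart behaviour is easier to see in a runnable form. The following is a minimal sketch under stated assumptions: MiniETL and ShoutETL are hypothetical stand-ins, not the library's classes. They copy only the documented ideas (a hash-and-block signature, options kept from the first call, state left in place when a stage raises) and leave out callbacks, logging, and raw-data handling.

```ruby
# Hypothetical stand-in, NOT the library's ETL class.
class MiniETL
  STAGES = [:extract, :transform, :load].freeze

  attr_reader :state, :data, :options, :block

  def initialize
    @state = STAGES.first
  end

  def process(options = {}, &block)
    @options ||= options   # only the first call sets the options
    @block = block
    STAGES.each do |stage|
      next unless @state == stage
      @data = send(stage)  # a raise here leaves @state at the failed stage
      @state = STAGES[STAGES.index(stage) + 1] || :complete
    end
    @state
  end
end

# Example subclass (also hypothetical): upcases the words in options[:text].
class ShoutETL < MiniETL
  private

  def extract
    options[:text].split
  end

  def transform
    raise "no :converter given" unless options[:converter]
    data.map(&options[:converter])
  end

  def load
    block ? block.call(data) : data
  end
end

joiner = proc { |words| words.join(" ") }
etl = ShoutETL.new

begin
  etl.process({ :text => "a b c" }, &joiner)
rescue RuntimeError => e
  puts "failed in #{etl.state}: #{e.message}"
end

etl.options[:converter] = :upcase  # address the issue, then restart
etl.process(&joiner)               # pass the same block again
puts etl.state
puts etl.data
```

The first call stops in the transform stage and keeps the extracted words in data. The second call skips extract, resumes at transform, and finishes with state :complete and data "A B C".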
#reverse_to(state) ⇒ Object
    # File 'lib/etl/etl.rb', line 183
    def reverse_to(state)
      raise ArgumentError, "State must be one of #{VALID_STATES.inspect}" unless VALID_STATES.include?(state)
      loc = VALID_STATES.index(state)
      possible_states = VALID_STATES[0..loc]
      raise "Cannot reverse to a state that hasn't been acheived yet." unless possible_states.include?(state)
      @state = state
    end
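As listed, VALID_STATES[0..loc] always ends with state itself, so the second guard cannot fire. The sketch below shows one way to express "not achieved yet", assuming it means "later in VALID_STATES than the current state". ReverseSketch and its two-argument reverse_to are hypothetical and written only for this illustration; they are not the library's method.

```ruby
# Hypothetical module: the state list is copied from the documented
# constant so that the example runs standalone.
module ReverseSketch
  VALID_STATES = [:before_extract, :extract, :after_extract,
                  :before_transform, :transform, :after_transform,
                  :before_load, :load, :after_load, :complete].freeze

  # Returns target if it is at or before current in VALID_STATES,
  # and raises ArgumentError otherwise.
  def self.reverse_to(current, target)
    unless VALID_STATES.include?(target)
      raise ArgumentError, "State must be one of #{VALID_STATES.inspect}"
    end
    reached = VALID_STATES[0..VALID_STATES.index(current)]
    unless reached.include?(target)
      raise ArgumentError, "Cannot reverse to #{target.inspect} from #{current.inspect}"
    end
    target
  end
end

p ReverseSketch.reverse_to(:load, :before_transform)  # prints :before_transform
```

The only change from the listing is that the slice is bounded by the current state rather than by the requested one.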