Class: BigShift::BigQuery::Table
- Inherits:
-
Object
- Object
- BigShift::BigQuery::Table
- Defined in:
- lib/bigshift/big_query/table.rb
Instance Method Summary collapse
-
#initialize(big_query_service, table_data, options = {}) ⇒ Table
constructor
A new instance of Table.
- #load(uri, options = {}) ⇒ Object
Constructor Details
permalink #initialize(big_query_service, table_data, options = {}) ⇒ Table
Returns a new instance of Table.
4 5 6 7 8 9 |
# File 'lib/bigshift/big_query/table.rb', line 4
#
# Wraps a BigQuery table so rows can be loaded into it.
#
# @param big_query_service [Google::Apis::BigqueryV2::BigqueryService]
#   client used to submit and poll load jobs
# @param table_data [Google::Apis::BigqueryV2::Table] table metadata;
#   its #table_reference identifies the destination table
# @param options [Hash] optional collaborators
# @option options [#info, #debug] :logger logger for progress output
#   (defaults to NullLogger::INSTANCE, i.e. silent)
# @option options [#sleep] :thread object used to sleep between job
#   polls (defaults to Kernel; injectable for tests)
def initialize(big_query_service, table_data, options = {})
  @big_query_service = big_query_service
  @table_data = table_data
  @logger = options[:logger] || NullLogger::INSTANCE
  @thread = options[:thread] || Kernel
end
Instance Method Details
permalink #load(uri, options = {}) ⇒ Object
[View source]
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/bigshift/big_query/table.rb', line 11
#
# Loads CSV data from Cloud Storage into this BigQuery table and blocks
# until the load job finishes, polling its status at a fixed interval.
#
# @param uri [String] gs:// URI of the CSV data to load
# @param options [Hash]
# @option options [Numeric] :poll_interval seconds between job status
#   polls (defaults to DEFAULT_POLL_INTERVAL)
# @option options [Boolean] :allow_overwrite truncate the table before
#   loading when true; otherwise the job fails if the table has rows
# @option options [Google::Apis::BigqueryV2::TableSchema] :schema
#   explicit schema for the destination table
# @option options [Integer] :max_bad_records tolerated bad input rows
# @raise [RuntimeError] with the job's error_result message if the load
#   job completes with errors (each error is also logged at debug level)
# @return [nil]
def load(uri, options = {})
  poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  load_configuration = {}
  load_configuration[:source_uris] = [uri]
  load_configuration[:write_disposition] = options[:allow_overwrite] ? 'WRITE_TRUNCATE' : 'WRITE_EMPTY'
  load_configuration[:create_disposition] = 'CREATE_IF_NEEDED'
  load_configuration[:schema] = options[:schema] if options[:schema]
  load_configuration[:source_format] = 'CSV'
  # Literal two-character sequence "\t" — the BigQuery API accepts this
  # escape sequence and interprets it as a tab delimiter.
  load_configuration[:field_delimiter] = '\t'
  load_configuration[:quote] = '"'
  load_configuration[:destination_table] = @table_data.table_reference
  load_configuration[:max_bad_records] = options[:max_bad_records] if options[:max_bad_records]
  job = Google::Apis::BigqueryV2::Job.new(
    configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
      load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(load_configuration)
    )
  )
  job = @big_query_service.insert_job(@table_data.table_reference.project_id, job)
  @logger.info(sprintf('Loading rows from %s to the table %s.%s', uri, @table_data.table_reference.dataset_id, @table_data.table_reference.table_id))
  started = false
  loop do
    # Re-fetch the job each iteration to observe its latest state.
    job = @big_query_service.get_job(@table_data.table_reference.project_id, job.job_reference.job_id)
    if job.status && job.status.state == 'DONE'
      if job.status.errors.nil? || job.status.errors.empty?
        break
      else
        job.status.errors.each do |error|
          message = %<Load error: "#{error.message}">
          if error.location
            # Location strings look like "file:N/line:N/field:N"; keep
            # only the values after each colon.
            file, line, field = error.location.split('/').map { |s| s.split(':').last.strip }
            message << " at file #{file}, line #{line}"
            message << ", field #{field}" if field
          end
          @logger.debug(message)
        end
        raise job.status.error_result.message
      end
    else
      state = job.status && job.status.state
      if state == 'RUNNING' && !started
        @logger.info('Loading started')
        started = true
      else
        @logger.debug(sprintf('Waiting for job %s (status: %s)', job.job_reference.job_id.inspect, state ? state.inspect : 'unknown'))
      end
      # @thread defaults to Kernel; injectable so tests can skip waiting.
      @thread.sleep(poll_interval)
    end
  end
  report_complete(job)
  nil
end