Class: BigShift::BigQuery::Table

Inherits: Object
Defined in:
lib/bigshift/big_query/table.rb

Instance Method Summary

Constructor Details

#initialize(big_query_service, table_data, options = {}) ⇒ Table

Returns a new instance of Table.

[View source]

(source lines 4–9 of lib/bigshift/big_query/table.rb)
# File 'lib/bigshift/big_query/table.rb', line 4

# Builds a table wrapper around a BigQuery service client and table metadata.
#
# @param big_query_service client that +load+ uses to insert and poll jobs
# @param table_data table metadata; +load+ reads its +table_reference+
# @param options [Hash]
# @option options :logger receives info/debug messages; any falsy value
#   falls back to NullLogger::INSTANCE
# @option options :thread object whose +sleep+ is called between job polls;
#   any falsy value falls back to Kernel
def initialize(big_query_service, table_data, options={})
  logger, sleeper = options.values_at(:logger, :thread)
  @big_query_service = big_query_service
  @table_data = table_data
  @logger = logger || NullLogger::INSTANCE
  @thread = sleeper || Kernel
end

Instance Method Details

#load(uri, options = {}) ⇒ Object

[View source]

(source lines 11–61 of lib/bigshift/big_query/table.rb)
# File 'lib/bigshift/big_query/table.rb', line 11

# Loads the data at +uri+ into this table with a BigQuery load job, then
# blocks, polling the job every +poll_interval+ seconds, until it is DONE.
#
# The source is read as CSV with a tab field delimiter and '"' as the quote
# character; the destination table is created if it does not exist.
#
# @param uri [String] the job's single source URI
# @param options [Hash]
# @option options [Numeric] :poll_interval seconds to sleep between status
#   polls (falls back to DEFAULT_POLL_INTERVAL)
# @option options [Boolean] :allow_overwrite use WRITE_TRUNCATE instead of
#   WRITE_EMPTY as the write disposition
# @option options [Object] :schema passed through as the load job's schema
# @option options [Integer] :max_bad_records passed through to the load job
# @return [nil]
# @raise [RuntimeError] carrying the job's final error message when the
#   finished job has an +error_result+
def load(uri, options={})
  poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  table_reference = @table_data.table_reference
  load_configuration = {
    source_uris: [uri],
    write_disposition: options[:allow_overwrite] ? 'WRITE_TRUNCATE' : 'WRITE_EMPTY',
    create_disposition: 'CREATE_IF_NEEDED',
    source_format: 'CSV',
    # Deliberately the two characters backslash and "t" (single quotes):
    # BigQuery accepts this escape sequence as the tab separator.
    field_delimiter: '\t',
    quote: '"',
    destination_table: table_reference,
  }
  load_configuration[:schema] = options[:schema] if options[:schema]
  load_configuration[:max_bad_records] = options[:max_bad_records] if options[:max_bad_records]
  job = Google::Apis::BigqueryV2::Job.new(
    configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
      # Splat the hash: the generated API model constructors take keyword
      # arguments, and Ruby 3 no longer converts a positional Hash to keywords.
      load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(**load_configuration)
    )
  )
  job = @big_query_service.insert_job(table_reference.project_id, job)
  @logger.info(sprintf('Loading rows from %s to the table %s.%s', uri, table_reference.dataset_id, table_reference.table_id))
  started = false
  loop do
    job = @big_query_service.get_job(table_reference.project_id, job.job_reference.job_id)
    state = job.status && job.status.state
    break if state == 'DONE'
    if state == 'RUNNING' && !started
      @logger.info('Loading started')
      started = true
    else
      @logger.debug(sprintf('Waiting for job %s (status: %s)', job.job_reference.job_id.inspect, state ? state.inspect : 'unknown'))
    end
    @thread.sleep(poll_interval)
  end
  status = job.status
  (status.errors || []).each do |error|
    message = [%<Load error: "#{error.message}">]
    if error.location
      segments = error.location.split('/')
      if segments.all? { |segment| segment.include?(':') }
        # "name:value / name:value / name:value" -> file, line, field values
        file, line, field = segments.map { |segment| segment.split(':').last.to_s.strip }
        message << " at file #{file}, line #{line}"
        message << ", field #{field}" if field
      else
        # Any other shape is reported verbatim; splitting something like a
        # gs:// URI on '/' yields empty segments the parse above can't handle.
        message << " at #{error.location}"
      end
    end
    @logger.debug(message.join)
  end
  # +errors+ can be non-empty on a successful job (e.g. rows skipped under
  # max_bad_records); only +error_result+ marks the job as failed.
  raise status.error_result.message if status.error_result
  # report_complete is defined elsewhere in the class.
  report_complete(job)
  nil
end