11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/bigshift/big_query/table.rb', line 11
# Loads data from +uri+ into this table with a BigQuery load job and blocks,
# polling the job, until BigQuery reports it DONE.
#
# The source is read as CSV with a tab field delimiter and '"' as the quote
# character. The destination table is created if needed; unless
# +:allow_overwrite+ is set the job uses WRITE_EMPTY instead of
# WRITE_TRUNCATE.
#
# @param uri [String] the single entry of the job's source_uris
# @param options [Hash]
# @option options [Numeric] :poll_interval seconds to sleep between status
#   polls (falls back to DEFAULT_POLL_INTERVAL)
# @option options [Boolean] :allow_overwrite use WRITE_TRUNCATE instead of
#   WRITE_EMPTY
# @option options [Object] :schema passed through unchanged as the load schema
# @option options [Integer] :max_bad_records passed through unchanged to the
#   load configuration
# @return [nil]
# @raise [RuntimeError] carrying the job's error_result message when the job
#   finishes unsuccessfully
def load(uri, options={})
  poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  load_configuration = {}
  load_configuration[:source_uris] = [uri]
  load_configuration[:write_disposition] = options[:allow_overwrite] ? 'WRITE_TRUNCATE' : 'WRITE_EMPTY'
  load_configuration[:create_disposition] = 'CREATE_IF_NEEDED'
  load_configuration[:schema] = options[:schema] if options[:schema]
  load_configuration[:source_format] = 'CSV'
  # Single quotes on purpose: this is the two-character sequence backslash-t,
  # which BigQuery's fieldDelimiter accepts as the escape for a tab.
  load_configuration[:field_delimiter] = '\t'
  load_configuration[:quote] = '"'
  load_configuration[:destination_table] = @table_data.table_reference
  load_configuration[:max_bad_records] = options[:max_bad_records] if options[:max_bad_records]
  job = Google::Apis::BigqueryV2::Job.new(
    configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
      load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(load_configuration)
    )
  )
  job = @big_query_service.insert_job(@table_data.table_reference.project_id, job)
  @logger.info(sprintf('Loading rows from %s to the table %s.%s', uri, @table_data.table_reference.dataset_id, @table_data.table_reference.table_id))
  started = false
  loop do
    job = @big_query_service.get_job(@table_data.table_reference.project_id, job.job_reference.job_id)
    status = job.status
    if status && status.state == 'DONE'
      # A finished job may list entries in `errors` and still have succeeded
      # (e.g. rows skipped under :max_bad_records); failure is signalled only
      # by `error_result`. Log every reported error, then fail only if
      # error_result is present.
      (status.errors || []).each do |error|
        message = %<Load error: "#{error.message}">
        if error.location
          # Expected shape: '/'-separated "Label: value" segments for file,
          # line and (optionally) field.
          parts = error.location.split('/').map { |segment| (segment.split(':').last || '').strip }
          if parts.size.between?(2, 3) && parts.none?(&:empty?)
            file, line, field = parts
            message << " at file #{file}, line #{line}"
            message << ", field #{field}" if field
          else
            # Any other shape (e.g. a location containing "//", which yields
            # an empty segment) is logged verbatim instead of crashing the
            # parser and hiding the real job error.
            message << " at #{error.location}"
          end
        end
        @logger.debug(message)
      end
      raise status.error_result.message if status.error_result
      break
    end
    state = status && status.state
    if state == 'RUNNING' && !started
      @logger.info('Loading started')
      started = true
    else
      @logger.debug(sprintf('Waiting for job %s (status: %s)', job.job_reference.job_id.inspect, state ? state.inspect : 'unknown'))
    end
    @thread.sleep(poll_interval)
  end
  report_complete(job)
  nil
end
|