Class: Charrington::Insert

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable, Service
Defined in:
lib/logstash/outputs/charrington/insert.rb

Overview

This service assumes that the data is already clean and in a flattened hash format. The Transform service should be called before calling this.

Constant Summary collapse

REDSHIFT_TRACKS_COLUMNS =

TODO: create a current_table_columns (alter_postgres_table.rb) query on the tracks table to get the current columns

%w[id action app_name received_at uuid uuid_ts anonymous_id context_ip context_library_name context_library_version context_page_path context_page_referrer context_page_title context_page_url context_user_agent event event_text original_timestamp sent_at timestamp user_id user_uid context_campaign_medium context_campaign_name context_page_search context_campaign_source segment_dedupe_id context_campaign_content].freeze
POSTGRES_TRACKS_COLUMNS =
%w[anonymous_user app_name event published_at session_ip session_library_name session_library_version session_page_path session_page_referrer session_page_search session_page_title session_page_url session_user_agent user_id user_uid].freeze
TIMESTAMP_COLUMNS =
%w[published_at sent_at original_timestamp received_at timestamp].freeze
Error =
Class.new(StandardError)
EventNil =
Class.new(Error)
TableNameNil =
Class.new(Error)
InsertFailed =
Class.new(Error)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Service

included

Constructor Details

#initialize(connection, event, opts = {}) ⇒ Insert

Returns a new instance of Insert.

Raises:



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/outputs/charrington/insert.rb', line 29

def initialize(connection, event, opts = {})
  raise EventNil, 'Table name is nil' if event.nil?

  @transformer = opts[:transformer]
  @event = event.to_hash
  @tracks = create_tracks(@event)
  event_name = event['event'].to_s.strip
  raise TableNameNil, 'Table name is nil' if event_name.empty?

  @connection = connection
  @schema = opts[:schema].empty? ? '' : "#{opts[:schema]}."

  @table_name = underscore(event_name)

  @columns = event.keys.map { |x| underscore(x) }
  @should_retry = false
  @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
  @event_as_json_keyword = opts[:event_as_json_keyword]
  @driver = opts[:driver]
  @opts = opts
end

Instance Attribute Details

#columnsObject (readonly)

Returns the value of attribute columns.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def columns
  @columns
end

#connectionObject (readonly)

Returns the value of attribute connection.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def connection
  @connection
end

#driverObject (readonly)

Returns the value of attribute driver.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def driver
  @driver
end

#enable_event_as_json_keywordObject (readonly)

Returns the value of attribute enable_event_as_json_keyword.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def enable_event_as_json_keyword
  @enable_event_as_json_keyword
end

#eventObject

Returns the value of attribute event.



16
17
18
# File 'lib/logstash/outputs/charrington/insert.rb', line 16

def event
  @event
end

#event_as_json_keywordObject (readonly)

Returns the value of attribute event_as_json_keyword.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def event_as_json_keyword
  @event_as_json_keyword
end

#optsObject (readonly)

Returns the value of attribute opts.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def opts
  @opts
end

#schemaObject (readonly)

Returns the value of attribute schema.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def schema
  @schema
end

#should_retryObject

Returns the value of attribute should_retry.



16
17
18
# File 'lib/logstash/outputs/charrington/insert.rb', line 16

def should_retry
  @should_retry
end

#table_nameObject (readonly)

Returns the value of attribute table_name.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def table_name
  @table_name
end

#tracksObject (readonly)

Returns the value of attribute tracks.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def tracks
  @tracks
end

#transformerObject (readonly)

Returns the value of attribute transformer.



17
18
19
# File 'lib/logstash/outputs/charrington/insert.rb', line 17

def transformer
  @transformer
end

Instance Method Details

#callObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/logstash/outputs/charrington/insert.rb', line 51

def call
  logger.info "Attempting insert into table name: #{table_name}"
  insert_sql = insert_event_statement
  insert_stmt = connection.prepareStatement(insert_sql)
  logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}"
  insert_stmt = add_statement_event_params(insert_stmt, event)
  logger.info "Insert statement to be run is: #{insert_stmt.toString}"
  insert_stmt.execute

  logger.info 'Attempting insert into tracks table'
  do_tracks_insert

  should_retry
rescue Java::JavaSql::SQLException => e
  case e.getSQLState
  when '42P01'
    logger.info 'Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table'
    should_retry = create_table
  when '42703'
    logger.info 'Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table'
    should_retry = alter_table
  else
    raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
  end
  should_retry
rescue StandardError => e
  raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
ensure
  insert_stmt&.close
  cleanup
end

#do_tracks_insertObject



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/logstash/outputs/charrington/insert.rb', line 83

def do_tracks_insert
  tracks_sql = insert_tracks_statement
  tracks_stmt = connection.prepareStatement(tracks_sql)
  tracks_stmt = add_statement_event_params(tracks_stmt, tracks)
  logger.info "Insert tracks statment to be run: #{tracks_stmt.toString}"
  tracks_stmt.execute
rescue Java::JavaSql::SQLException => e
  logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{e.message} #{tracks_sql}")
ensure
  tracks_stmt&.close
end