Class: Charrington::Insert
- Inherits:
-
Object
- Object
- Charrington::Insert
- Includes:
- LogStash::Util::Loggable, Service
- Defined in:
- lib/logstash/outputs/charrington/insert.rb
Overview
This service assumes that the data is already clean and in a flattened hash format. The Transform service should be called before calling this.
Constant Summary collapse
- REDSHIFT_TRACKS_COLUMNS =
TODO: create a current_table_columns (alter_postgres_table.rb) query on the tracks table to get the current columns
%w[id action app_name received_at uuid uuid_ts anonymous_id context_ip context_library_name context_library_version context_page_path context_page_referrer context_page_title context_page_url context_user_agent event event_text original_timestamp sent_at timestamp user_id user_uid context_campaign_medium context_campaign_name context_page_search context_campaign_source segment_dedupe_id context_campaign_content].freeze
- POSTGRES_TRACKS_COLUMNS =
%w[anonymous_user app_name event published_at session_ip session_library_name session_library_version session_page_path session_page_referrer session_page_search session_page_title session_page_url session_user_agent user_id user_uid].freeze
- TIMESTAMP_COLUMNS =
%w[published_at sent_at original_timestamp received_at timestamp].freeze
- Error =
Class.new(StandardError)
- EventNil =
Class.new(Error)
- TableNameNil =
Class.new(Error)
- InsertFailed =
Class.new(Error)
Instance Attribute Summary collapse
-
#columns ⇒ Object
readonly
Returns the value of attribute columns.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#driver ⇒ Object
readonly
Returns the value of attribute driver.
-
#enable_event_as_json_keyword ⇒ Object
readonly
Returns the value of attribute enable_event_as_json_keyword.
-
#event ⇒ Object
Returns the value of attribute event.
-
#event_as_json_keyword ⇒ Object
readonly
Returns the value of attribute event_as_json_keyword.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
-
#schema ⇒ Object
readonly
Returns the value of attribute schema.
-
#should_retry ⇒ Object
Returns the value of attribute should_retry.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
-
#tracks ⇒ Object
readonly
Returns the value of attribute tracks.
-
#transformer ⇒ Object
readonly
Returns the value of attribute transformer.
Instance Method Summary collapse
- #call ⇒ Object
- #do_tracks_insert ⇒ Object
-
#initialize(connection, event, opts = {}) ⇒ Insert
constructor
A new instance of Insert.
Methods included from Service
Constructor Details
#initialize(connection, event, opts = {}) ⇒ Insert
Returns a new instance of Insert.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 29
#
# Builds an Insert service for a single event.
#
# @param connection [Object] connection later used via +prepareStatement+
# @param event [#to_hash] the event to insert; its 'event' entry supplies the table name
# @param opts [Hash] options; the keys read here are :transformer, :schema,
#   :enable_event_as_json_keyword, :event_as_json_keyword and :driver
# @raise [EventNil] if +event+ is nil
# @raise [TableNameNil] if the event's 'event' value is missing or blank
def initialize(connection, event, opts = {})
  # The original message here said 'Table name is nil', copy-pasted from the guard below.
  raise EventNil, 'Event is nil' if event.nil?

  @transformer = opts[:transformer]
  @event = event.to_hash
  @tracks = create_tracks(@event)

  # Read from the normalised hash so the name/columns agree with what is stored in @event.
  event_name = @event['event'].to_s.strip
  raise TableNameNil, 'Table name is nil' if event_name.empty?

  @connection = connection

  # opts defaults to {}, so :schema may be absent; nil.to_s is '' and yields no prefix
  # instead of raising NoMethodError on nil.empty?.
  schema_name = opts[:schema].to_s
  @schema = schema_name.empty? ? '' : "#{schema_name}."

  @table_name = underscore(event_name)
  @columns = @event.keys.map { |x| underscore(x) }
  @should_retry = false
  @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
  @event_as_json_keyword = opts[:event_as_json_keyword]
  @driver = opts[:driver]
  @opts = opts
end
Instance Attribute Details
#columns ⇒ Object (readonly)
Returns the value of attribute columns.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def columns @columns end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def connection @connection end |
#driver ⇒ Object (readonly)
Returns the value of attribute driver.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def driver @driver end |
#enable_event_as_json_keyword ⇒ Object (readonly)
Returns the value of attribute enable_event_as_json_keyword.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def enable_event_as_json_keyword @enable_event_as_json_keyword end |
#event ⇒ Object
Returns the value of attribute event.
16 17 18 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 16 def event @event end |
#event_as_json_keyword ⇒ Object (readonly)
Returns the value of attribute event_as_json_keyword.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def event_as_json_keyword @event_as_json_keyword end |
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def opts @opts end |
#schema ⇒ Object (readonly)
Returns the value of attribute schema.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def schema @schema end |
#should_retry ⇒ Object
Returns the value of attribute should_retry.
16 17 18 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 16 def should_retry @should_retry end |
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def table_name @table_name end |
#tracks ⇒ Object (readonly)
Returns the value of attribute tracks.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def tracks @tracks end |
#transformer ⇒ Object (readonly)
Returns the value of attribute transformer.
17 18 19 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 17 def transformer @transformer end |
Instance Method Details
#call ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 51
#
# Inserts the event into its table, then inserts the tracks entry.
#
# On a SQLException with SQL state '42P01' the result of +create_table+ is stored in
# +should_retry+; with state '42703' the result of +alter_table+ is stored instead.
# Any other failure is re-raised as InsertFailed.
#
# @return [Object] the current value of +should_retry+
# @raise [InsertFailed] on an unhandled SQL state or any other StandardError
def call
  logger.info "Attempting insert into table name: #{table_name}"
  insert_sql = insert_event_statement
  insert_stmt = connection.prepareStatement(insert_sql)
  logger.info "Insert statement passed into prepareStatement is: #{insert_stmt}"
  insert_stmt = add_statement_event_params(insert_stmt, event)
  logger.info "Insert statement to be run is: #{insert_stmt.toString}"
  insert_stmt.execute
  logger.info 'Attempting insert into tracks table'
  do_tracks_insert
  should_retry
rescue Java::JavaSql::SQLException => e
  case e.getSQLState
  when '42P01'
    logger.info 'Received Java::JavaSql::SQLException with error sql state of 42P01, moving to create table'
    # `self.` is required: a bare `should_retry = ...` creates a method-local variable
    # and leaves the @should_retry attribute untouched.
    self.should_retry = create_table
  when '42703'
    logger.info 'Received Java::JavaSql::SQLException with error sql state of 42703, moving to alter table'
    self.should_retry = alter_table
  else
    raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
  end
  should_retry
rescue StandardError => e
  raise InsertFailed, "SQLException (Charrington:Insert) #{e.message} #{insert_sql}"
ensure
  # Runs on every path; insert_stmt is nil if preparing the statement itself failed.
  insert_stmt&.close
  cleanup
end
#do_tracks_insert ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/logstash/outputs/charrington/insert.rb', line 83
#
# Prepares and executes the tracks insert statement using the +tracks+ data.
# A SQLException is logged as an error rather than raised; the prepared
# statement, if one was created, is closed on every path.
def do_tracks_insert
  sql = insert_tracks_statement
  statement = connection.prepareStatement(sql)
  statement = add_statement_event_params(statement, tracks)
  logger.info("Insert tracks statment to be run: #{statement.toString}")
  statement.execute
rescue Java::JavaSql::SQLException => sql_error
  logger.error("SQLException (Charrington:Insert) Insert tracks entry failed. #{sql_error.message} #{sql}")
ensure
  statement.close unless statement.nil?
end