Class: Charrington::Process

Inherits:
Object
  • Object
show all
Includes:
LogStash::Util::Loggable, Service
Defined in:
lib/logstash/outputs/charrington/process.rb

Overview

This service starts the process of attempting to insert a row. It handles retries where applicable.

Constant Summary collapse

Error =
Class.new(StandardError)
ProcessFailed =
Class.new(Error)
EventNil =
Class.new(Error)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Service

included

Constructor Details

#initialize(connection, event, opts = {}) ⇒ Process

Returns a new instance of Process.

Raises:



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/logstash/outputs/charrington/process.rb', line 18

def initialize(connection, event, opts = {})
  raise EventNil, 'Event is nil' if event.nil?

  @connection = connection
  @event = event.to_hash
  @opts = opts

  @max_retries = opts[:max_retries] || 10
  @retry_max_interval = opts[:retry_max_interval] || 2
  @retry_interval = opts[:retry_initial_interval] || 2
  @driver = opts[:driver]
  @transformer = opts[:transformer]

  @attempts = 1
  @should_retry = true
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def connection
  @connection
end

#driverObject (readonly)

Returns the value of attribute driver.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def driver
  @driver
end

#eventObject (readonly)

Returns the value of attribute event.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def event
  @event
end

#max_retriesObject (readonly)

Returns the value of attribute max_retries.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def max_retries
  @max_retries
end

#optsObject (readonly)

Returns the value of attribute opts.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def opts
  @opts
end

#retry_intervalObject

Returns the value of attribute retry_interval.



12
13
14
# File 'lib/logstash/outputs/charrington/process.rb', line 12

def retry_interval
  @retry_interval
end

#retry_max_intervalObject (readonly)

Returns the value of attribute retry_max_interval.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def retry_max_interval
  @retry_max_interval
end

#schemaObject (readonly)

Returns the value of attribute schema.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def schema
  @schema
end

#should_retryObject

Returns the value of attribute should_retry.



12
13
14
# File 'lib/logstash/outputs/charrington/process.rb', line 12

def should_retry
  @should_retry
end

#transformerObject (readonly)

Returns the value of attribute transformer.



11
12
13
# File 'lib/logstash/outputs/charrington/process.rb', line 11

def transformer
  @transformer
end

Instance Method Details

#callObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash/outputs/charrington/process.rb', line 35

def call
  while should_retry
    logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}"
    transformed =
      case transformer
      when 'redshift'
        Charrington::TransformRedshift.call(event)
      else
        Charrington::TransformPostgres.call(event)
      end
    logger.info "Transformed event into: #{transformed}"
    should_retry = Charrington::Insert.call(connection, transformed, opts)
    break unless should_retry

    @attempts += 1
    break if @attempts > max_retries

    # If we're retrying the action, sleep for the recommended interval
    # Double the interval for the next time through to achieve exponential backoff
    sleep_interval
  end
rescue StandardError => e
  raise ProcessFailed, e.message
ensure
  connection&.close
  @event.clear if clearable(@event)
end