Class: FluentPluginKinesis::InputFilter

Inherits:
Fluent::Input
  • Object
show all
Includes:
Fluent::DetachMultiProcessMixin, KinesisShard, KinesisSupervisor
Defined in:
lib/fluent/plugin/in_kinesis.rb

Constant Summary collapse

USER_AGENT_NAME =
'fluent-plugin-kinesis-input-filter'

Instance Method Summary collapse

Methods included from KinesisShard

#emit_records, #get_shard_iterator_info, #load_records_thread, #sequence

Methods included from KinesisSupervisor

#get_shard_ids, #supervisor_thread, #thread_kill, #update_maping

Instance Method Details

#configure(conf) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_kinesis.rb', line 52

# Applies the plugin configuration and resets per-instance bookkeeping.
#
# Runs the parent class's configure first, warns when no state directory
# has been configured, builds the record parser named by conf['format'],
# and initialises the empty thread-tracking containers.
#
# @param conf [#[]] configuration element; only the 'format' key is read
#   here, and the whole object is handed on to the parser's configure
# @return [Array] the freshly created (empty) dead-thread list
def configure(conf)
  super

  # Without a state directory the read position cannot be saved, so the
  # operator is warned rather than stopped.
  unless @state_dir_path
    [
      "'state_dir_path PATH' parameter is not set to a 'kinesis' source.",
      "this parameter is highly recommended to save the last rows to resume tailing."
    ].each { |message| $log.warn message }
  end

  format_name = conf['format']
  @parser = Fluent::Plugin.new_parser(format_name)
  @parser.configure(conf)

  # Thread bookkeeping, all starting empty.
  # NOTE(review): presumably keyed by shard id -- confirm in KinesisSupervisor.
  @map = {}             # thread objects
  @thread_stop_map = {} # per-thread stop flags
  @dead_thread = []     # threads that have died
end

#load_client ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/in_kinesis.rb', line 80

# Builds the Kinesis client from the configured region and credentials and
# stores it in @client.
#
# Credential selection, in order of precedence:
# 1. an explicit key pair, used only when both @aws_key_id and @aws_sec_key
#    are set;
# 2. a shared-credentials profile (@profile), optionally read from
#    @credentials_path;
# 3. otherwise no credential options are passed to the client at all.
#
# @return [Aws::Kinesis::Client] the client that was just created
def load_client
  client_opts = {}
  client_opts[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    client_opts[:access_key_id] = @aws_key_id
    client_opts[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { profile_name: @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    client_opts[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  @client = Aws::Kinesis::Client.new(client_opts)
end

#shutdown ⇒ Object



76
77
78
# File 'lib/fluent/plugin/in_kinesis.rb', line 76

# Signals the plugin to stop by raising the stop flag.
#
# This method only sets the flag; it does not wait for any thread to finish.
# NOTE(review): the flag is presumably polled by the supervisor thread
# (KinesisSupervisor#supervisor_thread) -- confirm there.
#
# @return [true]
def shutdown
  @stop_flag = true
end

#start ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/in_kinesis.rb', line 67

# Starts the input plugin inside the block given to detach_multi_process.
#
# Within that block, in order: the parent class's start runs, the stop flag
# is cleared, the Kinesis client is built via load_client, and a new thread
# is spawned that runs supervisor_thread. The thread is not stored on the
# instance; it is only the value of the block.
#
# @return [Object] whatever detach_multi_process returns
def start
  detach_multi_process do
    super
    @stop_flag = false
    load_client
    # The client must exist before the supervisor thread begins.
    Thread.new { supervisor_thread }
  end
end