Class: FluentPluginKinesis::InputFilter
- Inherits:
- Fluent::Input
- Object
- Fluent::Input
- FluentPluginKinesis::InputFilter
show all
- Includes:
- Fluent::DetachMultiProcessMixin, KinesisShard, KinesisSupervisor
- Defined in:
- lib/fluent/plugin/in_kinesis.rb
Constant Summary
collapse
- USER_AGENT_NAME =
'fluent-plugin-kinesis-input-filter'
Instance Method Summary
collapse
#emit_records, #get_shard_iterator_info, #load_records_thread, #sequence
#get_shard_ids, #supervisor_thread, #thread_kill, #update_maping
Instance Method Details
#configure(conf) ⇒ Object
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/fluent/plugin/in_kinesis.rb', line 52
# Applies the plugin configuration and resets the per-instance bookkeeping.
#
# Delegates to the parent implementation first, warns when no state directory
# is configured, builds the record parser named by conf['format'], and
# initializes the containers used for tracking.
#
# @param conf [#[]] configuration element; 'format' is read from it and the
#   whole object is handed to the parser's own #configure
# @return [Array] the freshly created (empty) @dead_thread list
def configure(conf)
  super

  # Only warn here; a missing state directory is not treated as an error.
  unless @state_dir_path
    $log.warn "'state_dir_path PATH' parameter is not set to a 'kinesis' source."
    $log.warn "this parameter is highly recommended to save the last rows to resume tailing."
  end

  @parser = Fluent::Plugin.new_parser(conf['format'])
  @parser.configure(conf)

  # Each container gets its own statement: running them together on one line
  # is a syntax error.
  @map = {}
  @thread_stop_map = {}
  @dead_thread = []
end
|
#load_client ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fluent/plugin/in_kinesis.rb', line 80
# Builds the Kinesis client from the instance settings and stores it in @client.
#
# The user-agent suffix is always sent. @region is added when set. An explicit
# key pair (@aws_key_id + @aws_sec_key) wins over a shared-credentials
# profile (@profile, optionally read from @credentials_path). When neither is
# set, no credential option is passed at all.
#
# @return [Aws::Kinesis::Client] the client that was assigned to @client
def load_client
  client_opts = {
    user_agent_suffix: "#{USER_AGENT_NAME}/#{FluentPluginKinesis::VERSION}"
  }
  client_opts[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    client_opts[:access_key_id] = @aws_key_id
    client_opts[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { :profile_name => @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    client_opts[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  @client = Aws::Kinesis::Client.new(client_opts)
end
|
#shutdown ⇒ Object
76
77
78
|
# File 'lib/fluent/plugin/in_kinesis.rb', line 76
# Requests shutdown by raising the stop flag.
#
# Only sets @stop_flag; nothing is joined or killed here.
# NOTE(review): presumably the background threads poll this flag -- confirm
# against the supervisor/shard code, which is not visible in this listing.
#
# @return [true]
def shutdown
  @stop_flag = true
end
|
#start ⇒ Object
67
68
69
70
71
72
73
74
|
# File 'lib/fluent/plugin/in_kinesis.rb', line 67
# Starts the input: inside the detach_multi_process block it runs the parent
# #start, clears the stop flag, builds the Kinesis client, and launches the
# supervisor in a new thread.
#
# @return [Object] whatever detach_multi_process returns for the block
def start
  detach_multi_process do
    super
    @stop_flag = false
    load_client
    # Resolve the method object before spawning so a missing
    # #supervisor_thread surfaces in the calling thread.
    supervisor = method(:supervisor_thread)
    Thread.new(&supervisor)
  end
end
|