Class: KinesisShard::StateStore
- Inherits: Object
- Inheritance hierarchy: Object → KinesisShard::StateStore
- Defined in:
- lib/fluent/plugin/kinesis_shard.rb
Instance Method Summary
- #initialize(dir_path, shard_id) ⇒ StateStore (constructor): Returns a new instance of StateStore.
- #load_json_file ⇒ Object
- #load_sequence_number ⇒ Object
- #update(sequence_number) ⇒ Object
Constructor Details
#initialize(dir_path, shard_id) ⇒ StateStore
Returns a new instance of StateStore.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/fluent/plugin/kinesis_shard.rb', line 91

# Sets up the state store for one shard.
#
# Creates +dir_path+ if it does not exist, derives the state file path
# "<dir_path>/last_recode_<shard_id>.json", and loads that file into @data
# when it is present. A failure while loading is logged through $log.warn
# and does not abort construction; when nothing was loaded, @data falls back
# to a hash holding an empty 'last_sequence_number'.
#
# @param dir_path [String] directory that holds the state file
# @param shard_id [String] shard identifier embedded in the file name
# @raise [RuntimeError] if the directory cannot be created, or if the loaded
#   state is not a Hash
def initialize(dir_path, shard_id)
  unless Dir.exist?(dir_path)
    begin
      FileUtils.mkdir_p(dir_path)
    rescue => e
      raise "does not make a directory : #{e.message}"
    end
  end

  @path = "#{dir_path}/last_recode_#{shard_id}.json"

  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  if File.exist?(@path)
    begin
      load_json_file
    rescue => e
      # Best effort: an unreadable state file is reported, then the default
      # below takes over if @data is still nil.
      $log.warn "load_json_file: #{e.message}"
    end
  end

  @data = { 'last_sequence_number' => '' } if @data.nil?

  # The file parsed, but to something other than a JSON object.
  raise "state_file on #{@path.inspect} is invalid" unless @data.is_a?(Hash)
end
Instance Method Details
#load_json_file ⇒ Object
119 120 121 122 123 |
# File 'lib/fluent/plugin/kinesis_shard.rb', line 119

# Reads the state file at @path and stores the parsed result in @data.
#
# File.open is used instead of Kernel#open so that @path is always treated
# as a plain file name; Kernel#open interprets a path beginning with "|" as
# a command to run on Ruby versions that still support that form.
# Errors raised while opening or parsing propagate to the caller.
#
# @return [Object] the value returned by Yajl.load
def load_json_file
  File.open(@path) do |io|
    @data = Yajl.load(io)
  end
end
#load_sequence_number ⇒ Object
125 126 127 |
# File 'lib/fluent/plugin/kinesis_shard.rb', line 125

# Looks up the sequence number currently held in the in-memory state.
#
# @return [Object] the value stored under 'last_sequence_number' in @data
def load_sequence_number
  key = 'last_sequence_number'
  @data[key]
end
#update(sequence_number) ⇒ Object
129 130 131 132 133 134 |
# File 'lib/fluent/plugin/kinesis_shard.rb', line 129

# Records +sequence_number+ in the in-memory state and rewrites the state
# file at @path with the whole of @data.
#
# File.open is used instead of Kernel#open so that @path is always treated
# as a plain file name; Kernel#open interprets a path beginning with "|" as
# a command to run on Ruby versions that still support that form.
#
# @param sequence_number [Object] value stored under 'last_sequence_number'
# @return [Object] the value returned by Yajl.dump
def update(sequence_number)
  @data['last_sequence_number'] = sequence_number
  File.open(@path, 'w') do |io|
    Yajl.dump(@data, io)
  end
end