Class: KinesisShard::StateStore

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/kinesis_shard.rb

Instance Method Summary collapse

Constructor Details

#initialize(dir_path, shard_id) ⇒ StateStore

Returns a new instance of StateStore.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/kinesis_shard.rb', line 91

# Prepares the on-disk state location for one shard and loads any saved state.
#
# Creates +dir_path+ when it is missing, derives the state-file path from
# +dir_path+ and +shard_id+, and reads that file through #load_json_file when
# it already exists. A failed read is only logged via $log; the default state
# is used afterwards if nothing was loaded.
#
# @param dir_path [String] directory that holds the state file
# @param shard_id [#to_s] value interpolated into the state-file name
# @raise [RuntimeError] if the directory cannot be created, or if the loaded
#   state is not a Hash
def initialize(dir_path, shard_id)
  unless Dir.exist?(dir_path)
    begin
      FileUtils.mkdir_p(dir_path)
    rescue => e
      raise "does not make a directory : #{e.message}"
    end
  end
  @path = "#{dir_path}/last_recode_#{shard_id}.json"

  # File.exists? was deprecated and removed in Ruby 3.2; File.exist? is the
  # supported spelling and behaves the same on older Rubies.
  if File.exist?(@path)
    begin
      load_json_file
    rescue => e
      # Best effort: an unreadable state file is reported, not fatal.
      $log.warn "load_json_file: #{e.message}"
    end
  end

  # Explicit nil check (not ||=) so a loaded `false` still reaches the
  # validation below instead of being silently replaced.
  @data = { 'last_sequence_number' => '' } if @data.nil?

  raise "state_file on #{@path.inspect} is invalid" unless @data.is_a?(Hash)
end

Instance Method Details

#load_json_file ⇒ Object



119
120
121
122
123
# File 'lib/fluent/plugin/kinesis_shard.rb', line 119

# Reads the state file at @path and stores the parsed result in @data.
#
# Uses File.open rather than Kernel#open so that @path is always treated as a
# plain file path; Kernel#open would spawn a subprocess for a path that starts
# with "|".
#
# @return [Object] the value produced by Yajl.load, which is also assigned to @data
def load_json_file
  File.open(@path) do |io|
    @data = Yajl.load(io)
  end
end

#load_sequence_number ⇒ Object



125
126
127
# File 'lib/fluent/plugin/kinesis_shard.rb', line 125

# Looks up the sequence number held in the in-memory state.
#
# @return [Object, nil] the value stored under 'last_sequence_number' in @data
def load_sequence_number
  state = @data
  state['last_sequence_number']
end

#update(sequence_number) ⇒ Object



129
130
131
132
133
134
# File 'lib/fluent/plugin/kinesis_shard.rb', line 129

# Records +sequence_number+ in the in-memory state and rewrites the state file.
#
# The file at @path is opened in "w" mode, so its previous content is replaced
# by the Yajl dump of @data. File.open is used rather than Kernel#open so that
# @path is always treated as a plain file path and never as a "|command".
#
# @param sequence_number [Object] value stored under 'last_sequence_number'
def update(sequence_number)
  @data['last_sequence_number'] = sequence_number
  File.open(@path, 'w') do |io|
    Yajl.dump(@data, io)
  end
end