Module: KinesisSupervisor

Included in:
FluentPluginKinesis::InputFilter
Defined in:
lib/fluent/plugin/thread_supervisor.rb

Instance Method Summary collapse

Instance Method Details

#get_shard_ids ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/thread_supervisor.rb', line 48

# Lists the shard ids of @stream_name that should currently have a worker.
#
# A shard is skipped when:
# * @describe_shard is truthy and the shard id is not in @describe_use_shards, or
# * the shard id is already recorded in @dead_thread.
#
# NOTE(review): only the first describe_stream response is read; confirm
# whether streams whose shard list is paginated need extra handling.
#
# @return [Array, nil] the active shard ids, or nil when the lookup failed
#   (the error is logged); callers must not treat nil as "no shards".
def get_shard_ids
  shards = @client.describe_stream(stream_name: @stream_name).stream_description.shards
  shards.each_with_object([]) do |shard, active_shard_ids|
    shard_id = shard.shard_id
    # `&&` short-circuits, so @describe_use_shards is only consulted when the
    # allow-list filter is actually enabled (it may be unset otherwise).
    next if @describe_shard && !@describe_use_shards.include?(shard_id)
    next if @dead_thread.include?(shard_id)

    active_shard_ids << shard_id
  end
rescue => e
  $log.error "get_shard_ids : #{e.message}"
  # Explicit failure marker instead of leaking the logger's return value.
  nil
end

#supervisor_thread ⇒ Object



5
6
7
8
9
10
11
# File 'lib/fluent/plugin/thread_supervisor.rb', line 5

# Supervisor loop: until @stop_flag is set, fetches the active shard ids,
# reconciles the worker threads against them via update_maping, then sleeps
# for @load_shard_interval.
#
# When get_shard_ids does not return a list (its error path logs and yields a
# non-enumerable value), the reconciliation is skipped for that cycle and
# retried after the sleep, instead of letting the failure end this loop.
def supervisor_thread
  until @stop_flag
    active_shard_ids = get_shard_ids
    update_maping(active_shard_ids) if active_shard_ids.respond_to?(:each)
    sleep(@load_shard_interval)
  end
end

#thread_kill(shard_id) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/fluent/plugin/thread_supervisor.rb', line 40

# Waits for the worker thread of +shard_id+ to finish and removes it from the
# bookkeeping: the id is appended to @dead_thread and its entries are deleted
# from @thread_stop_map and @map.
#
# Despite the name, the thread is joined, not killed; this blocks until the
# worker ends on its own.
#
# @param shard_id [Object] key of the worker in @map
# @return [Object, nil] the value removed from @map (nil if there was none)
def thread_kill(shard_id)
  $log.info "Thread killing => shard : #{shard_id}"
  worker = @map[shard_id]
  begin
    # Thread#join re-raises an exception the worker died with; catch it here
    # so the caller survives and the cleanup below always runs.
    worker.join if worker
  rescue => e
    $log.error "Thread terminated with error => shard : #{shard_id} : #{e.message}"
  end
  @dead_thread << shard_id
  @thread_stop_map.delete(shard_id)
  @map.delete(shard_id)
end

#update_maping(active_shard_ids) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/thread_supervisor.rb', line 13

# Reconciles the worker threads in @map with +active_shard_ids+.
#
# For each active shard id:
# * no worker yet: clears its stop flag in @thread_stop_map and starts a
#   thread running load_records_thread(shard_id), stored in @map;
# * worker whose Thread#status is nil (terminated by an exception): logs it
#   and retires it via thread_kill;
# * worker whose stop flag is set: retires it via thread_kill;
# * otherwise the worker is left alone.
#
# Afterwards, every worker still in @map whose shard id is not in
# +active_shard_ids+ gets its stop flag set and is retired via thread_kill.
#
# @param active_shard_ids [Array] shard ids that should have a worker
# @return [Array] the shard ids that were in @map before the final sweep
def update_maping(active_shard_ids)
  active_shard_ids.each do |shard_id|
    if @map.key?(shard_id)
      if @map[shard_id].status.nil?
        $log.error "Thread dead => shard : #{shard_id}"
        thread_kill(shard_id)
      elsif @thread_stop_map[shard_id]
        thread_kill(shard_id)
      end
    else
      @thread_stop_map[shard_id] = false
      @map[shard_id] = Thread.new(shard_id, &method(:load_records_thread))
    end
  end

  # Iterate over a snapshot of the keys: thread_kill deletes from @map.
  @map.keys.each do |map_shard_id|
    next if active_shard_ids.include?(map_shard_id)

    # Flag the worker of the shard being swept (not a variable from the
    # loop above) so it can wind down before thread_kill joins it.
    @thread_stop_map[map_shard_id] = true
    thread_kill(map_shard_id)
  end
end