Module: KinesisSupervisor
- Included in:
- FluentPluginKinesis::InputFilter
- Defined in:
- lib/fluent/plugin/thread_supervisor.rb
Instance Method Summary collapse
- #get_shard_ids ⇒ Object
- #supervisor_thread ⇒ Object
- #thread_kill(shard_id) ⇒ Object
- #update_maping(active_shard_ids) ⇒ Object
Instance Method Details
#get_shard_ids ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/thread_supervisor.rb', line 48 def get_shard_ids() active_shard_ids = [] shards = @client.describe_stream(stream_name: @stream_name).stream_description.shards shards.each do |shard| if @describe_shard & !@describe_use_shards.include?(shard.shard_id) next end unless @dead_thread.include?(shard.shard_id) active_shard_ids << shard.shard_id end end active_shard_ids rescue => e $log.error "get_shard_ids : #{e.}" end |
#supervisor_thread ⇒ Object
5 6 7 8 9 10 11 |
# File 'lib/fluent/plugin/thread_supervisor.rb', line 5 def supervisor_thread() until @stop_flag do active_shard_ids = get_shard_ids() update_maping(active_shard_ids) sleep(@load_shard_interval) end end |
#thread_kill(shard_id) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/thread_supervisor.rb', line 40 def thread_kill(shard_id) $log.info "Thread killing => shard : #{shard_id}" @map[shard_id].join @dead_thread << shard_id @thread_stop_map.delete(shard_id) @map.delete(shard_id) end |
#update_maping(active_shard_ids) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/thread_supervisor.rb', line 13 def update_maping(active_shard_ids) active_shard_ids.each do |shard_id| if @map.has_key?(shard_id) if @map[shard_id].status.nil? $log.error "Thread dead => shard : #{shard_id}" thread_kill(shard_id) elsif @thread_stop_map[shard_id] thread_kill(shard_id) else next end else @thread_stop_map[shard_id] = false t = Thread.new(shard_id, &method(:load_records_thread)) @map[shard_id] = t end end map_shard_ids = @map.keys map_shard_ids.each do |map_shard_id| unless active_shard_ids.include?(map_shard_id) @thread_stop_map[shard_id] = true thread_kill(map_shard_id) end end end |