Class: Fluent::FeedlyInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_feedly.rb

Defined Under Namespace

Classes: StateStore

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FeedlyInput

Returns a new instance of FeedlyInput.



22
23
24
25
26
27
# File 'lib/fluent/plugin/in_feedly.rb', line 22

# Builds the input plugin. The libraries are required here rather than at
# file load time, so they are only loaded once an instance is created.
def initialize
  %w[feedlr digest/sha2].each { |library| require library }

  super
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/in_feedly.rb', line 29

# Validates the plugin settings and builds the Feedly API client.
#
# @param conf the configuration object, passed on to the parent implementation
# @raise [Fluent::ConfigError] when fetch_count is outside the 20..10000 range
def configure(conf)
  super

  unless @fetch_count.between?(20, 10000)
    raise Fluent::ConfigError, "Feedly: fetch_count param (#{@fetch_count}) should be between 20 and 10000."
  end

  @client = Feedlr::Client.new(oauth_access_token: @access_token, sandbox: @enable_sandbox)
end

#fetch ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_feedly.rb', line 70

# Fetches the articles of every subscribed category and emits each one as an
# event under @tag, following continuation ids until a category is exhausted.
#
# @raise [Feedlr::Error::ServerError] when a response carries no items
def fetch
  # The profile id is read from the client's user profile; the previous
  # `@client..id` built a Range instead of calling the API.
  @profile_id ||= @client.user_profile.id
  @state_store ||= StateStore.new(@state_file)
  # Resolve the time window once per fetch: get_fetch_time_range clears
  # @initial_loop on its first call, so asking per category handed the
  # startup window to the first category only.
  fetch_time_range = get_fetch_time_range
  @subscribe_categories.each do |category_name|
    category_id = "user/#{@profile_id}/category/#{category_name}"
    loop do
      request_option = { count: @fetch_count, continuation: get_continuation_id, newerThan: fetch_time_range }
      cursor = @client.stream_entries_contents(category_id, request_option)
      raise Feedlr::Error::ServerError, cursor if cursor.items.nil?

      cursor.items.each { |item| Engine.emit(@tag, Engine.now, item) }
      log.info "Feedly: fetched articles.", articles: cursor.items.size, request_option: request_option
      # The continuation id is persisted after every page; a nil id means
      # there are no further pages for this category.
      set_continuation_id(cursor.continuation)
      break if get_continuation_id.nil?
    end
  end
end

#get_continuation_id ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/in_feedly.rb', line 114

# Returns the stored continuation id, or nil when the stored record was
# written for a different set of subscribed categories.
def get_continuation_id
  stored = @state_store.get('continuation')
  stored[:id] if subscribe_categories_hash == stored[:subscribe_categories_hash]
end

#get_fetch_time_range ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/fluent/plugin/in_feedly.rb', line 92

# Returns the lower time bound for a fetch: the current epoch time minus the
# configured range, multiplied by 1000. The first call after @initial_loop is
# set uses @fetch_time_range_on_startup and clears the flag; later calls use
# @fetch_time_range.
def get_fetch_time_range
  window = @initial_loop ? @fetch_time_range_on_startup : @fetch_time_range
  @initial_loop = false if @initial_loop
  (Time.now.to_i - window) * 1000
end

#run ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/in_feedly.rb', line 50

# Polling loop: fetches, waits @run_interval seconds and repeats. An
# Unauthorized or Forbidden error ends the loop; any other StandardError is
# logged and the fetch is attempted again after the same wait.
def run
  @initial_loop = true
  loop do
    outcome =
      begin
        fetch
        :fetched
      rescue Feedlr::Error::Unauthorized, Feedlr::Error::Forbidden => fatal
        log.error "Feedly: unrecoverable error has occoured.", error: fatal.message, error_class: fatal.class
        log.error_backtrace fatal.backtrace
        :fatal
      rescue => transient
        log.error "Feedly: error has occoured. trying to retry after #{@run_interval} seconds.", error: transient.message, error_class: transient.class
        log.error_backtrace transient.backtrace
        :failed
      end
    break if outcome == :fatal

    # One wait per pass, whether the fetch succeeded or failed.
    sleep @run_interval
  end
  log.error "Feedly: stopped fetching process due to the previous error."
end

#set_continuation_id(continuation_id) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/fluent/plugin/in_feedly.rb', line 106

# Stores the continuation id together with the current category digest under
# the "continuation" key, then asks the state store to update itself.
#
# @param continuation_id the id to remember
def set_continuation_id(continuation_id)
  record = { id: continuation_id, subscribe_categories_hash: subscribe_categories_hash }
  @state_store.set("continuation", record)
  @state_store.update!
end

#shutdown ⇒ Object



46
47
48
# File 'lib/fluent/plugin/in_feedly.rb', line 46

# Stops the polling thread.
#
# @return [Thread, nil] the killed thread, or nil when no thread was started
def shutdown
  # Thread.kill(nil) raises TypeError, so do nothing when start never ran.
  return unless @thread

  Thread.kill(@thread)
end

#start ⇒ Object



42
43
44
# File 'lib/fluent/plugin/in_feedly.rb', line 42

# Launches #run on a new thread and keeps the thread in @thread.
def start
  @thread = Thread.new { run }
end

#subscribe_categories_hash ⇒ Object



102
103
104
# File 'lib/fluent/plugin/in_feedly.rb', line 102

# Returns the raw SHA-512 digest of the subscribed category names, sorted and
# concatenated without a separator, so the result does not depend on the
# order in which the categories are listed.
def subscribe_categories_hash
  canonical_names = @subscribe_categories.sort.join('')
  Digest::SHA512.digest(canonical_names)
end