Class: Fluent::FeedlyInput
- Inherits: Input
- Inheritance chain: Object → Input → Fluent::FeedlyInput
- Defined in:
- lib/fluent/plugin/in_feedly.rb
Defined Under Namespace
Classes: StateStore
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #fetch ⇒ Object
- #get_continuation_id ⇒ Object
- #get_fetch_time_range ⇒ Object
- #initialize ⇒ FeedlyInput (constructor): A new instance of FeedlyInput.
- #run ⇒ Object
- #set_continuation_id(continuation_id) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #subscribe_categories_hash ⇒ Object
Constructor Details
#initialize ⇒ FeedlyInput
Returns a new instance of FeedlyInput.
22 23 24 25 26 27 |
# File 'lib/fluent/plugin/in_feedly.rb', line 22
# Builds a new FeedlyInput.
#
# The libraries the plugin relies on ('feedlr' and 'digest/sha2') are loaded
# here, at construction time, before control passes to the parent initializer.
def initialize
  %w[feedlr digest/sha2].each { |library| require library }
  super
end
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/fluent/plugin/in_feedly.rb', line 29
# Validates the plugin settings and creates the Feedly API client.
#
# @param conf the configuration object; forwarded untouched to the parent
#   implementation through the implicit `super`
# @raise [Fluent::ConfigError] when @fetch_count lies outside 20..10000
# @return [Object] the Feedlr::Client stored in @client
def configure(conf)
  super

  count_in_range = @fetch_count >= 20 && @fetch_count <= 10000
  unless count_in_range
    raise Fluent::ConfigError, "Feedly: fetch_count param (#{@fetch_count}) should be between 20 and 10000."
  end

  @client = Feedlr::Client.new(oauth_access_token: @access_token, sandbox: @enable_sandbox)
end
#fetch ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/in_feedly.rb', line 70
# Fetches entries for every subscribed category and emits each one as an event.
#
# For each category the stream is paged through: every page's items are
# emitted under @tag, the returned continuation is persisted with
# set_continuation_id, and paging stops once no continuation remains.
#
# @raise [Feedlr::Error::ServerError] when a response carries no items
# @return [Object] the result of iterating @subscribe_categories
def fetch
  @profile_id ||= @client.user_profile.id
  @state_store ||= StateStore.new(@state_file)

  # Resolve the time window once per pass. get_fetch_time_range clears
  # @initial_loop on its first call, so asking for it inside the category
  # loop would give the startup window to the first category only.
  fetch_time_range = get_fetch_time_range

  @subscribe_categories.each do |category_name|
    category_id = "user/#{@profile_id}/category/#{category_name}"

    loop do
      request_option = {
        count: @fetch_count,
        continuation: get_continuation_id,
        newerThan: fetch_time_range
      }
      cursor = @client.stream_entries_contents(category_id, request_option)

      # A response without items is treated as a server-side failure; the
      # raise propagates to the caller.
      raise Feedlr::Error::ServerError, cursor if cursor.items.nil?

      cursor.items.each do |item|
        Engine.emit(@tag, Engine.now, item)
      end
      log.info "Feedly: fetched articles.", articles: cursor.items.size, request_option: request_option

      set_continuation_id(cursor.continuation)
      break if get_continuation_id.nil?
    end
  end
end
#get_continuation_id ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/in_feedly.rb', line 114
# Looks up the continuation id saved in the state store.
#
# The saved id is only honoured when it was recorded for the same set of
# subscribed categories (compared through subscribe_categories_hash).
#
# @return [Object, nil] the saved id, or nil when the saved record belongs
#   to a different category set
def get_continuation_id
  saved = @state_store.get('continuation')
  return nil unless subscribe_categories_hash == saved[:subscribe_categories_hash]

  saved[:id]
end
#get_fetch_time_range ⇒ Object
92 93 94 95 96 97 98 99 100 |
# File 'lib/fluent/plugin/in_feedly.rb', line 92
# Computes the lower time bound for the next fetch.
#
# While @initial_loop is set, @fetch_time_range_on_startup is used and the
# flag is cleared; afterwards @fetch_time_range applies.
#
# @return [Integer] (current epoch seconds - range) * 1000
def get_fetch_time_range
  first_pass = @initial_loop
  @initial_loop = false if first_pass

  window = first_pass ? @fetch_time_range_on_startup : @fetch_time_range
  (Time.now.to_i - window) * 1000
end
#run ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/in_feedly.rb', line 50
# Main polling loop: calls fetch, sleeps @run_interval seconds, and repeats.
#
# Authorization failures are logged as unrecoverable and end the loop. Any
# other StandardError is logged and the fetch is attempted again after the
# usual @run_interval pause.
#
# NOTE(review): in the extracted source the first rescued class name and the
# `error:` log values were garbled (`Feedlr::Error::,` and `e.,`). They are
# restored here as Feedlr::Error::Unauthorized and e.message - confirm
# against the repository.
def run
  @initial_loop = true

  loop do
    begin
      fetch
    rescue Feedlr::Error::Unauthorized, Feedlr::Error::Forbidden => e
      log.error "Feedly: unrecoverable error has occoured.", error: e.message, error_class: e.class
      log.error_backtrace e.backtrace
      break
    rescue => e
      log.error "Feedly: error has occoured. trying to retry after #{@run_interval} seconds.", error: e.message, error_class: e.class
      log.error_backtrace e.backtrace
      # No explicit retry: falling through to the sleep below and looping
      # again gives the same "wait @run_interval, then fetch" sequence.
    end

    sleep @run_interval
  end

  log.error "Feedly: stopped fetching process due to the previous error."
end
#set_continuation_id(continuation_id) ⇒ Object
106 107 108 109 110 111 112 |
# File 'lib/fluent/plugin/in_feedly.rb', line 106
# Saves a continuation id in the state store, tagged with the digest of the
# current category set, then asks the store to update itself.
#
# @param continuation_id the continuation value to remember (may be nil)
# @return [Object] whatever @state_store.update! returns
def set_continuation_id(continuation_id)
  record = {
    id: continuation_id,
    subscribe_categories_hash: subscribe_categories_hash
  }
  @state_store.set("continuation", record)
  @state_store.update!
end
#shutdown ⇒ Object
46 47 48 |
# File 'lib/fluent/plugin/in_feedly.rb', line 46
# Stops the background thread stored in @thread.
#
# Does nothing when no thread exists (for example when shutdown is invoked
# before start), instead of raising from Thread.kill(nil).
#
# @return [Thread, nil] the killed thread, or nil when there was none
def shutdown
  return unless @thread

  Thread.kill(@thread)
end
#start ⇒ Object
42 43 44 |
# File 'lib/fluent/plugin/in_feedly.rb', line 42
# Launches the polling loop (#run) on a new thread and keeps the thread in
# @thread.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new { run }
end
#subscribe_categories_hash ⇒ Object
102 103 104 |
# File 'lib/fluent/plugin/in_feedly.rb', line 102
# Digest identifying the configured set of subscribed categories,
# independent of the order in which they are listed.
#
# The sorted names are joined with a NUL separator before hashing. Joining
# with an empty string lets different sets collide (['ab', 'c'] and
# ['a', 'bc'] both become "abc"). The digest of a single-category list is
# unaffected by the separator; for lists of two or more names the digest
# differs from the separator-less form, so a record stored with the old
# digest simply no longer matches.
#
# @return [String] the raw (binary) SHA-512 digest
def subscribe_categories_hash
  Digest::SHA512.digest(@subscribe_categories.sort.join("\0"))
end