Class: Apollo::Planner::SmartPlanner
- Inherits: BasePlanner (ancestor chain: Object → BasePlanner → Apollo::Planner::SmartPlanner)
- Defined in:
- lib/apollo_crawler/planner/smart_planner.rb
Instance Attribute Summary
- #amqp ⇒ Object
  Returns the value of attribute amqp.
- #declarations ⇒ Object
  Returns the value of attribute declarations.
- #mongo ⇒ Object
  Returns the value of attribute mongo.
Instance Method Summary
- #fetch_queued_urls(opts = {}) ⇒ Object
- #fetch_url(url, opts = {}) ⇒ Object
- #get_next_url(opts = {}) ⇒ Object
- #get_url_count(state, opts = {}) ⇒ Object
- #initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner (constructor)
  A new instance of SmartPlanner.
- #run(opts = {}) ⇒ Object
Constructor Details
#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner
Returns a new instance of SmartPlanner.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38
# Builds a planner bound to an AMQP connection and subscribes to the
# "planner.fetched", "planner.domained" and "planner.crawled" queues.
#
# @param amqp [Object] AMQP connection; #create_channel is called on it
#   unconditionally, so despite the nil default a connection is required.
# @param mongo [Object] database handle, stored as-is.
# @param opts [Hash] forwarded to Apollo::Agent.declare_entities and to the
#   message handlers; :verbose enables progress output.
def initialize(amqp=nil, mongo=nil, opts={})
  self.amqp = amqp
  self.mongo = mongo

  # Declarations
  channel = amqp.create_channel
  # channel.prefetch(1)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)

  # Bindings
  fetched_queue = declarations[:queues]["planner.fetched.queue"]
  fetched_queue.bind(declarations[:exchanges]["planner.fetched"]).subscribe do |_delivery_info, _properties, payload|
    handle_fetched(JSON.parse(payload), opts)
  end

  domained_queue = declarations[:queues]["planner.domained.queue"]
  domained_queue.bind(declarations[:exchanges]["planner.domained"]).subscribe do |_delivery_info, _properties, _payload|
    puts "DOMAINED !!!"
  end

  crawled_queue = declarations[:queues]["planner.crawled.queue"]
  crawled_queue.bind(declarations[:exchanges]["planner.crawled"]).subscribe do |_delivery_info, _properties, payload|
    handle_crawled(JSON.parse(payload))
  end
end

# Handles a "planner.fetched" reply. It marks the QueuedUrl as :fetched and
# reconciles the RawDocument cache. When no usable cached copy remains, it
# stores the response and forwards the whole message to the "crawler" exchange.
#
# @param msg [Hash] parsed message with 'request' and 'response' entries.
# @param opts [Hash] :verbose enables progress output.
def handle_fetched(msg, opts={})
  request = msg['request']
  response = msg['response']

  queued = Apollo::Model::QueuedUrl.find(request["_id"])
  queued.update_attributes(request)
  queued.state = :fetched
  queued.save

  doc = Apollo::Model::RawDocument.where(:url => request['url']).first
  if doc
    if doc.sha_hash != response['sha_hash']
      # Content changed since the cached copy was stored: drop it so it is re-stored below.
      puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]
      doc.destroy
      doc = nil
    else
      puts "Using cached version of '#{request['url']}'" if opts[:verbose]
    end
  else
    # Identical content may already be stored under a different URL.
    doc = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
    puts "Same as #{doc.inspect}" if doc && opts[:verbose]
  end

  return unless doc.nil?

  Apollo::Model::RawDocument.new(response).save
  declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
end
private :handle_fetched

# Handles a "planner.crawled" reply by scheduling every extracted link under
# the crawler named in the original request.
#
# @param msg [Hash] parsed message with 'request' and optional 'links' entries.
def handle_crawled(msg)
  request = msg['request']
  (msg['links'] || []).each do |url|
    Apollo::Scheduler::BaseScheduler.schedule(url['link'], request['crawler_name'])
  end
end
private :handle_crawled
Instance Attribute Details
#amqp ⇒ Object
Returns the value of attribute amqp.
34 35 36 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34
# Reader for the AMQP connection this planner was constructed with.
#
# @return [Object] the stored amqp value
def amqp
  @amqp
end
#declarations ⇒ Object
Returns the value of attribute declarations.
36 37 38 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36
# Reader for the declared AMQP entities (looked up elsewhere in this class as
# declarations[:queues][...] and declarations[:exchanges][...]).
#
# @return [Object] the stored declarations value
def declarations
  @declarations
end
#mongo ⇒ Object
Returns the value of attribute mongo.
35 36 37 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35
# Reader for the database handle this planner was constructed with.
#
# @return [Object] the stored mongo value
def mongo
  @mongo
end
Instance Method Details
#fetch_queued_urls(opts = {}) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 131
# Tops up the set of in-flight fetches. It claims queued URLs one at a time and
# hands each to #fetch_url, stopping when the number of URLs in the :fetching
# state reaches the limit or when no queued URL is left.
#
# @param opts [Hash] :max_fetching sets the in-flight limit (default 4); the
#   whole hash is also forwarded to #get_next_url and #fetch_url.
# @return [nil]
def fetch_queued_urls(opts={})
  limit = opts.fetch(:max_fetching, 4)

  if get_url_count(:fetching) > limit
    puts "Fetching too many URLs. Slowing down for a while ..."
    return
  end

  while get_url_count(:fetching) < limit
    url = get_next_url(opts)
    # Queue drained: without this check the :fetching count never grows and
    # the loop would spin forever publishing nil to the fetcher.
    break if url.nil?

    puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}"
    fetch_url(url, opts)
  end
end
#fetch_url(url, opts = {}) ⇒ Object
116 117 118 119 120 121 122 123 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 116
# Publishes a single URL record to the "fetcher" exchange. Replies are routed
# back via "planner.fetched".
#
# @param url [Object] URL record; serialized with #to_json.
# @param opts [Hash] :verbose logs the URL before publishing.
def fetch_url(url, opts={})
  puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

  fetcher_exchange = declarations[:exchanges]["fetcher"]
  fetcher_exchange.publish(url.to_json, :reply_to => "planner.fetched")
end
#get_next_url(opts = {}) ⇒ Object
125 126 127 128 129 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 125
# Claims the oldest queued URL (by created_at). It switches the URL's state to
# :fetching in the same find_and_modify call and returns the updated record
# (new: true).
#
# @param opts [Hash] currently unused.
# @return [Object, nil] the claimed QueuedUrl, presumably nil when nothing is queued.
def get_next_url(opts={})
  Apollo::Model::QueuedUrl
    .where(:state => :queued)
    .order_by(:created_at.asc)
    .find_and_modify({ "$set" => { state: :fetching } }, new: true)
end
#get_url_count(state, opts = {}) ⇒ Object
112 113 114 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 112
# Counts QueuedUrl records currently in the given state.
#
# @param state [Symbol] a queue state such as :queued, :fetching or :fetched.
# @param opts [Hash] currently unused.
# @return [Integer]
def get_url_count(state, opts={})
  matching = Apollo::Model::QueuedUrl.where(:state => state)
  matching.count
end
#run(opts = {}) ⇒ Object
146 147 148 149 150 151 152 153 154 155 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 146
# Main planner loop. It repeatedly calls #fetch_queued_urls, pausing between
# rounds, until #request_exit is called.
#
# @param opts [Hash] :interval is the pause between rounds in seconds
#   (default 1); the whole hash is also forwarded to #fetch_queued_urls.
# @return [Integer] 0 once the loop has been asked to stop.
def run(opts={})
  interval = opts.fetch(:interval, 1)
  # Reset on entry, so a request made before this call starts is discarded.
  @request_exit = false

  until @request_exit
    fetch_queued_urls(opts)
    sleep interval
  end

  0
end

# Asks a running #run loop to finish. The loop returns after the current round
# (fetch plus pause) completes. Previously the exit flag was a local variable
# that nothing could set, so #run could never return.
def request_exit
  @request_exit = true
end