Class: Apollo::Planner::SmartPlanner
- Inherits: BasePlanner
  - Object
    - BasePlanner
      - Apollo::Planner::SmartPlanner
- Defined in:
- lib/apollo_crawler/planner/smart_planner.rb
Instance Attribute Summary
- #amqp ⇒ Object
  Returns the value of attribute amqp.
- #declarations ⇒ Object
  Returns the value of attribute declarations.
- #mongo ⇒ Object
  Returns the value of attribute mongo.
Instance Method Summary
- #fetch_queued_urls(opts = {}) ⇒ Object
- #fetch_url(url, opts = {}) ⇒ Object
- #get_next_url(opts = {}) ⇒ Object
- #get_url_count(state, opts = {}) ⇒ Object
- #initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner (constructor)
  A new instance of SmartPlanner.
- #run(opts = {}) ⇒ Object
Constructor Details
#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner
Returns a new instance of SmartPlanner.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38
#
# Builds a planner on top of an AMQP connection and wires up its consumers.
#
# Creates a channel on +amqp+, declares exchanges/queues through
# Apollo::Agent.declare_entities and subscribes to three queues:
#
# * "planner.fetched.queue"  - marks the QueuedUrl as :fetched, reconciles the
#   RawDocument cache and, when no cached copy survives, stores the response
#   and forwards the message to the "crawler" exchange (reply_to
#   "planner.crawled").
# * "planner.domained.queue" - only logs that a message arrived.
# * "planner.crawled.queue"  - schedules every extracted link via
#   Apollo::Scheduler::BaseScheduler.schedule.
#
# @param amqp [Object] AMQP connection; must respond to +create_channel+
#   (presumably a Bunny session - TODO confirm).
# @param mongo [Object] stored in #mongo; not used by this method.
# @param opts [Hash] passed to Apollo::Agent.declare_entities; +:verbose+
#   enables cache diagnostics in the fetched handler.
# @raise [ArgumentError] if +amqp+ is nil (no channel can be created).
def initialize(amqp = nil, mongo = nil, opts = {})
  raise ArgumentError, "amqp connection is required" if amqp.nil?

  self.amqp = amqp
  self.mongo = mongo

  # Declarations
  channel = amqp.create_channel
  # channel.prefetch(1)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)

  queues = declarations[:queues]
  exchanges = declarations[:exchanges]

  # Bindings. Consumers are yielded (delivery_info, properties, payload); only
  # the payload is used. Payloads are assumed to be JSON objects carrying
  # "request"/"response" (fetched) or "request"/"links" (crawled) - TODO confirm.
  queues["planner.fetched.queue"].bind(exchanges["planner.fetched"]).subscribe do |_delivery_info, _properties, payload|
    msg = JSON.parse(payload)
    request = msg['request']
    response = msg['response']

    queued = Apollo::Model::QueuedUrl.find(request["_id"])
    queued.update_attributes(request)
    queued.state = :fetched
    queued.save

    cached = Apollo::Model::RawDocument.where(:url => request['url']).first
    if cached
      # A stale copy (different content hash) is dropped so it gets replaced.
      if cached.sha_hash != response['sha_hash']
        puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]
        cached.destroy
        cached = nil
      else
        puts "Using cached version of '#{request['url']}'" if opts[:verbose]
      end
    else
      # Identical content may already be stored under a different URL.
      cached = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
      puts "Same as #{cached.inspect}" if cached && opts[:verbose]
    end

    # Only new or changed content is stored and handed on to the crawler.
    unless cached
      Apollo::Model::RawDocument.new(response).save
      exchanges["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
    end
  end

  queues["planner.domained.queue"].bind(exchanges["planner.domained"]).subscribe do |_delivery_info, _properties, _payload|
    puts "DOMAINED !!!"
  end

  queues["planner.crawled.queue"].bind(exchanges["planner.crawled"]).subscribe do |_delivery_info, _properties, payload|
    msg = JSON.parse(payload)
    request = msg['request']

    (msg['links'] || []).each do |link|
      Apollo::Scheduler::BaseScheduler.schedule(link['link'], request['crawler_name'])
    end
  end
end
Instance Attribute Details
#amqp ⇒ Object
Returns the value of attribute amqp.
34 35 36 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34
#
# Reader for the AMQP connection assigned by the constructor.
#
# @return [Object] the value of @amqp (nil if never assigned).
def amqp
  @amqp
end
#declarations ⇒ Object
Returns the value of attribute declarations.
36 37 38 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36
#
# Reader for the entities returned by Apollo::Agent.declare_entities; the
# planner indexes it with the :queues and :exchanges keys.
#
# @return [Object] the value of @declarations (nil if never assigned).
def declarations
  @declarations
end
#mongo ⇒ Object
Returns the value of attribute mongo.
35 36 37 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35
#
# Reader for the +mongo+ handle assigned by the constructor.
#
# @return [Object] the value of @mongo (nil if never assigned).
def mongo
  @mongo
end
Instance Method Details
#fetch_queued_urls(opts = {}) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 128
#
# Tops up the number of in-flight fetches.
#
# If more than the limit are already :fetching, logs and returns. Otherwise
# claims queued URLs one at a time (#get_next_url) and publishes each one
# (#fetch_url) until #get_url_count(:fetching) reaches the limit, or until no
# queued URL is left.
#
# @param opts [Hash] forwarded to #get_next_url and #fetch_url;
#   +:max_fetching+ sets the in-flight limit (default 4).
# @return [nil]
def fetch_queued_urls(opts = {})
  limit = opts.fetch(:max_fetching, 4)

  if get_url_count(:fetching) > limit
    puts "Fetching too many URLs. Slowing down for a while ..."
    return
  end

  while get_url_count(:fetching) < limit
    url = get_next_url(opts)
    # Queue is empty: the fetching count can no longer grow, so stop instead
    # of spinning and publishing nil.
    break if url.nil?

    puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}"
    fetch_url(url, opts)
  end
end
#fetch_url(url, opts = {}) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 114
#
# Publishes +url+, serialized with #to_json, to the "fetcher" exchange and
# asks for the reply on "planner.fetched".
#
# @param url [#to_json] the URL record to fetch.
# @param opts [Hash] +:verbose+ logs the publish to stdout.
# @return whatever the exchange's +publish+ returns.
def fetch_url(url, opts = {})
  puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

  fetcher = declarations[:exchanges]["fetcher"]
  fetcher.publish(url.to_json, reply_to: "planner.fetched")
end
#get_next_url(opts = {}) ⇒ Object
123 124 125 126 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 123
#
# Claims the oldest :queued QueuedUrl (by created_at) by switching it to
# :fetching with find_and_modify.
#
# @param opts [Hash] unused.
# @return the modified document (new: true), presumably nil when nothing is
#   queued - TODO confirm against the Mongoid version in use.
def get_next_url(opts = {})
  oldest_queued = Apollo::Model::QueuedUrl
    .where({ state: :queued })
    .order_by(:created_at.asc)
  claim = { "$set" => { state: :fetching } }
  oldest_queued.find_and_modify(claim, new: true)
end
#get_url_count(state, opts = {}) ⇒ Object
110 111 112 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 110
#
# Counts QueuedUrl documents whose state equals +state+.
#
# @param state [Symbol] state to match, e.g. :fetching or :queued.
# @param opts [Hash] unused.
# @return the count reported by the query.
def get_url_count(state, opts = {})
  selector = { state: state }
  Apollo::Model::QueuedUrl.where(selector).count
end
#run(opts = {}) ⇒ Object
143 144 145 146 147 148 149 150 151 152 |
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 143
#
# Main planner loop: repeatedly calls #fetch_queued_urls and then sleeps.
#
# @param opts [Hash] forwarded to #fetch_queued_urls, plus:
#   +:interval+       - seconds to sleep after each pass (default 1);
#   +:max_iterations+ - number of passes before returning
#                       (default nil: loop forever).
# @return [Integer] 0 once +:max_iterations+ passes have completed.
def run(opts = {})
  interval = opts.fetch(:interval, 1)
  max_iterations = opts[:max_iterations]
  iterations = 0

  until max_iterations && iterations >= max_iterations
    fetch_queued_urls(opts)
    iterations += 1
    sleep interval
  end

  0
end