Class: Apollo::Planner::SmartPlanner

Inherits:
BasePlanner show all
Defined in:
lib/apollo_crawler/planner/smart_planner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner

Returns a new instance of SmartPlanner.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38

# Wires the planner to AMQP: opens a channel, declares the entities via
# Apollo::Agent.declare_entities and subscribes to the three planner queues
# ("planner.fetched.queue", "planner.domained.queue", "planner.crawled.queue").
#
# @param amqp [Object] AMQP connection. Despite the nil default it must
#   respond to #create_channel, which is called unconditionally below.
# @param mongo [Object, nil] stored on the instance; not used in this method.
# @param opts [Hash] forwarded to Apollo::Agent.declare_entities; :verbose
#   turns on the cache-related log lines in the fetched handler.
def initialize(amqp=nil, mongo=nil, opts={})
	self.amqp = amqp
	self.mongo = mongo

	# Declarations
	channel = amqp.create_channel
	# channel.prefetch(1)
	self.declarations = Apollo::Agent.declare_entities(channel, opts)

	# Bindings
	declarations[:queues]["planner.fetched.queue"].bind(declarations[:exchanges]["planner.fetched"]).subscribe do |_delivery_info, _metadata, payload|
		msg = JSON.parse(payload)

		request = msg['request']
		response = msg['response']

		# Record the outcome of the fetch on the queue entry.
		queued = Apollo::Model::QueuedUrl.find(request["_id"])
		queued.update_attributes(request)
		queued.state = :fetched
		queued.save

		# Look for an already stored document: first by URL, then by content hash.
		raw = Apollo::Model::RawDocument.where(:url => request['url']).first
		if raw
			if raw.sha_hash != response['sha_hash']
				puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]

				raw.destroy
				raw = nil
			else
				puts "Using cached version of '#{request['url']}'" if opts[:verbose]
			end
		else
			raw = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
			puts "Same as #{raw.inspect}" if raw
		end

		# Only content that is not stored yet is saved and handed to the crawler.
		unless raw
			Apollo::Model::RawDocument.new(response).save

			# Publish
			declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
		end
	end

	declarations[:queues]["planner.domained.queue"].bind(declarations[:exchanges]["planner.domained"]).subscribe do |_delivery_info, _metadata, payload|
		# The payload is parsed but its content is not used yet.
		JSON.parse(payload)

		puts "DOMAINED !!!"
	end

	declarations[:queues]["planner.crawled.queue"].bind(declarations[:exchanges]["planner.crawled"]).subscribe do |_delivery_info, _metadata, payload|
		msg = JSON.parse(payload)

		request = msg['request']
		# A message without 'links' is treated as having none.
		links = msg['links'] || []

		# Schedule every discovered link for the crawler that produced it.
		links.each do |entry|
			Apollo::Scheduler::BaseScheduler::schedule(entry['link'], request['crawler_name'])
		end
	end
end

Instance Attribute Details

#amqp ⇒ Object

Returns the value of attribute amqp.



34
35
36
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34

# Reader for the amqp attribute, which the constructor assigns from its
# first argument.
#
# @return [Object, nil] the current value of @amqp
def amqp
  @amqp
end

#declarations ⇒ Object

Returns the value of attribute declarations.



36
37
38
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36

# Reader for the declarations attribute. The constructor assigns it the result
# of Apollo::Agent.declare_entities; elsewhere in this class it is indexed with
# :queues and :exchanges and then by entity name.
#
# @return [Object, nil] the current value of @declarations
def declarations
  @declarations
end

#mongo ⇒ Object

Returns the value of attribute mongo.



35
36
37
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35

# Reader for the mongo attribute, which the constructor assigns from its
# second argument.
#
# @return [Object, nil] the current value of @mongo
def mongo
  @mongo
end

Instance Method Details

#fetch_queued_urls(opts = {}) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 131

# Tops up the number of URLs in the :fetching state: while fewer than four are
# in flight, claims the next queued URL and publishes it to the fetcher.
# Backs off with a log line when more than four are already being fetched.
#
# @param opts [Hash] passed through to get_next_url and fetch_url.
# @return [nil]
def fetch_queued_urls(opts={})
	max_fetching = 4

	if get_url_count(:fetching) > max_fetching
		puts "Fetching too many URLs. Slowing down for a while ..."
		return
	end

	while get_url_count(:fetching) < max_fetching
		url = get_next_url(opts)

		# Nothing left to claim: stop here. Publishing nil would not raise the
		# fetching count, so the loop condition would never become false.
		break if url.nil?

		puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}"
		fetch_url(url, opts)
	end
end

#fetch_url(url, opts = {}) ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 116

# Publishes a URL record, serialised as JSON, to the "fetcher" exchange with
# "planner.fetched" as the reply-to routing hint.
#
# @param url [Object] anything responding to #to_json and #inspect.
# @param opts [Hash] :verbose logs the URL before publishing.
# @return [Object] whatever the exchange's #publish returns.
def fetch_url(url, opts={})
	puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

	# Publish
	fetcher_exchange = declarations[:exchanges]["fetcher"]
	fetcher_exchange.publish(url.to_json, :reply_to => "planner.fetched")
end

#get_next_url(opts = {}) ⇒ Object



125
126
127
128
129
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 125

# Claims the next queued URL: selects QueuedUrl records in state :queued,
# ordered by created_at ascending, and flips the matched record to :fetching
# through a single find_and_modify call.
#
# @param opts [Hash] accepted for signature parity; not read here.
# @return [Object, nil] the result of find_and_modify — presumably the updated
#   document (new: true), or nil when nothing is queued; confirm against the
#   Mongoid version in use.
def get_next_url(opts={})
	Apollo::Model::QueuedUrl
		.where({:state => :queued})
		.order_by(:created_at.asc)
		.find_and_modify({ "$set" => { state: :fetching }}, new: true)
end

#get_url_count(state, opts = {}) ⇒ Object



112
113
114
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 112

# Counts the QueuedUrl records currently in the given state.
#
# @param state [Symbol] state value to match, e.g. :fetching.
# @param opts [Hash] accepted for signature parity; not read here.
# @return [Object] the result of #count on the matching criteria.
def get_url_count(state, opts={})
	in_state = Apollo::Model::QueuedUrl.where({:state => state})
	in_state.count
end

#run(opts = {}) ⇒ Object



146
147
148
149
150
151
152
153
154
155
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 146

# Main planner loop: calls fetch_queued_urls and then sleeps one second,
# repeatedly. The exit flag is never set inside the loop, so in practice the
# method only ends when something raises.
#
# @param opts [Hash] forwarded to fetch_queued_urls on every iteration.
# @return [Integer] 0, should the loop ever finish.
def run(opts={})
	request_exit = false

	until request_exit
		fetch_queued_urls(opts)
		sleep 1
	end

	0
end