Class: Apollo::Agent::FetcherAgent

Inherits:
BaseAgent (ancestors: Object → BaseAgent → FetcherAgent)
Defined in:
lib/apollo_crawler/agent/fetcher_agent.rb

Constant Summary

THREAD_POOL_SIZE = 1

Instance Attribute Summary

Instance Method Summary

Methods inherited from BaseAgent

#run

Constructor Details

#initialize(amqp, opts = {}) ⇒ FetcherAgent

Returns a new instance of FetcherAgent.



(source lines 41–93)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 41

# Creates a fetcher agent and subscribes it to the "fetcher.queue" queue.
#
# Each delivery is parsed as JSON and its "url" entry is fetched with
# Apollo::Fetcher::SmartFetcher. The result is turned into a document by
# #process_fetched_doc. If the delivery's metadata has a :reply_to entry,
# the document is published back through #send_response_msg.
#
# @param amqp [Object] AMQP connection; must respond to #create_channel
#   (presumably a Bunny session — TODO confirm)
# @param opts [Hash] agent options; :verbose turns on progress logging, and the
#   whole hash is forwarded to Apollo::Agent.declare_entities
def initialize(amqp, opts={})
  self.fetcher = Apollo::Fetcher::SmartFetcher.new

  puts "Initializing fetcher agent..." if opts[:verbose]

  # Declarations: one channel whose prefetch equals THREAD_POOL_SIZE
  channel = amqp.create_channel
  channel.prefetch(THREAD_POOL_SIZE)

  # Binding (Default)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)
  queue = declarations[:queues]["fetcher.queue"]

  # NOTE(review): no per-thread AMQP context is actually created here; this
  # loop only emits the verbose log lines.
  THREAD_POOL_SIZE.times do |i|
    puts "FetcherAgent::initialize() - Creating context #{i}" if opts[:verbose]
  end

  # Serializes the channel ack below and exchange publishes in #send_response_msg
  self.mutex = Mutex.new

  exchange = declarations[:exchanges]["fetcher"]

  queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
    # There can be troubles with concurrency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE
    queued_url = JSON.parse(payload)
    url = queued_url["url"]

    puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]

    # The delivery is acked before the fetch, so a failed fetch is not redelivered.
    mutex.synchronize do
      puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
      channel.basic_ack(delivery_info.delivery_tag, true)
    end

    begin
      doc = Apollo::Fetcher::SmartFetcher.fetch(url)
      doc = process_fetched_doc(queued_url, doc, metadata, opts)

      if metadata && metadata[:reply_to]
        puts "Replying to '#{metadata[:reply_to]}'"
        send_response_msg(metadata[:reply_to], queued_url, doc)
      end
    rescue StandardError => e
      # Best effort: log the failure and keep consuming. Rescuing StandardError
      # (not Exception) lets signals and exit requests propagate.
      puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e.to_s}'"
    end

    doc
  end
end

Instance Attribute Details

#declarations ⇒ Object

Returns the value of attribute declarations.



(source lines 38–40)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 38

# Reader for the AMQP entities (queues/exchanges) declared in #initialize.
def declarations; @declarations; end

#fetcher ⇒ Object

Returns the value of attribute fetcher.



(source lines 37–39)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 37

# Reader for the fetcher instance assigned in #initialize.
def fetcher; @fetcher; end

#mutex ⇒ Object

Returns the value of attribute mutex.



(source lines 39–41)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 39

# Reader for the lock guarding channel acks and exchange publishes.
def mutex; @mutex; end

Instance Method Details

#format_response_msg(queued_url, doc) ⇒ Object



(source lines 108–113)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 108

# Wraps a request and its fetched document into the reply message hash.
#
# @param queued_url [Hash] the decoded request message
# @param doc [Object] the fetched document (presumably a RawDocument — TODO confirm)
# @return [Hash] hash with :request and :response keys, in that order
def format_response_msg(queued_url, doc)
  { request: queued_url, response: doc }
end

#process_fetched_doc(queued_url, doc, metadata, opts = {}) ⇒ Object



(source lines 95–106)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 95

# Converts a raw fetch result into an Apollo::Model::RawDocument.
#
# @param queued_url [Hash] decoded request message; its "url" entry is copied
#   onto the result
# @param doc [Object] fetch result; must respond to #headers, #body and #status
# @param metadata [Object] metadata of the originating delivery (not used here;
#   kept for callers that pass it)
# @param opts [Hash] agent options (not used here)
# @return [Apollo::Model::RawDocument] document carrying headers, body, SHA1
#   hex digest of the body, status and url
def process_fetched_doc(queued_url, doc, metadata, opts={})
  res = Apollo::Model::RawDocument.new
  res.headers = doc.headers
  res.body = doc.body
  res.sha_hash = Digest::SHA1.hexdigest(doc.body)
  res.status = doc.status
  res.url = queued_url["url"]
  res
end

#send_response_msg(dest, queued_url, doc) ⇒ Object



(source lines 115–124)
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 115

# Publishes the JSON reply for a fetched document to the exchange named +dest+.
#
# Does nothing when +dest+ is nil. The exchange lookup and publish run under
# #mutex.
#
# @param dest [Object, nil] key into declarations[:exchanges]
# @param queued_url [Hash] the original request message
# @param doc [Object] the fetched document
def send_response_msg(dest, queued_url, doc)
  return if dest.nil?

  message = format_response_msg(queued_url, doc)
  self.mutex.synchronize do
    target = self.declarations[:exchanges][dest]
    target.publish(message.to_json)
  end
end