Class: Apollo::Agent::FetcherAgent

Inherits:
BaseAgent (which inherits from Object)
Defined in:
lib/apollo_crawler/agent/fetcher_agent.rb

Constant Summary

THREAD_POOL_SIZE = 1

Instance Attribute Summary

Instance Method Summary

Methods inherited from BaseAgent

#run

Constructor Details

#initialize(amqp, opts = {}) ⇒ FetcherAgent



Source lines 41–93:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 41

# Sets up the fetcher agent and starts consuming fetch requests.
#
# It creates a channel on +amqp+ and limits prefetch to THREAD_POOL_SIZE.
# It then declares the AMQP entities via Apollo::Agent.declare_entities,
# binds the "fetcher.queue" queue to the "fetcher" exchange, and subscribes
# with manual acks. Each message payload is parsed as JSON and its "url"
# entry is fetched. The result is wrapped by #process_fetched_doc. When the
# message properties carry :reply_to, the document is published through
# #send_response_msg.
#
# @param amqp [Object] AMQP connection; must respond to #create_channel
# @param opts [Hash] agent options; :verbose enables progress output, and the
#   whole hash is passed on to Apollo::Agent.declare_entities
def initialize(amqp, opts={})
  self.fetcher = Apollo::Fetcher::SmartFetcher.new

  puts "Initializing fetcher agent..." if opts[:verbose]

  # Declarations
  channel = amqp.create_channel
  channel.prefetch(THREAD_POOL_SIZE)

  # Binding (Default)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)
  queue = declarations[:queues]["fetcher.queue"]

  # AMQP contexts for threads. Nothing is created per thread yet; this loop
  # only reports the intended contexts in verbose mode.
  THREAD_POOL_SIZE.times do |i|
    puts "FetcherAgent::initialize() - Creating context #{i}" if opts[:verbose]
  end

  # Guards acks and publishes issued from the consumer callback.
  self.mutex = Mutex.new

  exchange = declarations[:exchanges]["fetcher"]

  queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
    # There can be troubles with concurrency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE
    queued_url = JSON.parse(payload)
    url = queued_url["url"]

    puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]

    # The delivery is acked before the fetch is attempted, so a failure
    # below is only logged.
    mutex.synchronize do
      puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
      channel.basic_ack(delivery_info.delivery_tag, true)
    end

    begin
      doc = Apollo::Fetcher::SmartFetcher.fetch(url)
      doc = process_fetched_doc(queued_url, doc, metadata, opts)

      reply_to = metadata && metadata[:reply_to]
      if reply_to
        puts "Replying to '#{reply_to}'"
        send_response_msg(reply_to, queued_url, doc)
      end
    rescue StandardError => e
      # StandardError (not Exception) so Interrupt/SystemExit still propagate.
      puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e.to_s}'"
    end

    doc
  end
end

Instance Attribute Details

#declarations ⇒ Object

Returns the value of attribute declarations.



Source lines 38–40:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 38

# Reader for the entities hash assigned in #initialize (the result of
# Apollo::Agent.declare_entities); nil until it has been assigned.
def declarations; @declarations; end

#fetcher ⇒ Object

Returns the value of attribute fetcher.



Source lines 37–39:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 37

# Reader for the fetcher instance assigned in #initialize; nil until set.
def fetcher; @fetcher; end

#mutex ⇒ Object

Returns the value of attribute mutex.



Source lines 39–41:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 39

# Reader for the Mutex assigned in #initialize; nil until set.
def mutex; @mutex; end

Instance Method Details

#format_response_msg(queued_url, doc) ⇒ Object



Source lines 108–113:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 108

# Pairs a request with the document produced for it.
#
# @param queued_url [Object] the decoded request message
# @param doc [Object] the processed fetch result
# @return [Hash] a hash with keys :request and :response
def format_response_msg(queued_url, doc)
  { request: queued_url, response: doc }
end

#process_fetched_doc(queued_url, doc, metadata, opts = {}) ⇒ Object



Source lines 95–106:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 95

# Wraps a raw fetch result in an Apollo::Model::RawDocument.
#
# @param queued_url [Hash] decoded request message; its "url" entry becomes
#   the document URL
# @param doc [Hash] fetch result, read through the :headers, :body and
#   :status keys
# @param metadata [Object] AMQP message properties (not used here)
# @param opts [Hash] agent options (not used here)
# @return [Apollo::Model::RawDocument] the populated document
# @raise [TypeError] if doc[:body] is not a String (raised by Digest)
def process_fetched_doc(queued_url, doc, metadata, opts={})
  url = queued_url["url"]

  res = Apollo::Model::RawDocument.new
  res.headers = doc[:headers]
  res.body = doc[:body]
  # Hex SHA-1 digest of the body.
  res.sha_hash = Digest::SHA1.hexdigest(doc[:body])
  res.status = doc[:status]
  res.url = url

  res
end

#send_response_msg(dest, queued_url, doc) ⇒ Object



Source lines 115–124:
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 115

# Publishes a request/response pair, serialized as JSON, to the exchange
# registered under +dest+ in declarations[:exchanges].
#
# @param dest [Object, nil] exchange key; nil means "no reply wanted"
# @param queued_url [Object] the decoded request message
# @param doc [Object] the processed fetch result
# @return [Object, nil] the exchange's #publish result, or nil when +dest+ is nil
def send_response_msg(dest, queued_url, doc)
  return if dest.nil?

  envelope = format_response_msg(queued_url, doc)

  # Exchange lookup and publish happen under the agent's mutex.
  mutex.synchronize do
    target = declarations[:exchanges][dest]
    target.publish(envelope.to_json)
  end
end