Class: LWAC::DownloadClient
- Inherits: Object
- Inheritance chain: Object → LWAC::DownloadClient
- Defined in:
- lib/lwac/client.rb
Instance Method Summary
-
#initialize(config) ⇒ DownloadClient
constructor
Construct a new DownloadClient.
-
#work ⇒ Object
Poll the server and download from the web, maintaining throughput to the web by downloading batches of links.
Constructor Details
#initialize(config) ⇒ DownloadClient
Construct a new DownloadClient.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lwac/client.rb', line 23

# Construct a new DownloadClient.
#
# Reads config[:server] (handed to SimpleRPC::Client.new) and
# config[:client][:simultaneous_workers] (handed to Blat::Queue.new).
#
# @param config [Hash] client configuration; only the keys listed above are
#   read here
def initialize(config)
  # Keep the configuration around for later calls
  @config = config

  # Identifier for this client, produced by generate_uuid
  @uuid = generate_uuid

  # RPC connection, plus the earliest time another RPC may be made
  @rpc_client = SimpleRPC::Client.new(config[:server])
  @rpc_delay  = Time.now

  # Download queue, sized by the configured number of simultaneous workers
  $log.info("Starting download engine...")
  @dl = Blat::Queue.new(config[:client][:simultaneous_workers])

  # Pending links, the cache of completed work, and its running byte count
  @links       = []
  @cache       = new_cache
  @cache_bytes = 0

  # One mutex for the link list, one for the cache
  @link_mx  = Mutex.new
  @cache_mx = Mutex.new

  # Earliest time at which more links may be requested
  @checkout_delay = Time.now

  # Record the UUID in the log
  $log.info("Client started with UUID: #{@uuid}")

  # Finish construction with a ping
  ping
end
Instance Method Details
#work ⇒ Object
Poll the server and download from the web, maintaining throughput to the web by downloading batches of links.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/lwac/client.rb', line 61

# Poll the server and download from the web, maintaining throughput to the
# web by downloading batches of links.
#
# Loops until a SignalException is raised. On the way out (signal or any other
# exit) the ensure clause cancels in-flight web requests and, if this client
# still held any pending, cached or in-flight links, issues the `cancel` RPC
# via `rpc(5)`.
#
# Config keys read from @config[:client]: :monitor_rate,
# :simultaneous_workers, :announce_progress, :cache_limit.
#
# @return [Object] the value of the last evaluated expression; not a
#   meaningful result
def work
  loop do
    # NOTE(review): the block is presumably invoked repeatedly by
    # Blat::Queue#perform while downloads are in flight - confirm.
    @dl.perform do
      sleep(@config[:client][:monitor_rate])

      # Keep the download queue topped up
      while @dl.request_count < @config[:client][:simultaneous_workers] && (new_link = get_curl)
        @dl.add(new_link)
      end

      # Take a consistent snapshot of shared state under the mutexes
      link_len        = @link_mx.synchronize { @links.length }
      active_requests = @dl.request_count
      cache_len, bytes = @cache_mx.synchronize { [@cache.length, @cache_bytes] }

      # Print progress output, but only when there is something to report
      if @config[:client][:announce_progress] && (link_len > 0 || cache_len > 0 || active_requests > 0)
        progress_mb = bytes.to_f / 1024 / 1024
        limit_mb    = @config[:client][:cache_limit].to_f / 1024 / 1024

        # A zero cache limit would make the ratio NaN/Infinity, and rounding
        # that raises FloatDomainError; report 0% instead.
        pc_progress = limit_mb > 0 ? (progress_mb / limit_mb) * 100 : 0

        waiting =
          if link_len == 0 && Time.now < @checkout_delay
            "[waiting #{(@checkout_delay - Time.now).round}s]"
          else
            ''
          end

        # NOTE(review): the name of the helper called here was lost in the
        # extracted listing (only its argument list survived). `progress_bar`
        # is an assumption - confirm against lib/lwac/client.rb.
        str = [
          "#{progress_bar(@config[:client][:cache_limit], bytes)} #{pc_progress.round}%",
          "#{progress_mb.round(2)}/#{limit_mb.round(2)}MB",
          "(#{link_len} pend, #{active_requests} active, #{cache_len} done)",
          waiting
        ].join(' ')

        $log.info(str)
      end

      # Out of links, and the checkout back-off has expired
      acquire_links if link_len <= 0 && Time.now > @checkout_delay

      # Queue idle or cache over its limit: send completed work back
      send_cache if (@dl.idle? || bytes > @config[:client][:cache_limit]) && cache_len > 0
    end

    $log.debug "Downloader is idle."
  end
rescue SignalException => se
  # Signals are logged and swallowed so that the ensure clause is the
  # shutdown path.
  $log.fatal "Caught signal - #{se}"
ensure
  # Capture the in-flight count BEFORE cancelling; it is needed below both for
  # the log message and to decide whether the server must be told.
  cancelled = @dl.request_count

  if cancelled > 0
    $log.info "Cancelling #{cancelled} web requests..."
    @dl.cancel
  else
    $log.info "No web requests active."
  end

  # Tell the server we're dying. Uses `cancelled` rather than re-reading
  # @dl.request_count, which may already be zero after @dl.cancel and would
  # skip the release when only in-flight requests were outstanding.
  if @links.length > 0 || @cache.length > 0 || cancelled > 0
    $log.info "Releasing lock on approx. #{@links.length + @cache.length + cancelled} links..."
    rpc(5) do |s|
      s.cancel(LWAC::VERSION, @uuid)
    end
  else
    $log.info "No links to clean up."
  end

  # Quit
  $log.info "Done. Client has closed cleanly."
end