Class: LWAC::ConsistencyManager
- Inherits: Object
  - Object
    - LWAC::ConsistencyManager
- Defined in: lib/lwac/server/consistency_manager.rb
Overview
Wraps storage and link policies to enforce an efficient workflow with regard to links.
Provides facilities for the following:
1) Read the current state from files.
2) Create a new sample and read the links for that sample from the database.
3) Write datapoints whilst keeping track of the link IDs to ensure all are done.
4) Close a sample and ensure everything is complete before opening another.
This can be thought of as the server’s API. It wraps all other server functions.
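The workflow described above can be sketched with a toy, in-memory stand-in. `ToyManager` and all of its fields are hypothetical and exist only for illustration; the real class delegates to StorageManager, persists state to disk, and enforces the sampling wait time.

```ruby
require 'set'

# Hypothetical in-memory stand-in for illustration only; not the LWAC class.
class ToyManager
  def initialize(link_ids)
    @pending     = Set.new(link_ids) # IDs not yet handed to a worker
    @checked_out = {}                # id => link, currently with a worker
    @done        = 0                 # links completed in this sample
    @size        = link_ids.length
  end

  # Reserve up to `number` links so that no other worker receives them.
  def check_out(number)
    ids = @pending.first(number)
    ids.each { |id| @pending.delete(id); @checked_out[id] = "link-#{id}" }
    ids
  end

  # Accept results only for links that are actually checked out.
  def check_in(ids)
    ids.each { |id| @done += 1 if @checked_out.delete(id) }
  end

  def complete?
    @done == @size
  end
end

manager = ToyManager.new([1, 2, 3])
batch   = manager.check_out(2)          # two IDs reserved
manager.check_in(batch)
manager.check_in(batch)                 # duplicates are ignored
manager.check_in(manager.check_out(2))  # only one link is left
puts manager.complete?
```

Tracking checked-out IDs separately from pending ones is what lets the sample detect completion without counting any link twice.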
Instance Method Summary
- #check_in(datapoints = []) ⇒ Object
  Check links in, write the return to disk.
- #check_out(number = :all) ⇒ Object
  Retrieve links.
- #close ⇒ Object
  Close the resource neatly.
- #counts ⇒ Object
- #initialize(config) ⇒ ConsistencyManager (constructor)
  A new instance of ConsistencyManager.
- #uncheck(links = []) ⇒ Object
  Check links in without converting them to datapoints.
- #wait ⇒ Object
  Calculate how long we have until the sample is “openable”.
Constructor Details
#initialize(config) ⇒ ConsistencyManager
Returns a new instance of ConsistencyManager.
    # File 'lib/lwac/server/consistency_manager.rb', line 24

    def initialize(config)
      @storage = StorageManager.new(config[:storage])
      @state   = @storage.state
      @mutex   = Mutex.new
      @config  = config[:sampling_policy]

      # Two lists to handle link checkout
      @links             = @state.current_sample.pending
      @checked_out_links = {}

      # Print handy messages to people
      if(@state.last_sample_id == -1)
        $log.info "No sampling has occurred yet, this is a new deployment."
        open_sample # Bootstrap the sample
      end

      # Print more handy messages to people
      if(not @state.current_sample.open? and @state.current_sample.complete?)
        $log.info "Current sample is closed and complete. Opening a new one..."
        open_sample
      end

      $log.info "Current sample: #{@state.current_sample}."

      if(@state.current_sample.open?)
        # Prevents the server completing a sample even if already open...
        # check_sample_limit
        $log.info "Sample opened at #{@state.current_sample.sample_start_time}, resuming..."
      else
        if(wait <= 0)
          $log.info "Sample is closed but ready to open."
        else
          $log.info "Sample closed: wait #{wait}s before sampling until #{Time.now + wait}."
        end
      end
    end
Instance Method Details
#check_in(datapoints = []) ⇒ Object
Check links in and write the returned datapoints to disk. Datapoints for links that are not currently checked out are logged and skipped.
    # File 'lib/lwac/server/consistency_manager.rb', line 152

    def check_in(datapoints = [])
      raise "Cannot check in whilst waiting. Wait #{wait}s until #{Time.now + wait}." if wait > 0

      @mutex.synchronize{
        # Check in each datapoint
        $log.debug "Checking in #{datapoints.length} datapoints."
        datapoints.each{|dp|
          if(@checked_out_links.delete(dp.link.id))
            @storage.write_datapoint(dp)

            # increment the progress counter
            @state.current_sample.link_complete( dp.response_properties[:downloaded_bytes] || 0 )

            # They shouldn't even be in the list below, hence it being commented out.
            #@links.delete(dp.link.id)
          else
            $log.warn "Attempted to check in link with ID #{dp.link.id}, but the sample says it's already been done."
          end
        }

        # Close the sample if we detect that we're done
        if(@state.current_sample.complete?)
          $log.info "Current sample complete."
          close_sample
        end
      }
    end
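The guard in the listing relies on `Hash#delete` returning the removed value, or `nil` when the key is absent. A minimal sketch of that pattern follows, using plain hashes as hypothetical stand-ins for datapoints and links; none of these names come from LWAC.

```ruby
checked_out = { 7 => :link_seven, 8 => :link_eight }
accepted    = 0
bytes       = 0

# Hypothetical results: one with a byte count, one without, one duplicate.
results = [
  { id: 7, props: { downloaded_bytes: 512 } },
  { id: 8, props: {} },
  { id: 7, props: { downloaded_bytes: 512 } }
]

results.each do |r|
  # Hash#delete doubles as an "is this link checked out?" test.
  if checked_out.delete(r[:id])
    accepted += 1
    bytes    += r[:props][:downloaded_bytes] || 0 # missing count falls back to 0
  else
    warn "link #{r[:id]} is not checked out; ignoring"
  end
end

puts "accepted=#{accepted} bytes=#{bytes}"
```

Because the lookup and the removal are a single call, a duplicate result for the same link cannot be counted twice.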
#check_out(number = :all) ⇒ Object
Retrieve up to number links (all remaining links by default) and mark them as checked out.
    # File 'lib/lwac/server/consistency_manager.rb', line 73

    def check_out(number = :all)
      raise "Cannot check out links. Wait #{wait}s until #{Time.now + wait}." if wait > 0

      if not @state.current_sample.open? then
        @state.current_sample.open_sample
        @storage.write_sample
      end

      links = []
      @mutex.synchronize{
        number = @state.remaining if number == :all

        # Check out links and reserve them
        $log.debug "Checking out #{number}/#{@state.current_sample.remaining} links."

        # If the cache isn't large enough, read more from the DB
        if @links.length < number then
          $log.debug "Reading #{number-@links.length} links from database (id > #{@state.current_sample.last_dp_id})"

          # Read from DB
          ids = @storage.read_link_ids(@state.current_sample.last_dp_id.to_i, (number - @links.length))

          # increment the last count or keep it the same if there were no returns
          @state.current_sample.last_dp_id = (ids.max || @state.current_sample.last_dp_id)

          # put in the links list
          @links += ids
        end

        # then assign from @links
        count = 0
        select = @links.classify{ ((count+=1) <= number) }

        # put back the ones we don't want
        @links = select[false] || Set.new

        # grab the ones we do and get them from the db
        links = @storage.read_links( select[true].to_a )

        # then pop them in the checkout list
        links.each{|l|
          @checked_out_links[l.id] = l
        }

        # @links.each{|id|
        #   break if (count+=1) > number
        #   # Read from DB
        #   link = @storage.read_link(id)
        #   # Add to the list of recorded checked out ones
        #   @checked_out_links[id] = link
        #   # add to the list to return
        #   links << link
        #   # and delete from the pending list
        #   @links.delete(id)
        # }

        $log.debug "Done."
      }

      $log.debug "Total memory cache usage: #{@checked_out_links.length + @links.length} links"

      # TODO: exception handling.
      return links
    end
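The split between links that are handed out and links that stay cached uses `Set#classify` with a running counter. The following standalone sketch shows that idiom on made-up IDs; the variable names are illustrative and not taken from LWAC.

```ruby
require 'set'

pending = Set.new([10, 11, 12, 13, 14])
number  = 2

# Set#classify groups elements by the block's return value and yields a
# Hash of Sets. The counter puts the first `number` elements in the `true`
# group and everything after them in the `false` group.
count  = 0
groups = pending.classify { (count += 1) <= number }

taken   = groups[true].to_a        # IDs to fetch and hand out
pending = groups[false] || Set.new # IDs that stay cached

p taken
p pending.to_a
```

When every element is taken, `groups[false]` is `nil`, which is why the listing falls back to an empty Set.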
#close ⇒ Object
Close the resource neatly.
    # File 'lib/lwac/server/consistency_manager.rb', line 188

    def close
      $log.debug "Closing consistency manager by unchecking #{@checked_out_links.values.length} links."
      # un-check-out all checked-out links
      uncheck(@checked_out_links.values)

      @state.current_sample.pending = @links

      # Close storage manager
      @storage.close
    end
#counts ⇒ Object
    # File 'lib/lwac/server/consistency_manager.rb', line 62

    def counts
      start_time = (@state.current_sample) ? @state.current_sample.sample_start_time : nil

      return @checked_out_links.values.length,
             @state.current_sample.size,
             @state.current_sample.progress,
             start_time,
             @links.length
    end
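The method returns five values as one array: the number of checked-out links, the sample size, the sample progress, the sample start time, and the number of cached pending link IDs. A caller can destructure them in that order. The sketch below uses a hypothetical stub with made-up values; in particular, treating progress as an integer is an assumption.

```ruby
# Hypothetical stub returning values in the same order as #counts.
def counts_stub
  return 3, 100, 42, Time.at(0), 55
end

checked_out, size, progress, start_time, cached = counts_stub

puts "#{progress}/#{size} done, #{checked_out} checked out, #{cached} cached"
puts "started: #{start_time.utc}"
```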
#uncheck(links = []) ⇒ Object
Check links in without converting them to datapoints, returning them to the pending list. This doesn’t affect data consistency beyond making it possible to guarantee that we don’t duplicate or omit links.
    # File 'lib/lwac/server/consistency_manager.rb', line 140

    def uncheck(links = [])
      @mutex.synchronize{
        links.each{|l|
          id = l.id if l.class == Link
          raise "Attempt to uncheck a link that is not checked out" if not @checked_out_links.delete(id)
          @links << id
        }
      }
    end
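As listed, `id` is only assigned when the element is a `Link`, so any other value appears to fall through to the raise. The sketch below is a variation, not the library's behaviour: it accepts either link objects or bare IDs. `Link` here is a hypothetical `Struct`, and `release` is an illustrative name.

```ruby
require 'set'

Link = Struct.new(:id) # hypothetical stand-in for the LWAC Link class

checked_out = { 1 => Link.new(1), 2 => Link.new(2) }
pending     = Set.new

# Return links to the pending set; accepts Link objects or bare IDs.
def release(links, checked_out, pending)
  links.each do |l|
    id = l.is_a?(Link) ? l.id : l
    raise "Attempt to uncheck a link that is not checked out" unless checked_out.delete(id)
    pending << id
  end
end

release([checked_out[1], 2], checked_out, pending)
p pending.to_a
p checked_out.empty?
```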
#wait ⇒ Object
Calculate how many seconds remain until the sample is “openable”. A value of zero or less means a new sample can be opened.
    # File 'lib/lwac/server/consistency_manager.rb', line 181

    def wait
      @mutex.synchronize{
        (@state.next_sample_due - Time.now.to_i).ceil
      }
    end
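The arithmetic can be tried in isolation. This sketch assumes `next_sample_due` is a Unix timestamp, possibly fractional; `seconds_until` is a hypothetical helper, not part of LWAC.

```ruby
# Seconds until the next sample is due; zero or negative means "openable".
def seconds_until(next_sample_due, now = Time.now.to_i)
  (next_sample_due - now).ceil
end

now = 1_000
p seconds_until(1_090.5, now) # 91
p seconds_until(1_000, now)   # 0
p seconds_until(940, now)     # -60
```

Rounding up with `ceil` means a caller that sleeps for the returned number of seconds will not wake before the sample is due.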