Class: LWAC::ConsistencyManager

Inherits:
Object
  • Object
show all
Defined in:
lib/lwac/server/consistency_manager.rb

Overview

Wraps storage and link policies to enforce an efficient workflow with regard to links.

Provides facilities for the following:

1) Read current state from files. 2) Create a new sample and read links from the db for that sample. 3) Write datapoints whilst keeping track of the link IDs to ensure all are done. 4) Close a sample and ensure everything is complete before opening another.

This can be thought of as the server’s API. It wraps all other server functions.

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ ConsistencyManager

Returns a new instance of ConsistencyManager.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/lwac/server/consistency_manager.rb', line 24

# Build the manager on top of a StorageManager and log the status of the
# current sample.
#
# config is indexed with :storage (handed to StorageManager.new) and
# :sampling_policy (kept in @config).
def initialize(config)
  @storage = StorageManager.new(config[:storage])
  @state   = @storage.state
  @mutex   = Mutex.new
  @config  = config[:sampling_policy]

  # Link checkout bookkeeping: the pending collection taken from the current
  # sample, plus the links currently checked out (keyed by link id).
  @links             = @state.current_sample.pending
  @checked_out_links = {}

  # A last sample id of -1 is treated as "nothing has been sampled yet".
  if @state.last_sample_id == -1
    $log.info "No sampling has occurred yet, this is a new deployment."
    open_sample # Bootstrap the sample
  end

  # A sample that is both closed and complete is finished, so roll over.
  # NOTE(review): @state.current_sample is deliberately re-read on each use,
  # presumably because open_sample may replace it -- confirm.
  if !@state.current_sample.open? && @state.current_sample.complete?
    $log.info "Current sample is closed and complete.  Opening a new one..."
    open_sample
  end

  # Tell the operator where we stand.
  $log.info "Current sample: #{@state.current_sample}."
  if @state.current_sample.open?
    # Prevents the server completing a sample even if already open...
    # check_sample_limit
    $log.info "Sample opened at #{@state.current_sample.sample_start_time}, resuming..."
  elsif wait <= 0
    $log.info "Sample is closed but ready to open."
  else
    $log.info "Sample closed: wait #{wait}s before sampling until #{Time.now + wait}."
  end
end

Instance Method Details

#check_in(datapoints = []) ⇒ Object

Check links in, write the return to disk



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/lwac/server/consistency_manager.rb', line 152

# Check datapoints in: write each one whose link is currently checked out and
# advance the sample's progress; close the sample once it reports complete.
#
# datapoints - enumerable of objects responding to #link (with #id) and
#              #response_properties (indexed with :downloaded_bytes).
#
# Raises RuntimeError while wait is still positive.
# Returns the value of close_sample when the sample completed, otherwise nil.
def check_in(datapoints = [])
  raise "Cannot check in whilst waiting.  Wait #{wait}s until #{Time.now + wait}." if wait > 0

  @mutex.synchronize do
    $log.debug "Checking in #{datapoints.length} datapoints."

    datapoints.each do |dp|
      # Only links that are still checked out may be checked in; anything
      # else is reported and skipped.  (They are not in @links either, so
      # there is nothing to remove from the pending list.)
      unless @checked_out_links.delete(dp.link.id)
        $log.warn "Attempted to check in link with ID #{dp.link.id}, but the sample says it's already been done."
        next
      end

      @storage.write_datapoint(dp)

      # Advance the progress counter, counting a missing byte total as 0.
      bytes = dp.response_properties[:downloaded_bytes] || 0
      @state.current_sample.link_complete(bytes)
    end

    # Close the sample as soon as it reports itself complete.
    if @state.current_sample.complete?
      $log.info "Current sample complete."
      close_sample
    end
  end
end

#check_out(number = :all) ⇒ Object

Retrieve links



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/lwac/server/consistency_manager.rb', line 73

# Reserve links for a client and return them.
#
# number - how many links to check out, or :all for everything the current
#          sample still reports as remaining.
#
# Opens the current sample (and writes it via the storage manager) if it is
# not open yet.  Link ids are taken from the in-memory pending set (@links),
# which is topped up from storage when it holds fewer than +number+ ids.
# Every returned link is recorded in @checked_out_links under its id until
# it is checked in or unchecked.
#
# Raises RuntimeError while wait is still positive.
# Returns whatever @storage.read_links yields for the selected ids.
def check_out(number = :all)
  raise "Cannot check out links.  Wait #{wait}s until #{Time.now + wait}." if wait > 0

  unless @state.current_sample.open?
    @state.current_sample.open_sample
    @storage.write_sample
  end

  links = []
  @mutex.synchronize do
    # :all means "whatever is left in the current sample".  This is read
    # from the sample -- the same object the log line below and the rest of
    # this method consult -- rather than from the state object itself.
    number = @state.current_sample.remaining if number == :all

    # Check out links and reserve them
    $log.debug "Checking out #{number}/#{@state.current_sample.remaining} links."

    # If the cache isn't large enough, read more from the DB
    if @links.length < number
      $log.debug "Reading #{number-@links.length} links from database (id > #{@state.current_sample.last_dp_id})"

      ids = @storage.read_link_ids(@state.current_sample.last_dp_id.to_i, number - @links.length)

      # Advance the high-water mark, or keep it when nothing was returned
      @state.current_sample.last_dp_id = (ids.max || @state.current_sample.last_dp_id)

      @links += ids
    end

    # Split the pending set: the first +number+ ids (in iteration order) are
    # handed out, the rest stay pending.  @links is expected to be a Set
    # here, since classify is a Set method.
    count  = 0
    select = @links.classify { (count += 1) <= number }

    # Keep the ones we don't want (an empty set when everything was taken)
    @links = select[false] || Set.new

    # Load the ones we do want; nil.to_a covers the "nothing selected" case
    links = @storage.read_links(select[true].to_a)

    # ...and remember them as checked out
    links.each { |l| @checked_out_links[l.id] = l }

    $log.debug "Done."
  end

  $log.debug "Total memory cache usage: #{@checked_out_links.length + @links.length} links"

  # TODO: exception handling.
  links
end

#close ⇒ Object

Close the resource neatly.



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/lwac/server/consistency_manager.rb', line 188

# Shut the manager down neatly: return every checked-out link to the pending
# list, hand that list to the current sample, then close the storage manager.
def close
  $log.debug "Closing consistency manager by unchecking #{@checked_out_links.values.length} links."

  # Anything still checked out goes back to pending...
  outstanding = @checked_out_links.values
  uncheck(outstanding)

  # ...and the pending list is stored on the sample.
  @state.current_sample.pending = @links

  # Finally release the storage manager.
  @storage.close
end

#counts ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/lwac/server/consistency_manager.rb', line 62

# Snapshot of the manager's counters.
#
# Returns a five-element array:
#   [number of checked-out links, current sample size, current sample
#    progress, sample start time (nil when there is no current sample),
#    number of pending links held in memory]
#
# Note that size and progress are read from the current sample without a nil
# check, so only the start time tolerates a missing sample.
def counts
  sample     = @state.current_sample
  start_time = sample ? sample.sample_start_time : nil

  [
    @checked_out_links.length,
    @state.current_sample.size,
    @state.current_sample.progress,
    start_time,
    @links.length
  ]
end

#uncheck(links = []) ⇒ Object

Check links in without converting them to datapoints. This doesn’t affect data consistency beyond making it possible to guarantee that we don’t duplicate or omit links.



140
141
142
143
144
145
146
147
148
149
# File 'lib/lwac/server/consistency_manager.rb', line 140

# Return checked-out links to the pending list without producing datapoints.
#
# links - enumerable whose elements are either Link objects (their #id is
#         used) or raw link ids.
#
# Each id is removed from @checked_out_links and appended to @links.
#
# Raises RuntimeError as soon as an element is not currently checked out;
# elements processed before the offending one stay unchecked.
# Returns the +links+ argument.
def uncheck(links = [])
  @mutex.synchronize do
    links.each do |l|
      # Resolve the id for Link instances (subclasses included); anything
      # else is taken to be the id itself.  Comparing the class with == left
      # the id nil for every non-Link element, so those could never be
      # unchecked.
      id = l.is_a?(Link) ? l.id : l

      raise "Attempt to uncheck a link that is not checked out" unless @checked_out_links.delete(id)
      @links << id
    end
  end
end

#wait ⇒ Object

Calculate how long we have until the sample is “openable”



181
182
183
184
185
# File 'lib/lwac/server/consistency_manager.rb', line 181

# Seconds left until the sample becomes "openable": the state's
# next_sample_due minus the current epoch second, rounded up.  A result of
# zero or less means the sample may be opened now.  Computed under @mutex.
def wait
  @mutex.synchronize do
    seconds_left = @state.next_sample_due - Time.now.to_i
    seconds_left.ceil
  end
end