Class: ThreadSafe::AtomicReferenceCacheBackend
- Inherits: Object
- Extended by: Util::Volatile
- Defined in: lib/thread_safe/atomic_reference_cache_backend.rb
Overview
A Ruby port of Doug Lea's jsr166e.ConcurrentHashMapV8 class, version 1.59, available in the public domain.
Original source code available here: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ConcurrentHashMapV8.java?revision=1.59
The Ruby port omits the +TreeBin+ optimization (red-black trees for use in bins whose size exceeds a threshold).
A hash table supporting full concurrency of retrievals and high expected concurrency for updates. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is no support for locking the entire table in a way that prevents all access.
Retrieval operations generally do not block, so they may overlap with update operations. Retrievals reflect the results of the most recently completed update operations holding upon their onset. (More formally, an update operation for a given key bears a happens-before relation with any (non +nil+) retrieval for that key reporting the updated value.) For aggregate operations such as +clear()+, concurrent retrievals may reflect insertion or removal of only some entries. Similarly, the +each_pair+ iterator yields elements reflecting the state of the hash table at some point at or since the start of the +each_pair+. Bear in mind that the results of aggregate status methods including +size()+ and +empty?+ are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.
The table is dynamically expanded when there are too many collisions (i.e., keys that have distinct hash codes but fall into the same slot modulo the table size), with the expected average effect of maintaining roughly two bins per mapping (corresponding to a 0.75 load factor threshold for resizing). There may be much variance around this average as mappings are added and removed, but overall, this maintains a commonly accepted time/space tradeoff for hash tables. However, resizing this or any other kind of hash table may be a relatively slow operation. When possible, it is a good idea to provide a size estimate as an optional :initial_capacity initializer argument. An additional optional :load_factor constructor argument provides a further means of customizing initial table capacity by specifying the table density to be used in calculating the amount of space to allocate for the given number of elements. Note that using many keys with exactly the same +hash+ is a sure way to slow down performance of any hash table.
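For illustration, a consumer expecting roughly ten thousand mappings could pass that estimate when building the backend. This is a minimal sketch using only the constructor option and accessors documented on this page (it assumes the thread_safe gem has been loaded with +require 'thread_safe'+):

  require 'thread_safe'

  # Size the table up front to avoid intermediate resizes while it is filled.
  cache = ThreadSafe::AtomicReferenceCacheBackend.new(:initial_capacity => 10_000)
  cache[:answer] = 42
  cache[:answer]        # => 42
  cache.key?(:missing)  # => false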
Design overview
The primary design goal of this hash table is to maintain concurrent readability (typically method +[]+, but also iteration and related methods) while minimizing update contention. Secondary goals are to keep space consumption about the same or better than plain +Hash+, and to support high initial insertion rates on an empty table by many threads.
Each key-value mapping is held in a +Node+. The validation-based approach explained below leads to a lot of code sprawl because retry-control precludes factoring into smaller methods.
The table is lazily initialized to a power-of-two size upon the first insertion. Each bin in the table normally contains a list of +Node+s (most often, the list has only zero or one +Node+). Table accesses require volatile/atomic reads, writes, and CASes. The lists of nodes within bins are always accurately traversable under volatile reads, so long as lookups check hash code and non-nullness of value before checking key equality.
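As a rough illustration of that power-of-two sizing, the rounding can be sketched as follows (the gem's own helper is the private +table_size_for+ used by the constructor shown later; this standalone version only approximates it):

  # Round a requested capacity up to the next power of two (illustrative only).
  def next_power_of_two(n)
    size = 1
    size <<= 1 while size < n
    size
  end

  next_power_of_two(10)  # => 16
  next_power_of_two(16)  # => 16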
We use the top two bits of +Node+ hash fields for control purposes -- they are available anyway because of addressing constraints. As explained further below, these top bits are used as follows:
- 00 - Normal
- 01 - Locked
- 11 - Locked and may have a thread waiting for lock
- 10 - +Node+ is a forwarding node
The lower 28 bits of each +Node+'s hash field contain the key's hash code, except for forwarding nodes, for which the lower bits are zero (and so always have hash field == +MOVED+).
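To make the encoding concrete, here is a minimal sketch of the checks these control bits allow. The constant values are illustrative stand-ins chosen to mirror the layout just described; the real values are the +Node+ constants (+MOVED+, +LOCKED+, +WAITING+, +HASH_BITS+) listed in the Constant Summary below:

  # Illustrative values only, mirroring the layout described above:
  # top two bits for control, lower 28 bits for the key's hash code.
  HASH_BITS = (1 << 28) - 1   # usable hash bits of a normal node
  LOCKED    = 0b01 << 28      # bin is locked
  WAITING   = 0b11 << 28      # locked, and a thread may be waiting for the lock
  MOVED     = 0b10 << 28      # forwarding node; its lower hash bits are always zero

  def locked_hash?(hash)
    (hash & LOCKED) != 0      # true for both LOCKED (01) and WAITING (11)
  end

  def moved?(hash)
    hash == MOVED             # forwarding nodes have hash field == MOVED
  end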
Insertion (via +[]=+ or its variants) of the first node in an empty bin is performed by just CASing it to the bin. This is by far the most common case for put operations under most key/hash distributions. Other update operations (insert, delete, and replace) require locks. We do not want to waste the space required to associate a distinct lock object with each bin, so instead use the first node of a bin list itself as a lock. Blocking support for these locks relies on +Util::CheapLockable+. However, we also need a +try_lock+ construction, so we overlay these by using bits of the +Node+ hash field for lock control (see above), and so normally use builtin monitors only for blocking and signalling using +cheap_wait+/+cheap_broadcast+ constructions. See +Node#try_await_lock+.
Using the first node of a list as a lock does not by itself suffice though: When a node is locked, any update must first validate that it is still the first node after locking it, and retry if not. Because new nodes are always appended to lists, once a node is first in a bin, it remains first until deleted or the bin becomes invalidated (upon resizing). However, operations that only conditionally update may inspect nodes until the point of update. This is a converse of sorts to the lazy locking technique described by Herlihy & Shavit.
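A minimal sketch of that validate-then-retry idiom, with hypothetical names (in the actual code a similar pattern sits behind +Table#try_lock_via_hash+, which the +clear+ implementation further down relies on):

  # Hypothetical illustration of locking a bin via its first node.
  def locked_bin_update(table, i)
    while true
      first = table.volatile_get(i)          # candidate first node of the bin
      first.lock
      begin
        if table.volatile_get(i) == first    # still the first node after locking?
          return yield(first)                # safe to walk and mutate the list
        end                                  # otherwise unlock and retry
      ensure
        first.unlock
      end
    end
  end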
The main disadvantage of per-bin locks is that other update operations on other nodes in a bin list protected by the same lock can stall, for example when user +eql?+ or mapping functions take a long time. However, statistically, under random hash codes, this is not a common problem. Ideally, the frequency of nodes in bins follows a Poisson distribution (http://en.wikipedia.org/wiki/Poisson_distribution) with a parameter of about 0.5 on average, given the resizing threshold of 0.75, although with a large variance because of resizing granularity. Ignoring variance, the expected occurrences of list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The first values are:
- 0: 0.60653066
- 1: 0.30326533
- 2: 0.07581633
- 3: 0.01263606
- 4: 0.00157952
- 5: 0.00015795
- 6: 0.00001316
- 7: 0.00000094
- 8: 0.00000006
- more: less than 1 in ten million
Lock contention probability for two threads accessing distinct elements is roughly 1 / (8 * #elements) under random hashes.
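The bin-size probabilities listed above can be reproduced directly from the Poisson formula quoted in the preceding paragraph:

  # exp(-0.5) * 0.5**k / k!  for k = 0..8
  def poisson(k, mean = 0.5)
    Math.exp(-mean) * (mean ** k) / (1..k).reduce(1, :*)
  end

  (0..8).each { |k| puts format('%d: %.8f', k, poisson(k)) }
  # 0: 0.60653066
  # 1: 0.30326533
  # 2: 0.07581633
  # ...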
The table is resized when occupancy exceeds a percentage threshold (nominally 0.75, but see below). Only a single thread performs the resize (using field +size_control+, to arrange exclusion), but the table otherwise remains usable for reads and updates. Resizing proceeds by transferring bins, one by one, from the table to the next table. Because we are using power-of-two expansion, the elements from each bin must either stay at the same index, or move with a power-of-two offset. We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields won't change. On average, only about one-sixth of them need cloning when a table doubles. The nodes they replace will be garbage collectable as soon as they are no longer referenced by any reader thread that may be in the midst of concurrently traversing the table. Upon transfer, the old table bin contains only a special forwarding node (with hash field +MOVED+) that contains the next table as its key. On encountering a forwarding node, access and update operations restart, using the new table.
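Why an entry either keeps its index or moves by exactly the old table size follows from the masking arithmetic; this sketch assumes, as power-of-two sizing implies, that bin indices are computed as hash & (size - 1):

  old_size = 16
  new_size = old_size << 1       # table doubles
  hash     = 0b1011010           # any example hash

  hash & (old_size - 1)          # => 10  (old index)
  hash & (new_size - 1)          # => 26  (10 + 16)
  # The new index differs from the old one only by the single extra hash bit
  # examined after doubling: it is either old_index or old_index + old_size.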
Each bin transfer requires its bin lock. However, unlike other cases, a transfer can skip a bin if it fails to acquire its lock, and revisit it later. Method +rebuild+ maintains a buffer of TRANSFER_BUFFER_SIZE bins that have been skipped because of failure to acquire a lock, and blocks only if none are available (i.e., only very rarely). The transfer operation must also ensure that all accessible bins in both the old and new table are usable by any traversal. When there are no lock acquisition failures, this is arranged simply by proceeding from the last bin (+table.size - 1+) up towards the first. Upon seeing a forwarding node, traversals arrange to move to the new table without revisiting nodes. However, when any node is skipped during a transfer, all earlier table bins may have become visible, so are initialized with a reverse-forwarding node back to the old table until the new ones are established. (This sometimes requires transiently locking a forwarding node, which is possible under the above encoding.) These more expensive mechanics trigger only when necessary.
The traversal scheme also applies to partial traversals of ranges of bins (via an alternate Traverser constructor) to support partitioned aggregate operations. Also, read-only operations give up if ever forwarded to a null table, which provides support for shutdown-style clearing, which is also not currently implemented.
Lazy table initialization minimizes footprint until first use.
The element count is maintained using a +ThreadSafe::Util::Adder+, which avoids contention on updates but can encounter cache thrashing if read too frequently during concurrent access. To avoid reading so often, resizing is attempted either when a bin lock is contended, or upon adding to a bin already holding two or more nodes (checked before adding in the +x_if_absent+ methods, after adding in others). Under uniform hash distributions, the probability of this occurring at threshold is around 13%, meaning that only about 1 in 8 puts check threshold (and after resizing, many fewer do so). But this approximation has high variance for small table sizes, so we check on any collision for sizes <= 64. The bulk putAll operation further reduces contention by only committing count updates upon these size checks.
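A rough sketch of that heuristic, with hypothetical names (the real bookkeeping lives in the private size-management methods):

  # Hypothetical illustration: decide whether this update should consult the
  # (comparatively expensive) counter sum and possibly trigger a resize.
  def should_check_resize?(bin_length, table_size)
    return true if bin_length >= 2        # bin already held two or more nodes
    bin_length >= 1 && table_size <= 64   # any collision while the table is small
  end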
Constant Summary
- MOVED = Node::MOVED (shorthand)
- LOCKED = Node::LOCKED (shorthand)
- WAITING = Node::WAITING (shorthand)
- HASH_BITS = Node::HASH_BITS (shorthand)
- NOW_RESIZING = -1
- DEFAULT_CAPACITY = 16
- MAX_CAPACITY = Util::MAX_INT
- TRANSFER_BUFFER_SIZE = 32 -- The buffer size for skipped bins during transfers. The value is arbitrary but should be large enough to avoid most locking stalls during resizes.
Class Method Summary
- .attr_volatile(*attr_names) ⇒ Object (extended from Util::Volatile) -- Provides +volatile+ (in the JVM's sense) attribute accessors implemented on top of +AtomicReference+s.
Instance Method Summary
- #[](key) ⇒ Object
- #[]=(key, value) ⇒ Object
- #clear ⇒ Object -- Implementation for clear.
- #compute(key) ⇒ Object
- #compute_if_absent(key) ⇒ Object
- #compute_if_present(key) ⇒ Object
- #delete(key) ⇒ Object
- #delete_pair(key, value) ⇒ Object
- #each_pair ⇒ Object
- #empty? ⇒ Boolean
- #get_and_set(key, value) ⇒ Object -- internalPut in the original CHMV8.
- #get_or_default(key, else_value = nil) ⇒ Object
- #initialize(options = nil) ⇒ AtomicReferenceCacheBackend (constructor) -- A new instance of AtomicReferenceCacheBackend.
- #key?(key) ⇒ Boolean
- #merge_pair(key, value) ⇒ Object
- #replace_if_exists(key, new_value) ⇒ Object
- #replace_pair(key, old_value, new_value) ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ AtomicReferenceCacheBackend
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 357

def initialize(options = nil)
  super()
  @counter = Util::Adder.new
  initial_capacity  = options && options[:initial_capacity] || DEFAULT_CAPACITY
  self.size_control = (capacity = table_size_for(initial_capacity)) > MAX_CAPACITY ? MAX_CAPACITY : capacity
end
Class Method Details
.attr_volatile(*attr_names) ⇒ Object Originally defined in module Util::Volatile
Provides +volatile+ (in the JVM's sense) attribute accessors implemented on top of +AtomicReference+s.
Usage:

  class Foo
    extend ThreadSafe::Util::Volatile
    attr_volatile :foo, :bar

    def initialize(bar)
      super() # must super() into parent initializers before using the volatile attribute accessors
      self.bar = bar
    end

    def hello
      my_foo = foo    # volatile read
      self.foo = 1    # volatile write
      cas_foo(1, 2)   # => true | a strong CAS
    end
  end
Instance Method Details
#[](key) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 382

def [](key)
  get_or_default(key)
end
#[]=(key, value) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 390

def []=(key, value)
  get_and_set(key, value)
  value
end
#clear ⇒ Object
Implementation for clear. Steps through each bin, removing all nodes.
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 529

def clear
  return self unless current_table = table
  current_table_size = current_table.size
  deleted_count = i = 0
  while i < current_table_size
    if !(node = current_table.volatile_get(i))
      i += 1
    elsif (node_hash = node.hash) == MOVED
      current_table      = node.key
      current_table_size = current_table.size
    elsif Node.locked_hash?(node_hash)
      decrement_size(deleted_count) # opportunistically update count
      deleted_count = 0
      node.try_await_lock(current_table, i)
    else
      current_table.try_lock_via_hash(i, node, node_hash) do
        begin
          deleted_count += 1 if NULL != node.value # recheck under lock
          node.value = nil
        end while node = node.next
        current_table.volatile_set(i, nil)
        i += 1
      end
    end
  end
  decrement_size(deleted_count)
  self
end
#compute(key) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 430

def compute(key)
  internal_compute(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
end
#compute_if_absent(key) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 395

def compute_if_absent(key)
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield }
      if succeeded
        increment_size
        return new_value
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS))
      return current_value
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield }
      return value if succeeded
    end
  end
end
#compute_if_present(key) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 418

def compute_if_present(key)
  new_value = nil
  internal_replace(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
  new_value
end
#delete(key) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 480

def delete(key)
  replace_if_exists(key, NULL)
end
#delete_pair(key, value) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 484

def delete_pair(key, value)
  result = internal_replace(key, value) { NULL }
  if result && NULL != result
    !!result
  else
    false
  end
end
#each_pair ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 493

def each_pair
  return self unless current_table = table
  current_table_size = base_size = current_table.size
  i = base_index = 0
  while base_index < base_size
    if node = current_table.volatile_get(i)
      if node.hash == MOVED
        current_table      = node.key
        current_table_size = current_table.size
      else
        begin
          if NULL != (value = node.value) # skip deleted or special nodes
            yield node.key, value
          end
        end while node = node.next
      end
    end
    if (i_with_base = i + base_size) < current_table_size
      i = i_with_base # visit upper slots if present
    else
      i = base_index += 1
    end
  end
  self
end
#empty? ⇒ Boolean
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 524

def empty?
  size == 0
end
#get_and_set(key, value) ⇒ Object
internalPut in the original CHMV8
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 460

def get_and_set(key, value) # internalPut in the original CHMV8
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      if current_table.cas_new_node(i, hash, key, value)
        increment_size
        break
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, old_value = attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
      break old_value if succeeded
    end
  end
end
#get_or_default(key, else_value = nil) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 364

def get_or_default(key, else_value = nil)
  hash          = key_hash(key)
  current_table = table
  while current_table
    node = current_table.volatile_get_by_hash(hash)
    current_table =
      while node
        if (node_hash = node.hash) == MOVED
          break node.key
        elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (value = node.value)
          return value
        end
        node = node.next
      end
  end
  else_value
end
#key?(key) ⇒ Boolean
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 386

def key?(key)
  get_or_default(key, NULL) != NULL
end
#merge_pair(key, value) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 440

def merge_pair(key, value)
  internal_compute(key) do |old_value|
    if NULL == old_value || !(value = yield(old_value)).nil?
      value
    else
      NULL
    end
  end
end
#replace_if_exists(key, new_value) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 454

def replace_if_exists(key, new_value)
  if (result = internal_replace(key) { new_value }) && NULL != result
    result
  end
end
#replace_pair(key, old_value, new_value) ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 450

def replace_pair(key, old_value, new_value)
  NULL != internal_replace(key, old_value) { new_value }
end
#size ⇒ Object
# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 520

def size
  (sum = @counter.sum) < 0 ? 0 : sum # ignore transient negative values
end