diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index c989aa0fc..5558a2d40 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -58,6 +58,11 @@ public partial class FasterKV : FasterBase, /// public long IndexSize => state[resizeInfo.version].size; + /// + /// Number of overflow buckets in use (64 bytes each) + /// + public long OverflowBucketCount => overflowBucketsAllocator.GetMaxValidAddress(); + /// /// Comparer used by FASTER /// @@ -656,12 +661,43 @@ internal Status ContextDelete( } /// - /// Grow the hash index + /// Grow the hash index by a factor of two. Make sure to take a full checkpoint + /// after growth, for persistence. /// - /// Whether the request succeeded + /// Whether the grow completed public bool GrowIndex() { - return StartStateMachine(new IndexResizeStateMachine()); + if (LightEpoch.AnyInstanceProtected()) + throw new FasterException("Cannot use GrowIndex when using legacy or non-async sessions"); + + if (!StartStateMachine(new IndexResizeStateMachine())) return false; + + epoch.Resume(); + + try + { + while (true) + { + SystemState _systemState = SystemState.Copy(ref systemState); + if (_systemState.phase == Phase.IN_PROGRESS_GROW) + { + SplitBuckets(0); + epoch.ProtectAndDrain(); + } + else + { + SystemState.RemoveIntermediate(ref _systemState); + if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW) + { + return true; + } + } + } + } + finally + { + epoch.Suspend(); + } } /// diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs index 863e57902..5918ac2e8 100644 --- a/cs/src/core/Index/FASTER/FASTERBase.cs +++ b/cs/src/core/Index/FASTER/FASTERBase.cs @@ -244,7 +244,8 @@ public unsafe partial class FasterBase internal long minTableSize = 16; // Allocator for the hash buckets - internal readonly MallocFixedPageSize overflowBucketsAllocator; + internal MallocFixedPageSize overflowBucketsAllocator; + internal MallocFixedPageSize overflowBucketsAllocatorResize; // An array of size two, that contains the old and new versions of the hash-table internal InternalHashTable[] state = new InternalHashTable[2]; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 4f2ff9397..9edcd1090 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1744,18 +1744,21 @@ private bool TraceBackForKeyMatch( foundPhysicalAddress = Constants.kInvalidAddress; return false; } -#endregion + #endregion -#region Split Index + #region Split Index private void SplitBuckets(long hash) { long masked_bucket_index = hash & state[1 - resizeInfo.version].size_mask; int offset = (int)(masked_bucket_index >> Constants.kSizeofChunkBits); + SplitBuckets(offset); + } + private void SplitBuckets(int offset) + { int numChunks = (int)(state[1 - resizeInfo.version].size / Constants.kSizeofChunk); if (numChunks == 0) numChunks = 1; // at least one chunk - if (!Utility.IsPowerOfTwo(numChunks)) { throw new FasterException("Invalid number of chunks: " + numChunks); @@ -1780,6 +1783,8 @@ private void SplitBuckets(long hash) { // GC old version of hash table state[1 - resizeInfo.version] = default; + overflowBucketsAllocatorResize.Dispose(); + overflowBucketsAllocatorResize = null; GlobalStateMachineStep(systemState); return; } @@ -1789,7 +1794,7 @@ private void SplitBuckets(long hash) while (Interlocked.Read(ref splitStatus[offset & (numChunks - 1)]) == 1) { - + Thread.Yield(); } } @@ -1821,17 +1826,26 @@ private void SplitChunk( } var logicalAddress = entry.Address; - if (logicalAddress >= hlog.HeadAddress) + long physicalAddress = 0; + + if (entry.ReadCache && (entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress) + physicalAddress = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask); + else if (logicalAddress >= hlog.HeadAddress) + physicalAddress = hlog.GetPhysicalAddress(logicalAddress); + + // It is safe to always use hlog instead of readcache for some calls such + // as GetKey and GetInfo + if (physicalAddress != 0) { - var physicalAddress = hlog.GetPhysicalAddress(logicalAddress); var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress)); if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == 0) { // Insert in left if (left == left_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *left = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *left = new_bucket_logical; left = (long*)new_bucket; left_end = left + Constants.kOverflowBucketIndex; } @@ -1841,12 +1855,13 @@ private void SplitChunk( // Insert previous address in right entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 1); - if (entry.Address != Constants.kInvalidAddress) + if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress)) { if (right == right_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *right = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *right = new_bucket_logical; right = (long*)new_bucket; right_end = right + Constants.kOverflowBucketIndex; } @@ -1860,8 +1875,9 @@ private void SplitChunk( // Insert in right if (right == right_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *right = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *right = new_bucket_logical; right = (long*)new_bucket; right_end = right + Constants.kOverflowBucketIndex; } @@ -1871,12 +1887,13 @@ private void SplitChunk( // Insert previous address in left entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 0); - if (entry.Address != Constants.kInvalidAddress) + if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress)) { if (left == left_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *left = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *left = new_bucket_logical; left = (long*)new_bucket; left_end = left + Constants.kOverflowBucketIndex; } @@ -1893,8 +1910,9 @@ private void SplitChunk( // Insert in left if (left == left_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *left = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *left = new_bucket_logical; left = (long*)new_bucket; left_end = left + Constants.kOverflowBucketIndex; } @@ -1905,8 +1923,9 @@ private void SplitChunk( // Insert in right if (right == right_end) { - var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate(); - *right = (long)new_bucket; + var new_bucket_logical = overflowBucketsAllocator.Allocate(); + var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical); + *right = new_bucket_logical; right = (long*)new_bucket; right_end = right + Constants.kOverflowBucketIndex; } @@ -1917,22 +1936,41 @@ private void SplitChunk( } if (*(((long*)src_start) + Constants.kOverflowBucketIndex) == 0) break; - src_start = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex)); + src_start = (HashBucket*)overflowBucketsAllocatorResize.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex)); } while (true); } } private long TraceBackForOtherChainStart(long logicalAddress, int bit) { - while (logicalAddress >= hlog.HeadAddress) + while (true) { - var physicalAddress = hlog.GetPhysicalAddress(logicalAddress); - var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress)); - if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit) + HashBucketEntry entry = default; + entry.Address = logicalAddress; + if (entry.ReadCache) { - return logicalAddress; + if (logicalAddress < readcache.HeadAddress) + break; + var physicalAddress = readcache.GetPhysicalAddress(logicalAddress); + var hash = comparer.GetHashCode64(ref readcache.GetKey(physicalAddress)); + if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit) + { + return logicalAddress; + } + logicalAddress = readcache.GetInfo(physicalAddress).PreviousAddress; + } + else + { + if (logicalAddress < hlog.HeadAddress) + break; + var physicalAddress = hlog.GetPhysicalAddress(logicalAddress); + var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress)); + if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit) + { + return logicalAddress; + } + logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; } - logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; } return logicalAddress; } @@ -2087,7 +2125,7 @@ private long GetLatestRecordVersion(ref HashBucketEntry entry, long defaultVersi if (UseReadCache && entry.ReadCache) { var _addr = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask); - if (entry.Address >= readcache.HeadAddress) + if ((entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress) return readcache.GetInfo(_addr).Version; else return defaultVersion; diff --git a/cs/src/core/Index/Recovery/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs index 36a5b6961..d174f67e2 100644 --- a/cs/src/core/Index/Recovery/IndexRecovery.cs +++ b/cs/src/core/Index/Recovery/IndexRecovery.cs @@ -39,15 +39,20 @@ private uint InitializeMainIndexRecovery(ref IndexCheckpointInfo info, bool isAs var token = info.info.token; var ht_version = resizeInfo.version; - if (state[ht_version].size != info.info.table_size) - throw new FasterException($"Incompatible hash table size during recovery; allocated {state[ht_version].size} buckets, recovering {info.info.table_size} buckets"); - // Create devices to read from using Async API info.main_ht_device = checkpointManager.GetIndexDevice(token); + var sectorSize = info.main_ht_device.SectorSize; + + if (state[ht_version].size != info.info.table_size) + { + Free(ht_version); + Initialize(info.info.table_size, (int)sectorSize); + } + BeginMainIndexRecovery(ht_version, info.main_ht_device, info.info.num_ht_bytes, isAsync); - var sectorSize = info.main_ht_device.SectorSize; + var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1)); return alignedIndexSize; } diff --git a/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs b/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs index 4c1553712..a43d0ae47 100644 --- a/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs @@ -26,7 +26,8 @@ public void GlobalBeforeEnteringState( faster.numPendingChunksToBeSplit = numChunks; faster.splitStatus = new long[numChunks]; - + faster.overflowBucketsAllocatorResize = faster.overflowBucketsAllocator; + faster.overflowBucketsAllocator = new MallocFixedPageSize(false); faster.Initialize(1 - faster.resizeInfo.version, faster.state[faster.resizeInfo.version].size * 2, faster.sectorSize); faster.resizeInfo.version = 1 - faster.resizeInfo.version; diff --git a/cs/test/RecoveryChecks.cs b/cs/test/RecoveryChecks.cs index 760758ed6..8dedb18e8 100644 --- a/cs/test/RecoveryChecks.cs +++ b/cs/test/RecoveryChecks.cs @@ -313,5 +313,78 @@ public async ValueTask RecoveryCheck4([Values] CheckpointType checkpointType, [V s2.CompletePending(true); } } + + [Test] + public async ValueTask RecoveryCheck5([Values] CheckpointType checkpointType, [Values] bool isAsync, [Values] bool useReadCache, [Values(128, 1 << 10)] int size) + { + using var fht1 = new FasterKV + (size, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 14, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null }, + checkpointSettings: new CheckpointSettings { CheckpointDir = path } + ); + + using var s1 = fht1.NewSession(new MyFunctions()); + for (long key = 0; key < 1000; key++) + { + s1.Upsert(ref key, ref key); + } + + if (useReadCache) + { + fht1.Log.FlushAndEvict(true); + for (long key = 0; key < 1000; key++) + { + long output = default; + var status = s1.Read(ref key, ref output); + if (status != Status.PENDING) + Assert.IsTrue(status == Status.OK && output == key); + } + s1.CompletePending(true); + } + + fht1.GrowIndex(); + + for (long key = 0; key < 1000; key++) + { + long output = default; + var status = s1.Read(ref key, ref output); + if (status != Status.PENDING) + Assert.IsTrue(status == Status.OK && output == key); + } + s1.CompletePending(true); + + var task = fht1.TakeFullCheckpointAsync(checkpointType); + + using var fht2 = new FasterKV + (size, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null }, + checkpointSettings: new CheckpointSettings { CheckpointDir = path } + ); + + if (isAsync) + { + await task; + await fht2.RecoverAsync(); + } + else + { + task.GetAwaiter().GetResult(); + fht2.Recover(); + } + + Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress); + Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress); + Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress); + + using var s2 = fht2.NewSession(new MyFunctions()); + for (long key = 0; key < 1000; key++) + { + long output = default; + var status = s2.Read(ref key, ref output); + if (status != Status.PENDING) + Assert.IsTrue(status == Status.OK && output == key); + } + s2.CompletePending(true); + } } }