Fixed GrowIndex to work correctly #395

Merged
merged 6 commits on Feb 1, 2021
42 changes: 39 additions & 3 deletions cs/src/core/Index/FASTER/FASTER.cs
@@ -58,6 +58,11 @@ public partial class FasterKV<Key, Value> : FasterBase,
/// </summary>
public long IndexSize => state[resizeInfo.version].size;

/// <summary>
/// Number of overflow buckets in use (64 bytes each)
/// </summary>
public long OverflowBucketCount => overflowBucketsAllocator.GetMaxValidAddress();

/// <summary>
/// Comparer used by FASTER
/// </summary>
@@ -656,12 +661,43 @@ internal Status ContextDelete<Input, Output, Context, FasterSession>(
}

/// <summary>
/// Grow the hash index
/// Grow the hash index by a factor of two. Make sure to take a full checkpoint
/// after growth, for persistence.
/// </summary>
/// <returns>Whether the request succeeded</returns>
/// <returns>Whether the grow completed</returns>
public bool GrowIndex()
{
return StartStateMachine(new IndexResizeStateMachine());
if (LightEpoch.AnyInstanceProtected())
throw new FasterException("Cannot use GrowIndex when using legacy or non-async sessions");

if (!StartStateMachine(new IndexResizeStateMachine())) return false;

epoch.Resume();

try
{
while (true)
{
SystemState _systemState = SystemState.Copy(ref systemState);
if (_systemState.phase == Phase.IN_PROGRESS_GROW)
{
SplitBuckets(0);
epoch.ProtectAndDrain();
}
else
{
SystemState.RemoveIntermediate(ref _systemState);
if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW)
{
return true;
}
}
}
}
finally
{
epoch.Suspend();
}
}

/// <summary>
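For reference, a minimal usage sketch of the new GrowIndex contract (not part of the PR; construction mirrors the test added below, and the device path, checkpoint directory, and FoldOver choice are illustrative): call GrowIndex from a thread with no protected epoch, then take a full checkpoint to persist the grown index, as the updated doc comment advises.

    using FASTER.core;

    // Sketch: grow the hash index, then persist it with a full checkpoint.
    using var log = Devices.CreateLogDevice("grow-example.log", deleteOnClose: true);
    using var store = new FasterKV<long, long>(
        1L << 10, // initial main hash table size, in buckets
        logSettings: new LogSettings { LogDevice = log },
        checkpointSettings: new CheckpointSettings { CheckpointDir = "checkpoints" });

    // ... upsert some records via a session ...

    long overflowBefore = store.OverflowBucketCount; // new property; 64 bytes per bucket
    if (store.GrowIndex()) // throws FasterException under legacy/non-async sessions
    {
        // Growth itself is in-memory only; a full checkpoint makes it durable.
        store.TakeFullCheckpointAsync(CheckpointType.FoldOver).GetAwaiter().GetResult();
    }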
3 changes: 2 additions & 1 deletion cs/src/core/Index/FASTER/FASTERBase.cs
@@ -244,7 +244,8 @@ public unsafe partial class FasterBase
internal long minTableSize = 16;

// Allocator for the hash buckets
internal readonly MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
internal MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
internal MallocFixedPageSize<HashBucket> overflowBucketsAllocatorResize;

// An array of size two, that contains the old and new versions of the hash-table
internal InternalHashTable[] state = new InternalHashTable[2];
94 changes: 66 additions & 28 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
@@ -1744,18 +1744,21 @@ private bool TraceBackForKeyMatch(
foundPhysicalAddress = Constants.kInvalidAddress;
return false;
}
#endregion

#region Split Index
private void SplitBuckets(long hash)
{
long masked_bucket_index = hash & state[1 - resizeInfo.version].size_mask;
int offset = (int)(masked_bucket_index >> Constants.kSizeofChunkBits);
SplitBuckets(offset);
}

private void SplitBuckets(int offset)
{
int numChunks = (int)(state[1 - resizeInfo.version].size / Constants.kSizeofChunk);
if (numChunks == 0) numChunks = 1; // at least one chunk


if (!Utility.IsPowerOfTwo(numChunks))
{
throw new FasterException("Invalid number of chunks: " + numChunks);
@@ -1780,6 +1783,8 @@ private void SplitBuckets(long hash)
{
// GC old version of hash table
state[1 - resizeInfo.version] = default;
overflowBucketsAllocatorResize.Dispose();
overflowBucketsAllocatorResize = null;
GlobalStateMachineStep(systemState);
return;
}
@@ -1789,7 +1794,7 @@

while (Interlocked.Read(ref splitStatus[offset & (numChunks - 1)]) == 1)
{

Thread.Yield();
}

}
@@ -1821,17 +1826,26 @@ private void SplitChunk(
}

var logicalAddress = entry.Address;
if (logicalAddress >= hlog.HeadAddress)
long physicalAddress = 0;

if (entry.ReadCache && (entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress)
physicalAddress = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask);
else if (logicalAddress >= hlog.HeadAddress)
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);

// It is safe to always use hlog instead of readcache for some calls, such
// as GetKey and GetInfo
if (physicalAddress != 0)
{
var physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress));
if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == 0)
{
// Insert in left
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
@@ -1841,12 +1855,13 @@ private void SplitChunk(

// Insert previous address in right
entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 1);
if (entry.Address != Constants.kInvalidAddress)
if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress))
{
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
@@ -1860,8 +1875,9 @@ private void SplitChunk(
// Insert in right
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
@@ -1871,12 +1887,13 @@ private void SplitChunk(

// Insert previous address in left
entry.Address = TraceBackForOtherChainStart(hlog.GetInfo(physicalAddress).PreviousAddress, 0);
if (entry.Address != Constants.kInvalidAddress)
if ((entry.Address != Constants.kInvalidAddress) && (entry.Address != Constants.kTempInvalidAddress))
{
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
@@ -1893,8 +1910,9 @@ private void SplitChunk(
// Insert in left
if (left == left_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*left = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*left = new_bucket_logical;
left = (long*)new_bucket;
left_end = left + Constants.kOverflowBucketIndex;
}
@@ -1905,8 +1923,9 @@ private void SplitChunk(
// Insert in right
if (right == right_end)
{
var new_bucket = (HashBucket*)overflowBucketsAllocator.Allocate();
*right = (long)new_bucket;
var new_bucket_logical = overflowBucketsAllocator.Allocate();
var new_bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(new_bucket_logical);
*right = new_bucket_logical;
right = (long*)new_bucket;
right_end = right + Constants.kOverflowBucketIndex;
}
@@ -1917,22 +1936,41 @@ private void SplitChunk(
}

if (*(((long*)src_start) + Constants.kOverflowBucketIndex) == 0) break;
src_start = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex));
src_start = (HashBucket*)overflowBucketsAllocatorResize.GetPhysicalAddress(*(((long*)src_start) + Constants.kOverflowBucketIndex));
} while (true);
}
}

private long TraceBackForOtherChainStart(long logicalAddress, int bit)
{
while (logicalAddress >= hlog.HeadAddress)
while (true)
{
var physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress));
if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit)
HashBucketEntry entry = default;
entry.Address = logicalAddress;
if (entry.ReadCache)
{
return logicalAddress;
if (logicalAddress < readcache.HeadAddress)
break;
var physicalAddress = readcache.GetPhysicalAddress(logicalAddress);
var hash = comparer.GetHashCode64(ref readcache.GetKey(physicalAddress));
if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit)
{
return logicalAddress;
}
logicalAddress = readcache.GetInfo(physicalAddress).PreviousAddress;
}
else
{
if (logicalAddress < hlog.HeadAddress)
break;
var physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
var hash = comparer.GetHashCode64(ref hlog.GetKey(physicalAddress));
if ((hash & state[resizeInfo.version].size_mask) >> (state[resizeInfo.version].size_bits - 1) == bit)
{
return logicalAddress;
}
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
}
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
}
return logicalAddress;
}
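Both SplitChunk and TraceBackForOtherChainStart decide a record's destination with the same expression. The intuition: after the table doubles, the masked hash gains one high bit, and that bit selects the "left" child bucket (same index as before) or the "right" child (old index plus old table size). A condensed sketch of the rule, with simplified names (both table sizes are powers of two; this helper is illustrative, not in the PR):

    // Returns 0 for the left child bucket, 1 for the right child,
    // for a table grown to newSizeBits bits of index.
    static long SplitBit(long hash, long newSizeMask, int newSizeBits)
        => (hash & newSizeMask) >> (newSizeBits - 1);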
@@ -2087,7 +2125,7 @@ private long GetLatestRecordVersion(ref HashBucketEntry entry, long defaultVersion)
if (UseReadCache && entry.ReadCache)
{
var _addr = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask);
if (entry.Address >= readcache.HeadAddress)
if ((entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress)
return readcache.GetInfo(_addr).Version;
else
return defaultVersion;
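The one-line change to GetLatestRecordVersion is easy to miss: a read-cache entry's Address carries the kReadCacheBitMask tag, so comparing the raw value against readcache.HeadAddress does not test what it appears to. A minimal sketch of the corrected pattern (variable names illustrative):

    // Strip the read-cache tag before any address comparison or lookup.
    long rcAddress = entry.Address & ~Constants.kReadCacheBitMask;
    long version = rcAddress >= readcache.HeadAddress
        ? readcache.GetInfo(readcache.GetPhysicalAddress(rcAddress)).Version
        : defaultVersion;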
13 changes: 9 additions & 4 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
@@ -39,15 +39,20 @@ private uint InitializeMainIndexRecovery(ref IndexCheckpointInfo info, bool isAsync)
var token = info.info.token;
var ht_version = resizeInfo.version;

if (state[ht_version].size != info.info.table_size)
throw new FasterException($"Incompatible hash table size during recovery; allocated {state[ht_version].size} buckets, recovering {info.info.table_size} buckets");

// Create devices to read from using Async API
info.main_ht_device = checkpointManager.GetIndexDevice(token);
var sectorSize = info.main_ht_device.SectorSize;

if (state[ht_version].size != info.info.table_size)
{
Free(ht_version);
Initialize(info.info.table_size, (int)sectorSize);
}


BeginMainIndexRecovery(ht_version, info.main_ht_device, info.info.num_ht_bytes, isAsync);

var sectorSize = info.main_ht_device.SectorSize;

var alignedIndexSize = (uint)((info.info.num_ht_bytes + (sectorSize - 1)) & ~(sectorSize - 1));
return alignedIndexSize;
}
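The effect of the recovery change: instead of rejecting a checkpoint whose table size differs from the allocated one, recovery frees the current table and re-initializes it at the checkpointed size. That enables the scenario exercised by RecoveryCheck5 below, where a store constructed at the pre-growth size recovers a checkpoint taken after GrowIndex. A sketch, assuming a log device and checkpoint path in scope as in that test:

    using var store = new FasterKV<long, long>(
        128, // smaller than the checkpointed (grown) table size
        logSettings: new LogSettings { LogDevice = log },
        checkpointSettings: new CheckpointSettings { CheckpointDir = path });
    store.Recover(); // index is re-initialized at info.info.table_size, then loaded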
3 changes: 2 additions & 1 deletion cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs
@@ -26,7 +26,8 @@ public void GlobalBeforeEnteringState<Key, Value>(

faster.numPendingChunksToBeSplit = numChunks;
faster.splitStatus = new long[numChunks];

faster.overflowBucketsAllocatorResize = faster.overflowBucketsAllocator;
faster.overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false);
faster.Initialize(1 - faster.resizeInfo.version, faster.state[faster.resizeInfo.version].size * 2, faster.sectorSize);

faster.resizeInfo.version = 1 - faster.resizeInfo.version;
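The swap above is half of the overflow-allocator lifecycle: the pre-growth allocator is parked in overflowBucketsAllocatorResize so split threads can still follow old overflow chains, while new buckets come from a fresh allocator. The other half is in SplitBuckets, which disposes the parked allocator once the final chunk is split. In outline (a paraphrase of the two diff sites, not verbatim source):

    // On entering the grow state (IndexResizeStateMachine):
    faster.overflowBucketsAllocatorResize = faster.overflowBucketsAllocator; // old chains stay readable
    faster.overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false); // serves new buckets

    // In SplitBuckets, after the last chunk completes:
    faster.overflowBucketsAllocatorResize.Dispose(); // old chains fully migrated
    faster.overflowBucketsAllocatorResize = null;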
73 changes: 73 additions & 0 deletions cs/test/RecoveryChecks.cs
@@ -309,5 +309,78 @@ public async ValueTask RecoveryCheck4([Values] CheckpointType checkpointType, [V
s2.CompletePending(true);
}
}

[Test]
public async ValueTask RecoveryCheck5([Values] CheckpointType checkpointType, [Values] bool isAsync, [Values] bool useReadCache, [Values(128, 1 << 10)] int size)
{
using var fht1 = new FasterKV<long, long>
(size,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 14, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null },
checkpointSettings: new CheckpointSettings { CheckpointDir = path }
);

using var s1 = fht1.NewSession(new MyFunctions());
for (long key = 0; key < 1000; key++)
{
s1.Upsert(ref key, ref key);
}

if (useReadCache)
{
fht1.Log.FlushAndEvict(true);
for (long key = 0; key < 1000; key++)
{
long output = default;
var status = s1.Read(ref key, ref output);
if (status != Status.PENDING)
Assert.IsTrue(status == Status.OK && output == key);
}
s1.CompletePending(true);
}

fht1.GrowIndex();

for (long key = 0; key < 1000; key++)
{
long output = default;
var status = s1.Read(ref key, ref output);
if (status != Status.PENDING)
Assert.IsTrue(status == Status.OK && output == key);
}
s1.CompletePending(true);

var task = fht1.TakeFullCheckpointAsync(checkpointType);

using var fht2 = new FasterKV<long, long>
(size,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20, ReadCacheSettings = useReadCache ? new ReadCacheSettings() : null },
checkpointSettings: new CheckpointSettings { CheckpointDir = path }
);

if (isAsync)
{
await task;
await fht2.RecoverAsync();
}
else
{
task.GetAwaiter().GetResult();
fht2.Recover();
}

Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);

using var s2 = fht2.NewSession(new MyFunctions());
for (long key = 0; key < 1000; key++)
{
long output = default;
var status = s2.Read(ref key, ref output);
if (status != Status.PENDING)
Assert.IsTrue(status == Status.OK && output == key);
}
s2.CompletePending(true);
}
}
}