Skip to content

Commit

Permalink
[C#] Recovery improvements (#330)
Browse files Browse the repository at this point in the history
* Avoid writing back untouched pages during recovery
* Allow calling recover multple times with newer hlog checkpoints
* Fix bug in index fast-forward when using snapshot checkpoints with separate index checkpoints
* Recovery from snapshot leaves the log mutable after recovery
* Avoid rescanning the log during recovery - only single scan of the log is now needed
* Only write back pages that have records that need to be marked as Invalid
* Do not write back from snapshot file to main log during recovery as it should remain as mutable unpersisted state.
* Code cleanup
* Adding testcases for recovery combinations
  • Loading branch information
badrishc authored Sep 16, 2020
1 parent e61e9be commit 12aa82f
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 224 deletions.
9 changes: 5 additions & 4 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,8 @@ protected void ShiftClosedUntilAddress()
/// <param name="tailAddress"></param>
/// <param name="headAddress"></param>
/// <param name="beginAddress"></param>
public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
/// <param name="readonlyAddress"></param>
public void RecoveryReset(long tailAddress, long headAddress, long beginAddress, long readonlyAddress)
{
long tailPage = GetPage(tailAddress);
long offsetInPage = GetOffsetInPage(tailAddress);
Expand All @@ -1136,9 +1137,9 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
HeadAddress = headAddress;
SafeHeadAddress = headAddress;
ClosedUntilAddress = headAddress;
FlushedUntilAddress = tailAddress;
ReadOnlyAddress = tailAddress;
SafeReadOnlyAddress = tailAddress;
FlushedUntilAddress = readonlyAddress;
ReadOnlyAddress = readonlyAddress;
SafeReadOnlyAddress = readonlyAddress;

// for the last page which contains tailoffset, it must be open
pageIndex = GetPageIndexForAddress(tailAddress);
Expand Down
23 changes: 10 additions & 13 deletions cs/src/core/Index/FASTER/FASTERBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,6 @@ public unsafe partial class FasterBase
// Used as an atomic counter to check if resizing is complete
internal long numPendingChunksToBeSplit;

// Epoch set for resizing
internal int resizeEpoch;

internal LightEpoch epoch;

internal ResizeInfo resizeInfo;
Expand Down Expand Up @@ -309,7 +306,7 @@ public void Initialize(long size, int sector_size)
}

minTableSize = size;
resizeInfo = default(ResizeInfo);
resizeInfo = default;
resizeInfo.status = ResizeOperationStatus.DONE;
resizeInfo.version = 0;
Initialize(resizeInfo.version, size, sector_size);
Expand Down Expand Up @@ -385,7 +382,7 @@ internal bool FindTag(long hash, ushort tag, ref HashBucket* bucket, ref int slo

if (target_entry_word == 0)
{
entry = default(HashBucketEntry);
entry = default;
return false;
}
bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(target_entry_word);
Expand All @@ -408,7 +405,7 @@ internal void FindOrCreateTag(long hash, ushort tag, ref HashBucket* bucket, ref


// Install tentative tag in free slot
entry = default(HashBucketEntry);
entry = default;
entry.Tag = tag;
entry.Address = Constants.kTempInvalidAddress;
entry.Pending = false;
Expand Down Expand Up @@ -451,7 +448,7 @@ private bool FindTagInternal(long hash, ushort tag, ref HashBucket* bucket, ref
continue;
}

HashBucketEntry entry = default(HashBucketEntry);
HashBucketEntry entry = default;
entry.word = target_entry_word;
if (tag == entry.Tag)
{
Expand Down Expand Up @@ -491,7 +488,7 @@ private bool FindTagMaybeTentativeInternal(long hash, ushort tag, ref HashBucket
continue;
}

HashBucketEntry entry = default(HashBucketEntry);
HashBucketEntry entry = default;
entry.word = target_entry_word;
if (tag == entry.Tag)
{
Expand Down Expand Up @@ -599,7 +596,7 @@ private bool FindTagOrFreeInternal(long hash, ushort tag, ref HashBucket* bucket
// Install succeeded
bucket = physicalBucketAddress;
slot = 0;
entry = default(HashBucketEntry);
entry = default;
return recordExists;
}
}
Expand All @@ -609,7 +606,7 @@ private bool FindTagOrFreeInternal(long hash, ushort tag, ref HashBucket* bucket
{
bucket = entry_slot_bucket;
}
entry = default(HashBucketEntry);
entry = default;
break;
}
}
Expand Down Expand Up @@ -650,7 +647,7 @@ private bool FindOtherTagMaybeTentativeInternal(long hash, ushort tag, ref HashB
continue;
}

HashBucketEntry entry = default(HashBucketEntry);
HashBucketEntry entry = default;
entry.word = target_entry_word;
if (tag == entry.Tag)
{
Expand Down Expand Up @@ -726,7 +723,7 @@ protected virtual long GetEntryCount()
///
/// </summary>
/// <param name="version"></param>
protected virtual string _DumpDistribution(int version)
protected virtual string DumpDistributionInternal(int version)
{
var table_size_ = state[version].size;
var ptable_ = state[version].tableAligned;
Expand Down Expand Up @@ -779,7 +776,7 @@ protected virtual string _DumpDistribution(int version)
/// </summary>
public string DumpDistribution()
{
return _DumpDistribution(resizeInfo.version);
return DumpDistributionInternal(resizeInfo.version);
}

}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ private void Restore(out Dictionary<string, long> recoveredIterators)

recoveredIterators = info.Iterators;

allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress);
CommittedUntilAddress = info.FlushedUntilAddress;
CommittedBeginAddress = info.BeginAddress;
SafeTailAddress = info.FlushedUntilAddress;
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Interfaces/NullFasterSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
struct NullFasterSession : IFasterSession
{
public static readonly NullFasterSession Instance;
public static readonly NullFasterSession Instance = new NullFasterSession();

public void CheckpointCompletionCallback(string guid, CommitPoint commitPoint)
{
Expand Down
10 changes: 6 additions & 4 deletions cs/src/core/Index/Recovery/IndexRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ internal bool IsFuzzyIndexRecoveryComplete(bool waitUntilComplete = false)
return completed1 && completed2;
}

//Main Index Recovery Functions
private CountdownEvent mainIndexRecoveryEvent;
/// <summary>
/// Main Index Recovery Functions
/// </summary>
protected CountdownEvent mainIndexRecoveryEvent;

private void BeginMainIndexRecovery(
int version,
Expand All @@ -93,7 +95,7 @@ private void BeginMainIndexRecovery(
for (int index = 0; index < numChunks; index++)
{
long chunkStartBucket = (long)start + (index * chunkSize);
HashIndexPageAsyncReadResult result = default(HashIndexPageAsyncReadResult);
HashIndexPageAsyncReadResult result = default;
result.chunkIndex = index;
device.ReadAsync(numBytesRead, (IntPtr)chunkStartBucket, chunkSize, AsyncPageReadCallback, result);
numBytesRead += chunkSize;
Expand Down Expand Up @@ -124,7 +126,7 @@ private unsafe void AsyncPageReadCallback(uint errorCode, uint numBytes, object

internal void DeleteTentativeEntries()
{
HashBucketEntry entry = default(HashBucketEntry);
HashBucketEntry entry = default;

int version = resizeInfo.version;
var table_size_ = state[version].size;
Expand Down
Loading

0 comments on commit 12aa82f

Please sign in to comment.