Skip to content

Commit

Permalink
Added TentativeHeadAddress - we first shift this, and when all thread…
Browse files Browse the repository at this point in the history
…s agree, we can do some pre-closing work such as eviction. Then we can shift the actual HeadAddress, closing pages when all threads agree on this. (#608)
  • Loading branch information
badrishc authored Dec 8, 2021
1 parent 1464025 commit 640b3f7
Showing 1 changed file with 30 additions and 33 deletions.
63 changes: 30 additions & 33 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// </summary>
public long SafeHeadAddress;

/// <summary>
/// Tentative head address. Threads do not take any record locks earlier than this point.
/// Records earlier than this address undergo eviction, before HeadAddress is moved.
/// </summary>
public long TentativeHeadAddress;

/// <summary>
/// Flushed until address
/// </summary>
Expand Down Expand Up @@ -1235,7 +1241,7 @@ internal virtual bool TryComplete()
/// Flush: send page to secondary store
/// </summary>
/// <param name="newSafeReadOnlyAddress"></param>
public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
private void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
{
if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress))
{
Expand All @@ -1249,17 +1255,31 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
}
}

/// <summary>
/// Action to be performed when all threads agree that
/// a page range is ready to close.
/// </summary>
private void OnPagesReadyToClose(long oldHeadAddress, long newHeadAddress)
{
if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
}
}

/// <summary>
/// Action to be performed for when all threads have
/// agreed that a page range is closed.
/// </summary>
/// <param name="newSafeHeadAddress"></param>
public void OnPagesClosed(long newSafeHeadAddress)
private void OnPagesClosed(long newSafeHeadAddress)
{
if (Utility.MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress))
{
Debug.WriteLine("SafeHeadOffset shifted from {0:X} to {1:X}", oldSafeHeadAddress, newSafeHeadAddress);

// Also shift begin address if we are using a null storage device
if (IsNullDevice)
Utility.MonotonicUpdate(ref BeginAddress, newSafeHeadAddress, out _);
Expand Down Expand Up @@ -1298,12 +1318,14 @@ internal void DebugPrintAddresses(long closePageAddress)
var _readonly = ReadOnlyAddress;
var _safereadonly = SafeReadOnlyAddress;
var _tail = GetTailAddress();
var _thead = TentativeHeadAddress;
var _head = HeadAddress;
var _safehead = SafeHeadAddress;

Console.WriteLine("ClosePageAddress: {0}.{1}", GetPage(closePageAddress), GetOffsetInPage(closePageAddress));
Console.WriteLine("FlushedUntil: {0}.{1}", GetPage(_flush), GetOffsetInPage(_flush));
Console.WriteLine("Tail: {0}.{1}", GetPage(_tail), GetOffsetInPage(_tail));
Console.WriteLine("TentativeHead: {0}.{1}", GetPage(_thead), GetOffsetInPage(_thead));
Console.WriteLine("Head: {0}.{1}", GetPage(_head), GetOffsetInPage(_head));
Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead));
Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly));
Expand Down Expand Up @@ -1334,29 +1356,7 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress)
/// </summary>
/// <param name="currentTailAddress"></param>
private void PageAlignedShiftHeadAddress(long currentTailAddress)
{
//obtain local values of variables that can change
long currentHeadAddress = HeadAddress;
long currentFlushedUntilAddress = FlushedUntilAddress;
long pageAlignedTailAddress = currentTailAddress & ~PageSizeMask;
long desiredHeadAddress = (pageAlignedTailAddress - HeadOffsetLagAddress);

long newHeadAddress = desiredHeadAddress;
if (currentFlushedUntilAddress < newHeadAddress)
{
newHeadAddress = currentFlushedUntilAddress;
}
newHeadAddress &= ~PageSizeMask;

if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
}
}
=> ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadOffsetLagAddress);

/// <summary>
/// Tries to shift head address to specified value
Expand All @@ -1373,14 +1373,11 @@ public long ShiftHeadAddress(long desiredHeadAddress)
newHeadAddress = currentFlushedUntilAddress;
}

if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
if (Utility.MonotonicUpdate(ref TentativeHeadAddress, newHeadAddress, out long oldTentativeHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
epoch.BumpCurrentEpoch(() => OnPagesReadyToClose(oldTentativeHeadAddress, newHeadAddress));
}

return newHeadAddress;
}

Expand Down

0 comments on commit 640b3f7

Please sign in to comment.