Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Add TentativeHeadAddress to allocator (v2) #608

Merged
merged 1 commit into from
Dec 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 30 additions & 33 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// </summary>
public long SafeHeadAddress;

/// <summary>
/// Tentative head address. Threads do not take any record locks earlier than this point.
/// Records earlier than this address undergo eviction, before HeadAddress is moved.
/// </summary>
public long TentativeHeadAddress;

/// <summary>
/// Flushed until address
/// </summary>
Expand Down Expand Up @@ -1235,7 +1241,7 @@ internal virtual bool TryComplete()
/// Flush: send page to secondary store
/// </summary>
/// <param name="newSafeReadOnlyAddress"></param>
public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
private void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
{
if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress))
{
Expand All @@ -1249,17 +1255,31 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
}
}

/// <summary>
/// Action to be performed when all threads agree that
/// a page range is ready to close.
/// </summary>
private void OnPagesReadyToClose(long oldHeadAddress, long newHeadAddress)
{
if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
}
}

/// <summary>
/// Action to be performed for when all threads have
/// agreed that a page range is closed.
/// </summary>
/// <param name="newSafeHeadAddress"></param>
public void OnPagesClosed(long newSafeHeadAddress)
private void OnPagesClosed(long newSafeHeadAddress)
{
if (Utility.MonotonicUpdate(ref SafeHeadAddress, newSafeHeadAddress, out long oldSafeHeadAddress))
{
Debug.WriteLine("SafeHeadOffset shifted from {0:X} to {1:X}", oldSafeHeadAddress, newSafeHeadAddress);

// Also shift begin address if we are using a null storage device
if (IsNullDevice)
Utility.MonotonicUpdate(ref BeginAddress, newSafeHeadAddress, out _);
Expand Down Expand Up @@ -1298,12 +1318,14 @@ internal void DebugPrintAddresses(long closePageAddress)
var _readonly = ReadOnlyAddress;
var _safereadonly = SafeReadOnlyAddress;
var _tail = GetTailAddress();
var _thead = TentativeHeadAddress;
var _head = HeadAddress;
var _safehead = SafeHeadAddress;

Console.WriteLine("ClosePageAddress: {0}.{1}", GetPage(closePageAddress), GetOffsetInPage(closePageAddress));
Console.WriteLine("FlushedUntil: {0}.{1}", GetPage(_flush), GetOffsetInPage(_flush));
Console.WriteLine("Tail: {0}.{1}", GetPage(_tail), GetOffsetInPage(_tail));
Console.WriteLine("TentativeHead: {0}.{1}", GetPage(_thead), GetOffsetInPage(_thead));
Console.WriteLine("Head: {0}.{1}", GetPage(_head), GetOffsetInPage(_head));
Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead));
Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly));
Expand Down Expand Up @@ -1334,29 +1356,7 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress)
/// </summary>
/// <param name="currentTailAddress"></param>
private void PageAlignedShiftHeadAddress(long currentTailAddress)
{
//obtain local values of variables that can change
long currentHeadAddress = HeadAddress;
long currentFlushedUntilAddress = FlushedUntilAddress;
long pageAlignedTailAddress = currentTailAddress & ~PageSizeMask;
long desiredHeadAddress = (pageAlignedTailAddress - HeadOffsetLagAddress);

long newHeadAddress = desiredHeadAddress;
if (currentFlushedUntilAddress < newHeadAddress)
{
newHeadAddress = currentFlushedUntilAddress;
}
newHeadAddress &= ~PageSizeMask;

badrishc marked this conversation as resolved.
Show resolved Hide resolved
if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
}
}
=> ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadOffsetLagAddress);

/// <summary>
/// Tries to shift head address to specified value
Expand All @@ -1373,14 +1373,11 @@ public long ShiftHeadAddress(long desiredHeadAddress)
newHeadAddress = currentFlushedUntilAddress;
}

if (ReadCache && (newHeadAddress > HeadAddress))
EvictCallback(HeadAddress, newHeadAddress);

if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out long oldHeadAddress))
if (Utility.MonotonicUpdate(ref TentativeHeadAddress, newHeadAddress, out long oldTentativeHeadAddress))
{
Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress);
epoch.BumpCurrentEpoch(() => OnPagesClosed(newHeadAddress));
epoch.BumpCurrentEpoch(() => OnPagesReadyToClose(oldTentativeHeadAddress, newHeadAddress));
}

return newHeadAddress;
}

Expand Down