From 182296c9f15d4ba0ebb9df924864100bef23b923 Mon Sep 17 00:00:00 2001
From: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
Date: Tue, 9 Apr 2024 21:51:18 -0700
Subject: [PATCH] Fix ContinuePendingRead to account for new records for the
same key that were added and then went to dusk during the pending Read
operation.
---
cs/src/core/Index/Common/Contexts.cs | 12 +-
.../FASTER/Implementation/ContinuePending.cs | 45 ++++-
.../FASTER/Implementation/InternalRead.cs | 1 +
cs/test/CompletePendingTests.cs | 173 +++++++++++++++++-
4 files changed, 210 insertions(+), 21 deletions(-)
diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs
index 8c03df6db..72ec0d462 100644
--- a/cs/src/core/Index/Common/Contexts.cs
+++ b/cs/src/core/Index/Common/Contexts.cs
@@ -5,7 +5,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
@@ -156,6 +155,7 @@ internal struct PendingContext
internal const ushort kNoOpFlags = 0;
internal const ushort kNoKey = 0x0001;
internal const ushort kIsAsync = 0x0002;
+ internal const ushort kHadStartAddress = 0x0004;
internal ReadCopyOptions readCopyOptions;
@@ -216,6 +216,12 @@ internal bool IsAsync
set => operationFlags = value ? (ushort)(operationFlags | kIsAsync) : (ushort)(operationFlags & ~kIsAsync);
}
+ internal bool HadStartAddress
+ {
+ get => (operationFlags & kHadStartAddress) != 0;
+ set => operationFlags = value ? (ushort)(operationFlags | kHadStartAddress) : (ushort)(operationFlags & ~kHadStartAddress);
+ }
+
internal long InitialEntryAddress
{
get => recordInfo.PreviousAddress;
@@ -555,7 +561,7 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager, DeltaLog
using StreamReader s = new(new MemoryStream(metadata));
Initialize(s);
}
-
+
///
/// Recover info from token
///
@@ -582,7 +588,7 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager, out byte
deltaTailAddress = deltaLog.NextAddress;
}
var cookie = s.ReadToEnd();
- commitCookie = cookie.Length == 0 ? null : Convert.FromBase64String(cookie);
+ commitCookie = cookie.Length == 0 ? null : Convert.FromBase64String(cookie);
}
///
diff --git a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
index 2f0f29144..0cb775146 100644
--- a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
+++ b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs
@@ -2,7 +2,6 @@
// Licensed under the MIT license.
using System.Diagnostics;
-using System.Runtime.CompilerServices;
namespace FASTER.core
{
@@ -48,16 +47,42 @@ internal OperationStatus ContinuePendingRead InitialLatestLogicalAddress, then it means InitialLatestLogicalAddress is
+ // now below HeadAddress and there is at least one record below HeadAddress but above InitialLatestLogicalAddress. Reissue the Read(),
+ // using the LogicalAddress we just found as minAddress. We will either find an in-memory version of the key that was added after the
+ // TryFindRecordInMemory we just did, or do IO and find the record we just found or one above it. Read() updates InitialLatestLogicalAddress,
+ // so if we do IO, the next time we come to CompletePendingRead we will only search for a newer version of the key in any records added
+ // after our just-completed TryFindRecordInMemory.
+ if (stackCtx.recSrc.LogicalAddress > pendingContext.InitialLatestLogicalAddress
+ && (!pendingContext.HasMinAddress || stackCtx.recSrc.LogicalAddress >= pendingContext.minAddress))
+ {
+ OperationStatus internalStatus;
+ do
+ {
+ internalStatus = InternalRead(ref key, pendingContext.keyHash, ref pendingContext.input.Get(), ref pendingContext.output,
+ startAddress: Constants.kInvalidAddress, ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
+ }
+ while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
+ return internalStatus;
+ }
+ }
}
if (!TryTransientSLock(fasterSession, ref key, ref stackCtx, out var status))
@@ -193,7 +218,7 @@ internal OperationStatus ContinuePendingRMW InitialLatestLogicalAddress, then it means InitialLatestLogicalAddress is
// now below HeadAddress and there is at least one record below HeadAddress but above InitialLatestLogicalAddress. We must do InternalRMW.
if (stackCtx.recSrc.LogicalAddress > pendingContext.InitialLatestLogicalAddress)
- {
+ {
Debug.Assert(pendingContext.InitialLatestLogicalAddress < hlog.HeadAddress, "Failed to search all in-memory records");
break;
}
@@ -217,7 +242,7 @@ internal OperationStatus ContinuePendingRMW(fasterSession, ref key, ref stackCtx);
}
- // Must do this *after* Unlocking. Retries should drop down to InternalRMW
+ // Must do this *after* Unlocking. Retries should drop down to InternalRMW
CheckRetry:
if (!HandleImmediateRetryStatus(status, fasterSession, ref pendingContext))
return status;
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
index 8f63fe7f5..4050d2295 100644
--- a/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
+++ b/cs/src/core/Index/FASTER/Implementation/InternalRead.cs
@@ -66,6 +66,7 @@ internal OperationStatus InternalRead(ref
{
if (startAddress < hlog.BeginAddress)
return OperationStatus.NOTFOUND;
+ pendingContext.HadStartAddress = true;
stackCtx.hei.entry.Address = startAddress;
}
diff --git a/cs/test/CompletePendingTests.cs b/cs/test/CompletePendingTests.cs
index c250f8a67..660271d3e 100644
--- a/cs/test/CompletePendingTests.cs
+++ b/cs/test/CompletePendingTests.cs
@@ -1,27 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
-using System.Collections.Generic;
-using System.Threading.Tasks;
using FASTER.core;
using NUnit.Framework;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using static FASTER.test.TestUtils;
namespace FASTER.test
{
+ public struct LocalKeyStructComparer : IFasterEqualityComparer
+ {
+ internal long? forceCollisionHash;
+
+ public long GetHashCode64(ref KeyStruct key)
+ {
+ return forceCollisionHash.HasValue ? forceCollisionHash.Value : Utility.GetHashCode(key.kfield1);
+ }
+ public bool Equals(ref KeyStruct k1, ref KeyStruct k2)
+ {
+ return k1.kfield1 == k2.kfield1 && k1.kfield2 == k2.kfield2;
+ }
+
+ public override string ToString() => $"forceHashCollision: {forceCollisionHash}";
+ }
+
[TestFixture]
class CompletePendingTests
{
private FasterKV fht;
private IDevice log;
+ LocalKeyStructComparer comparer = new();
[SetUp]
public void Setup()
{
// Clean up log files from previous test runs in case they weren't cleaned up
- TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait:true);
+ DeleteDirectory(MethodTestDir, wait: true);
- log = Devices.CreateLogDevice($"{TestUtils.MethodTestDir}/CompletePendingTests.log", preallocateFile: true, deleteOnClose: true);
- fht = new FasterKV(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 });
+ log = Devices.CreateLogDevice($"{MethodTestDir}/CompletePendingTests.log", preallocateFile: true, deleteOnClose: true);
+ fht = new FasterKV(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, comparer: comparer);
}
[TearDown]
@@ -31,7 +49,7 @@ public void TearDown()
fht = null;
log?.Dispose();
log = null;
- TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
+ DeleteDirectory(MethodTestDir, wait: true);
}
const int numRecords = 1000;
@@ -39,7 +57,7 @@ public void TearDown()
static KeyStruct NewKeyStruct(int key) => new() { kfield1 = key, kfield2 = key + numRecords * 10 };
static ValueStruct NewValueStruct(int key) => new() { vfield1 = key, vfield2 = key + numRecords * 10 };
- static InputStruct NewInputStruct(int key) => new(){ ifield1 = key + numRecords * 30, ifield2 = key + numRecords * 40 };
+ static InputStruct NewInputStruct(int key) => new() { ifield1 = key + numRecords * 30, ifield2 = key + numRecords * 40 };
static ContextStruct NewContextStruct(int key) => new() { cfield1 = key + numRecords * 50, cfield2 = key + numRecords * 60 };
static void VerifyStructs(int key, ref KeyStruct keyStruct, ref InputStruct inputStruct, ref OutputStruct outputStruct, ref ContextStruct contextStruct, bool useRMW)
@@ -126,7 +144,7 @@ internal static void VerifyOneNotFound(CompletedOutputIterator()).NewSession>();
Assert.IsNull(session.completedOutputs); // Do not instantiate until we need it
@@ -214,5 +232,144 @@ public async ValueTask ReadAndCompleteWithPendingOutput([Values]bool useRMW, [Va
Assert.AreEqual(address, recordMetadata.Address);
}
}
+
+ public enum StartAddressMode
+ {
+ UseStartAddress,
+ NoStartAddress
+ }
+
+ public class PendingReadFunctions : FunctionsBase
+ {
+ public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct input, ref OutputStruct output, Empty ctx, Status status, RecordMetadata recordMetadata)
+ {
+ Assert.IsTrue(status.Found);
+ Assert.AreEqual(key.kfield1, output.value.vfield1);
+ // Do not compare field2; that's our updated value, and the key won't be found if we change kfield2
+ }
+
+ // Read functions
+ public override bool SingleReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo)
+ {
+ Assert.IsFalse(readInfo.RecordInfo.IsNull());
+ dst.value = value;
+ return true;
+ }
+
+ public override bool ConcurrentReader(ref KeyStruct key, ref InputStruct input, ref ValueStruct value, ref OutputStruct dst, ref ReadInfo readInfo)
+ => SingleReader(ref key, ref input, ref value, ref dst, ref readInfo);
+ }
+
+ [Test]
+ [Category("FasterKV")]
+ public void ReadPendingWithNewSameKey([Values] StartAddressMode startAddressMode, [Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
+ {
+ const int valueMult = 1000;
+
+ using var session = fht.For(new PendingReadFunctions()).NewSession>();
+
+ // Store off startAddress before initial upsert
+ var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;
+
+ // Insert first record
+ var firstValue = 0; // same as key
+ var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
+ var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
+ session.Upsert(ref keyStruct, ref valueStruct);
+
+ // Flush to make the Read() go pending.
+ fht.Log.FlushAndEvict(wait: true);
+
+ ReadOptions readOptions = new() { StartAddress = startAddress };
+ var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
+ Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");
+
+ // Insert next record with the same key and flush this too if requested.
+ var secondValue = firstValue + 1;
+ valueStruct.vfield2 = secondValue * valueMult;
+ session.Upsert(ref keyStruct, ref valueStruct);
+ if (secondRecordFlushMode == FlushMode.OnDisk)
+ fht.Log.FlushAndEvict(wait: true);
+
+ session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
+ (status, outputStruct) = GetSinglePendingResult(completedOutputs);
+
+ if (startAddressMode == StartAddressMode.UseStartAddress)
+ Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "UseStartAddress should have returned first value");
+ else
+ Assert.AreEqual(secondValue * valueMult, outputStruct.value.vfield2, "NoStartAddress should have returned second value");
+ }
+
+ [Test]
+ [Category("FasterKV")]
+ public void ReadPendingWithNewDifferentKeyInChain([Values] StartAddressMode startAddressMode, [Values(FlushMode.NoFlush, FlushMode.OnDisk)] FlushMode secondRecordFlushMode)
+ {
+ const int valueMult = 1000;
+
+ using var session = fht.For(new PendingReadFunctions()).NewSession>();
+
+ // Store off startAddress before initial upsert
+ var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;
+
+ // Insert first record
+ var firstValue = 0; // same as key
+ var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
+ var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
+ session.Upsert(ref keyStruct, ref valueStruct);
+
+ // Force collisions to test having another key in the chain
+ comparer.forceCollisionHash = keyStruct.GetHashCode64(ref keyStruct);
+
+ // Flush to make the Read() go pending.
+ fht.Log.FlushAndEvict(wait: true);
+
+ ReadOptions readOptions = new() { StartAddress = startAddress };
+ var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
+ Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");
+
+ // Insert next record with a different key and flush this too if requested.
+ var secondValue = firstValue + 1;
+ keyStruct = new() { kfield1 = secondValue, kfield2 = secondValue * valueMult };
+ valueStruct = new() { vfield1 = secondValue, vfield2 = secondValue * valueMult };
+ session.Upsert(ref keyStruct, ref valueStruct);
+ if (secondRecordFlushMode == FlushMode.OnDisk)
+ fht.Log.FlushAndEvict(wait: true);
+
+ session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
+ (status, outputStruct) = GetSinglePendingResult(completedOutputs);
+
+ Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
+ }
+
+ [Test]
+ [Category("FasterKV")]
+ public void ReadPendingWithNoNewKey([Values] StartAddressMode startAddressMode)
+ {
+ // Basic test of pending read
+ const int valueMult = 1000;
+
+ using var session = fht.For(new PendingReadFunctions()).NewSession>();
+
+ // Store off startAddress before initial upsert
+ var startAddress = startAddressMode == StartAddressMode.UseStartAddress ? fht.Log.TailAddress : Constants.kInvalidAddress;
+
+ // Insert first record
+ var firstValue = 0; // same as key
+ var keyStruct = new KeyStruct { kfield1 = firstValue, kfield2 = firstValue * valueMult };
+ var valueStruct = new ValueStruct { vfield1 = firstValue, vfield2 = firstValue * valueMult };
+ session.Upsert(ref keyStruct, ref valueStruct);
+
+ // Flush to make the Read() go pending.
+ fht.Log.FlushAndEvict(wait: true);
+
+ ReadOptions readOptions = new() { StartAddress = startAddress };
+ var (status, outputStruct) = session.Read(keyStruct, ref readOptions);
+ Assert.IsTrue(status.IsPending, $"Expected status.IsPending: {status}");
+
+ session.CompletePendingWithOutputs(out var completedOutputs, wait: true);
+ (status, outputStruct) = GetSinglePendingResult(completedOutputs);
+
+ Assert.AreEqual(firstValue * valueMult, outputStruct.value.vfield2, "Should have returned first value");
+ }
}
}