Skip to content

Commit

Permalink
[C#] Various Fixes (#635)
Browse files Browse the repository at this point in the history
* Fixes:
- Fixed WriteReason when CTT overrides ReadCache
- make DisableLocking a readonly member
- Call SetTentativeAtomic after Post*
- Make readcache and comparer private with internal accessors for test
- FIx intermittent failure of BasicHighLatencyDeviceTest

* back out changes to move from internal -> private for FKV.readcache and .comparer
  • Loading branch information
TedHartMS authored Jan 27, 2022
1 parent 54a8581 commit 5f260bf
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 65 deletions.
7 changes: 3 additions & 4 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace FASTER.core
{

public partial class FasterKV<Key, Value> : FasterBase,
IFasterKV<Key, Value>
{
Expand All @@ -23,7 +22,7 @@ public partial class FasterKV<Key, Value> : FasterBase,
/// <summary>
/// Compares two keys
/// </summary>
internal protected readonly IFasterEqualityComparer<Key> comparer;
internal readonly IFasterEqualityComparer<Key> comparer;

internal readonly bool UseReadCache;
private readonly CopyReadsToTail CopyReadsToTail;
Expand Down Expand Up @@ -71,8 +70,8 @@ public partial class FasterKV<Key, Value> : FasterBase,

internal ConcurrentDictionary<string, CommitPoint> _recoveredSessions;

internal bool DisableLocking;
internal LockTable<Key> LockTable;
internal readonly bool DisableLocking;
internal readonly LockTable<Key> LockTable;
internal long NumActiveLockingSessions = 0;

internal void IncrementNumLockingSessions()
Expand Down
71 changes: 42 additions & 29 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ internal OperationStatus InternalRead<Input, Output, Context, FasterSession>(
if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail)
{
var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.Upsert);
InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.CopyToTail);
container.Dispose();
}
return OperationStatus.SUCCESS;
Expand Down Expand Up @@ -701,9 +701,9 @@ private OperationStatus CreateNewRecordUpsert<Input, Output, Context, FasterSess
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

fasterSession.PostSingleWriter(ref key, ref input, ref value, ref newValue, ref output, ref recordInfo, newLogicalAddress, WriteReason.Upsert);
recordInfo.SetTentativeAtomic(false);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return OperationStatus.SUCCESS;
Expand Down Expand Up @@ -830,7 +830,7 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(

if (!recordInfo.Tombstone)
{
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress, out bool lockFailed))
if (fasterSession.InPlaceUpdater(ref key, ref input, ref recordValue, ref output, ref recordInfo, logicalAddress, out bool lockFailed))
{
hlog.MarkPage(logicalAddress, sessionCtx.version);
pendingContext.recordInfo = recordInfo;
Expand Down Expand Up @@ -1175,7 +1175,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// If IU, status will be NOTFOUND; return that.
if (status != OperationStatus.SUCCESS)
Expand All @@ -1186,19 +1185,23 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
ref output, ref recordInfo, newLogicalAddress);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
}

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref input, ref hlog.GetValue(physicalAddress),
ref hlog.GetValue(newPhysicalAddress),
ref output, ref recordInfo, newLogicalAddress))
else
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref input, ref hlog.GetValue(physicalAddress),
ref hlog.GetValue(newPhysicalAddress),
ref output, ref recordInfo, newLogicalAddress))
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
}
else
status = OperationStatus.RETRY_NOW;
}
recordInfo.SetTentativeAtomic(false);
return status;
}
else
{
Expand Down Expand Up @@ -1497,11 +1500,11 @@ internal OperationStatus InternalDelete<Input, Output, Context, FasterSession>(
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// Note that this is the new logicalAddress; we have not retrieved the old one if it was below HeadAddress, and thus
// we do not know whether 'logicalAddress' belongs to 'key' or is a collision.
fasterSession.PostSingleDeleter(ref key, ref recordInfo, newLogicalAddress);
recordInfo.SetTentativeAtomic(false);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
status = OperationStatus.SUCCESS;
Expand Down Expand Up @@ -2022,7 +2025,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
{
if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// If IU, status will be NOTFOUND; return that.
if (status != OperationStatus.SUCCESS)
Expand All @@ -2034,20 +2036,26 @@ ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
}

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress))
else
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress))
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
}
else
status = OperationStatus.RETRY_NOW;
}

recordInfo.SetTentativeAtomic(false);
return status;
}
else
{
Expand Down Expand Up @@ -2474,7 +2482,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
readcache.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key, ref input, ref value,
ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, Constants.kInvalidAddress, reason); // We do not expose readcache addresses
ref recordInfo, Constants.kInvalidAddress, WriteReason.CopyToReadCache); // We do not expose readcache addresses
}
else
{
Expand All @@ -2486,6 +2494,11 @@ ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref
tombstone: false, dirty: true,
latestLogicalAddress);
hlog.Serialize(ref key, newPhysicalAddress);

// Reflect whether we overrode a readcache reason
if (reason == WriteReason.CopyToReadCache)
reason = WriteReason.CopyToTail;

fasterSession.SingleWriter(ref key, ref input, ref value,
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, newLogicalAddress, reason);
Expand Down Expand Up @@ -2575,13 +2588,13 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref outp
ref RecordInfo recordInfo = ref log.GetInfo(newPhysicalAddress);
if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = copyToReadCache ? Constants.kInvalidAddress /* We do not expose readcache addresses */ : newLogicalAddress;
fasterSession.PostSingleWriter(ref key, ref input, ref value,
ref log.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, pendingContext.logicalAddress, reason);
recordInfo.SetTentativeAtomic(false);
return OperationStatus.SUCCESS;
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void BasicHighLatencyDeviceTest()
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);

// Create devices \ log for test for in memory device
using var device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20);
using var device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20, fileName: TestUtils.MethodTestDir + "/test.log");
using var LocalMemorylog = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 80, MemorySizeBits = 20, GetMemory = null, SegmentSizeBits = 80, MutableFraction = 0.2, LogCommitManager = null });

int entryLength = 10;
Expand Down
2 changes: 0 additions & 2 deletions cs/test/LockableUnsafeContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public long GetHashCode64(ref int k)

public enum ResultLockTarget { MutableLock, LockTable }

public enum ReadCopyDestination { Tail, ReadCache }

public enum FlushMode { NoFlush, ReadOnly, OnDisk }

public enum UpdateOp { Upsert, RMW, Delete }
Expand Down
2 changes: 1 addition & 1 deletion cs/test/LowMemAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void Setup()
{
path = TestUtils.MethodTestDir;
TestUtils.DeleteDirectory(path, wait: true);
log = new LocalMemoryDevice(1L << 30, 1L << 25, 1, latencyMs: 20);
log = new LocalMemoryDevice(1L << 30, 1L << 25, 1, latencyMs: 20, fileName: path + "/test.log");
Directory.CreateDirectory(path);
fht1 = new FasterKV<long, long>
(1L << 10,
Expand Down
16 changes: 8 additions & 8 deletions cs/test/ReadCacheChainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace FASTER.test.ReadCacheTests
{
Expand Down Expand Up @@ -101,7 +100,7 @@ void CreateChain(bool immutable = false)
Assert.AreEqual((immutable && key >= immutableSplitKey) ? Status.OK : Status.PENDING, status);
session.CompletePending(wait: true);
if (ii == 0)
readCacheHighEvictionAddress = fht.readcache.GetTailAddress();
readCacheHighEvictionAddress = fht.ReadCache.TailAddress;
}

// Pass2: non-PENDING reads from the cache
Expand Down Expand Up @@ -131,7 +130,7 @@ internal static unsafe (long logicalAddress, long physicalAddress) GetHashChain(
var bucket = default(HashBucket*);
var slot = default(int);

var hash = fht.comparer.GetHashCode64(ref key);
var hash = fht.Comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

var entry = default(HashBucketEntry);
Expand All @@ -155,6 +154,7 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV
var log = isReadCache ? fht.readcache : fht.hlog;
var info = log.GetInfo(physicalAddress);
var la = info.PreviousAddress;

isReadCache = new HashBucketEntry { word = la }.ReadCache;
log = isReadCache ? fht.readcache : fht.hlog;
var pa = log.GetPhysicalAddress(la);
Expand Down Expand Up @@ -199,9 +199,9 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV

internal static (long logicalAddress, long physicalAddress) SkipReadCacheChain(FasterKV<int, int> fht, int key)
{
var (la, pa) = ChainTests.GetHashChain(fht, key, out _, out _, out bool isReadCache);
var (la, pa) = GetHashChain(fht, key, out _, out _, out bool isReadCache);
while (isReadCache)
(la, pa) = ChainTests.NextInChain(fht, pa, out _, out _, ref isReadCache);
(la, pa) = NextInChain(fht, pa, out _, out _, ref isReadCache);
return (la, pa);
}

Expand Down Expand Up @@ -249,7 +249,7 @@ void doTest(int key)
doTest(midChainKey);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true);
}

Expand Down Expand Up @@ -280,7 +280,7 @@ void doTest(int key)
Assert.IsTrue(isReadCache);
Assert.IsTrue(invalid);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
_ = GetHashChain(lowChainKey, out actualKey, out invalid, out isReadCache);
Assert.IsFalse(isReadCache);
Assert.IsFalse(invalid);
Expand Down Expand Up @@ -338,7 +338,7 @@ void doTest(int key)
doTest(midChainKey);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true);
}

Expand Down
Loading

0 comments on commit 5f260bf

Please sign in to comment.