Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Various Fixes #635

Merged
merged 2 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace FASTER.core
{

public partial class FasterKV<Key, Value> : FasterBase,
IFasterKV<Key, Value>
{
Expand All @@ -23,7 +22,7 @@ public partial class FasterKV<Key, Value> : FasterBase,
/// <summary>
/// Compares two keys
/// </summary>
internal protected readonly IFasterEqualityComparer<Key> comparer;
internal readonly IFasterEqualityComparer<Key> comparer;

internal readonly bool UseReadCache;
private readonly CopyReadsToTail CopyReadsToTail;
Expand Down Expand Up @@ -71,8 +70,8 @@ public partial class FasterKV<Key, Value> : FasterBase,

internal ConcurrentDictionary<string, CommitPoint> _recoveredSessions;

internal bool DisableLocking;
internal LockTable<Key> LockTable;
internal readonly bool DisableLocking;
internal readonly LockTable<Key> LockTable;
internal long NumActiveLockingSessions = 0;

internal void IncrementNumLockingSessions()
Expand Down
71 changes: 42 additions & 29 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ internal OperationStatus InternalRead<Input, Output, Context, FasterSession>(
if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail)
{
var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.Upsert);
InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.CopyToTail);
container.Dispose();
}
return OperationStatus.SUCCESS;
Expand Down Expand Up @@ -701,9 +701,9 @@ private OperationStatus CreateNewRecordUpsert<Input, Output, Context, FasterSess
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

fasterSession.PostSingleWriter(ref key, ref input, ref value, ref newValue, ref output, ref recordInfo, newLogicalAddress, WriteReason.Upsert);
recordInfo.SetTentativeAtomic(false);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return OperationStatus.SUCCESS;
Expand Down Expand Up @@ -830,7 +830,7 @@ internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(

if (!recordInfo.Tombstone)
{
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress, out bool lockFailed))
if (fasterSession.InPlaceUpdater(ref key, ref input, ref recordValue, ref output, ref recordInfo, logicalAddress, out bool lockFailed))
{
hlog.MarkPage(logicalAddress, sessionCtx.version);
pendingContext.recordInfo = recordInfo;
Expand Down Expand Up @@ -1175,7 +1175,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// If IU, status will be NOTFOUND; return that.
if (status != OperationStatus.SUCCESS)
Expand All @@ -1186,19 +1185,23 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
ref output, ref recordInfo, newLogicalAddress);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
}

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref input, ref hlog.GetValue(physicalAddress),
ref hlog.GetValue(newPhysicalAddress),
ref output, ref recordInfo, newLogicalAddress))
else
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref input, ref hlog.GetValue(physicalAddress),
ref hlog.GetValue(newPhysicalAddress),
ref output, ref recordInfo, newLogicalAddress))
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
}
else
status = OperationStatus.RETRY_NOW;
}
recordInfo.SetTentativeAtomic(false);
return status;
}
else
{
Expand Down Expand Up @@ -1497,11 +1500,11 @@ internal OperationStatus InternalDelete<Input, Output, Context, FasterSession>(
recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress));
else if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// Note that this is the new logicalAddress; we have not retrieved the old one if it was below HeadAddress, and thus
// we do not know whether 'logicalAddress' belongs to 'key' or is a collision.
fasterSession.PostSingleDeleter(ref key, ref recordInfo, newLogicalAddress);
recordInfo.SetTentativeAtomic(false);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
status = OperationStatus.SUCCESS;
Expand Down Expand Up @@ -2022,7 +2025,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
{
if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

// If IU, status will be NOTFOUND; return that.
if (status != OperationStatus.SUCCESS)
Expand All @@ -2034,20 +2036,26 @@ ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress);
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;
}

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress))
else
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
return status;

// Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op.
if (fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref recordInfo, newLogicalAddress))
{
pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = newLogicalAddress;
}
else
status = OperationStatus.RETRY_NOW;
}

recordInfo.SetTentativeAtomic(false);
return status;
}
else
{
Expand Down Expand Up @@ -2474,7 +2482,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
readcache.Serialize(ref key, newPhysicalAddress);
fasterSession.SingleWriter(ref key, ref input, ref value,
ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, Constants.kInvalidAddress, reason); // We do not expose readcache addresses
ref recordInfo, Constants.kInvalidAddress, WriteReason.CopyToReadCache); // We do not expose readcache addresses
}
else
{
Expand All @@ -2486,6 +2494,11 @@ ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref
tombstone: false, dirty: true,
latestLogicalAddress);
hlog.Serialize(ref key, newPhysicalAddress);

// Reflect whether we overrode a readcache reason
if (reason == WriteReason.CopyToReadCache)
reason = WriteReason.CopyToTail;

fasterSession.SingleWriter(ref key, ref input, ref value,
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, newLogicalAddress, reason);
Expand Down Expand Up @@ -2575,13 +2588,13 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref outp
ref RecordInfo recordInfo = ref log.GetInfo(newPhysicalAddress);
if (LockTable.IsActive)
LockTable.TransferToLogRecord(ref key, ref recordInfo);
recordInfo.SetTentativeAtomic(false);

pendingContext.recordInfo = recordInfo;
pendingContext.logicalAddress = copyToReadCache ? Constants.kInvalidAddress /* We do not expose readcache addresses */ : newLogicalAddress;
fasterSession.PostSingleWriter(ref key, ref input, ref value,
ref log.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output,
ref recordInfo, pendingContext.logicalAddress, reason);
recordInfo.SetTentativeAtomic(false);
return OperationStatus.SUCCESS;
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void BasicHighLatencyDeviceTest()
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);

// Create devices \ log for test for in memory device
using var device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20);
using var device = new LocalMemoryDevice(1L << 28, 1L << 25, 2, latencyMs: 20, fileName: TestUtils.MethodTestDir + "/test.log");
using var LocalMemorylog = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 80, MemorySizeBits = 20, GetMemory = null, SegmentSizeBits = 80, MutableFraction = 0.2, LogCommitManager = null });

int entryLength = 10;
Expand Down
2 changes: 0 additions & 2 deletions cs/test/LockableUnsafeContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public long GetHashCode64(ref int k)

public enum ResultLockTarget { MutableLock, LockTable }

public enum ReadCopyDestination { Tail, ReadCache }

public enum FlushMode { NoFlush, ReadOnly, OnDisk }

public enum UpdateOp { Upsert, RMW, Delete }
Expand Down
2 changes: 1 addition & 1 deletion cs/test/LowMemAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void Setup()
{
path = TestUtils.MethodTestDir;
TestUtils.DeleteDirectory(path, wait: true);
log = new LocalMemoryDevice(1L << 30, 1L << 25, 1, latencyMs: 20);
log = new LocalMemoryDevice(1L << 30, 1L << 25, 1, latencyMs: 20, fileName: path + "/test.log");
Directory.CreateDirectory(path);
fht1 = new FasterKV<long, long>
(1L << 10,
Expand Down
16 changes: 8 additions & 8 deletions cs/test/ReadCacheChainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace FASTER.test.ReadCacheTests
{
Expand Down Expand Up @@ -101,7 +100,7 @@ void CreateChain(bool immutable = false)
Assert.AreEqual((immutable && key >= immutableSplitKey) ? Status.OK : Status.PENDING, status);
session.CompletePending(wait: true);
if (ii == 0)
readCacheHighEvictionAddress = fht.readcache.GetTailAddress();
readCacheHighEvictionAddress = fht.ReadCache.TailAddress;
}

// Pass2: non-PENDING reads from the cache
Expand Down Expand Up @@ -131,7 +130,7 @@ internal static unsafe (long logicalAddress, long physicalAddress) GetHashChain(
var bucket = default(HashBucket*);
var slot = default(int);

var hash = fht.comparer.GetHashCode64(ref key);
var hash = fht.Comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

var entry = default(HashBucketEntry);
Expand All @@ -155,6 +154,7 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV
var log = isReadCache ? fht.readcache : fht.hlog;
var info = log.GetInfo(physicalAddress);
var la = info.PreviousAddress;

isReadCache = new HashBucketEntry { word = la }.ReadCache;
log = isReadCache ? fht.readcache : fht.hlog;
var pa = log.GetPhysicalAddress(la);
Expand Down Expand Up @@ -199,9 +199,9 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV

internal static (long logicalAddress, long physicalAddress) SkipReadCacheChain(FasterKV<int, int> fht, int key)
{
var (la, pa) = ChainTests.GetHashChain(fht, key, out _, out _, out bool isReadCache);
var (la, pa) = GetHashChain(fht, key, out _, out _, out bool isReadCache);
while (isReadCache)
(la, pa) = ChainTests.NextInChain(fht, pa, out _, out _, ref isReadCache);
(la, pa) = NextInChain(fht, pa, out _, out _, ref isReadCache);
return (la, pa);
}

Expand Down Expand Up @@ -249,7 +249,7 @@ void doTest(int key)
doTest(midChainKey);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true);
}

Expand Down Expand Up @@ -280,7 +280,7 @@ void doTest(int key)
Assert.IsTrue(isReadCache);
Assert.IsTrue(invalid);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
_ = GetHashChain(lowChainKey, out actualKey, out invalid, out isReadCache);
Assert.IsFalse(isReadCache);
Assert.IsFalse(invalid);
Expand Down Expand Up @@ -338,7 +338,7 @@ void doTest(int key)
doTest(midChainKey);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false);

fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress);
fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress);
ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true);
}

Expand Down
Loading