diff --git a/README.md b/README.md
index f5d37ef29..d62c57e32 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-
+
[![NuGet](https://img.shields.io/nuget/v/Microsoft.FASTER.Core.svg)](https://www.nuget.org/packages/Microsoft.FASTER.Core/)
diff --git a/cs/FASTER.sln b/cs/FASTER.sln
index 1ab8db8c5..b724c1553 100644
--- a/cs/FASTER.sln
+++ b/cs/FASTER.sln
@@ -61,8 +61,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecondaryReaderStore", "sam
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VersionedRead", "samples\ReadAddress\VersionedRead.csproj", "{33ED9E1B-1EF0-4984-A07A-7A26C205A446}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MemOnlyCache", "samples\MemOnlyCache\MemOnlyCache.csproj", "{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}"
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncStress", "playground\AsyncStress\AsyncStress.csproj", "{9EFCF8C5-320B-473C-83DE-3815981D465B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogStress", "playground\FasterLogMLSDTest\FasterLogStress.csproj", "{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}"
@@ -112,6 +110,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "root", "root", "{CEDB9572-7
..\README.md = ..\README.md
EndProjectSection
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ResizableCacheStore", "samples\ResizableCacheStore\ResizableCacheStore.csproj", "{B4A55211-5457-44B9-8BCB-A5488C994965}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -288,14 +288,6 @@ Global
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|Any CPU.Build.0 = Release|x64
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.ActiveCfg = Release|x64
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.Build.0 = Release|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|Any CPU.ActiveCfg = Debug|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|Any CPU.Build.0 = Debug|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|x64.ActiveCfg = Debug|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|x64.Build.0 = Debug|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|Any CPU.ActiveCfg = Release|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|Any CPU.Build.0 = Release|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|x64.ActiveCfg = Release|x64
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|x64.Build.0 = Release|x64
{9EFCF8C5-320B-473C-83DE-3815981D465B}.Debug|Any CPU.ActiveCfg = Debug|x64
{9EFCF8C5-320B-473C-83DE-3815981D465B}.Debug|Any CPU.Build.0 = Debug|x64
{9EFCF8C5-320B-473C-83DE-3815981D465B}.Debug|x64.ActiveCfg = Debug|x64
@@ -335,6 +327,14 @@ Global
{AF996720-DB6C-4ED7-9693-B9531F0B119A}.Release|Any CPU.Build.0 = Release|Any CPU
{AF996720-DB6C-4ED7-9693-B9531F0B119A}.Release|x64.ActiveCfg = Release|Any CPU
{AF996720-DB6C-4ED7-9693-B9531F0B119A}.Release|x64.Build.0 = Release|Any CPU
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Debug|Any CPU.ActiveCfg = Debug|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Debug|Any CPU.Build.0 = Debug|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Debug|x64.ActiveCfg = Debug|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Debug|x64.Build.0 = Debug|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|Any CPU.ActiveCfg = Release|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|Any CPU.Build.0 = Release|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|x64.ActiveCfg = Release|x64
+ {B4A55211-5457-44B9-8BCB-A5488C994965}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -363,12 +363,12 @@ Global
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{EBE313E5-22D2-4C74-BA1F-16B60404B335} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{33ED9E1B-1EF0-4984-A07A-7A26C205A446} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
- {998D4C78-B0C5-40FF-9BDC-716BAC8CF864} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{9EFCF8C5-320B-473C-83DE-3815981D465B} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{DC3E0640-9A36-43D0-AA37-A1B61B0BFBC9} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{AF996720-DB6C-4ED7-9693-B9531F0B119A} = {5E4C9997-3350-4761-9FC9-F27649848B1D}
+ {B4A55211-5457-44B9-8BCB-A5488C994965} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs
index 988442412..adb467b73 100644
--- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs
+++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs
@@ -77,12 +77,12 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys
if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV
- (testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 22, SegmentSizeBits = 26, MemorySizeBits = 26 },
- new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, disableEphemeralLocking: testLoader.LockImpl != LockImpl.Ephemeral);
+ (testLoader.MaxKey / testLoader.Options.HashPacking, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 22, SegmentSizeBits = 26, MemorySizeBits = 26 },
+ new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, lockingMode: testLoader.LockingMode);
else
store = new FasterKV
- (testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, MemorySizeBits = 35 },
- new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, disableEphemeralLocking: testLoader.LockImpl != LockImpl.Ephemeral);
+ (testLoader.MaxKey / testLoader.Options.HashPacking, new LogSettings { LogDevice = device, PreallocateLog = true, MemorySizeBits = 35 },
+ new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, lockingMode: testLoader.LockingMode);
}
internal void Dispose()
diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs
index fcde88918..5b2365dbc 100644
--- a/cs/benchmark/FasterYcsbBenchmark.cs
+++ b/cs/benchmark/FasterYcsbBenchmark.cs
@@ -77,12 +77,12 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade
if (testLoader.Options.UseSmallMemoryLog)
store = new FasterKV
- (testLoader.MaxKey / 4, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 25, SegmentSizeBits = 30, MemorySizeBits = 28 },
- new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, disableEphemeralLocking: testLoader.LockImpl != LockImpl.Ephemeral);
+ (testLoader.MaxKey / testLoader.Options.HashPacking, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 25, SegmentSizeBits = 30, MemorySizeBits = 28 },
+ new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, lockingMode: testLoader.LockingMode);
else
store = new FasterKV
- (testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true },
- new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, disableEphemeralLocking: testLoader.LockImpl != LockImpl.Ephemeral);
+ (testLoader.MaxKey / testLoader.Options.HashPacking, new LogSettings { LogDevice = device, PreallocateLog = true },
+ new CheckpointSettings { CheckpointDir = testLoader.BackupPath }, lockingMode: testLoader.LockingMode);
}
internal void Dispose()
@@ -310,22 +310,6 @@ internal unsafe (double, double) Run(TestLoader testLoader)
dash.Start();
#endif
- ClientSession session = default;
- LockableUnsafeContext luContext = default;
-
- (Key key, LockType kind) xlock = (new Key { value = long.MaxValue }, LockType.Exclusive);
- (Key key, LockType kind) slock = (new Key { value = long.MaxValue - 1 }, LockType.Shared);
- if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
- {
- session = store.For(functions).NewSession();
- luContext = session.LockableUnsafeContext;
- luContext.BeginLockable();
-
- Console.WriteLine("Taking 2 manual locks");
- luContext.Lock(xlock.key, xlock.kind);
- luContext.Lock(slock.key, slock.kind);
- }
-
Thread[] workers = new Thread[testLoader.Options.ThreadCount];
Console.WriteLine("Executing setup.");
@@ -432,14 +416,6 @@ internal unsafe (double, double) Run(TestLoader testLoader)
worker.Join();
}
- if (testLoader.Options.LockImpl == (int)LockImpl.Manual)
- {
- luContext.Unlock(xlock.key, xlock.kind);
- luContext.Unlock(slock.key, slock.kind);
- luContext.EndLockable();
- session.Dispose();
- }
-
waiter.Reset();
#if DASHBOARD
diff --git a/cs/benchmark/Options.cs b/cs/benchmark/Options.cs
index 0a0281fe0..02e35c00e 100644
--- a/cs/benchmark/Options.cs
+++ b/cs/benchmark/Options.cs
@@ -35,9 +35,8 @@ class Options
[Option('z', "locking", Required = false, Default = 0,
HelpText = "Locking Implementation:" +
"\n 0 = None (default)" +
- "\n 1 = Ephemeral locking using RecordInfo.SpinLock()" +
- "\n 2 = Manual locking using LockableUnsafeContext")]
- public int LockImpl { get; set; }
+ "\n 1 = Mixed-mode locking using main HashTable buckets")]
+ public int LockingMode { get; set; }
[Option('i', "iterations", Required = false, Default = 1,
HelpText = "Number of iterations of the test to run")]
@@ -71,6 +70,10 @@ class Options
HelpText = "Use Small Memory log in experiment")]
public bool UseSmallMemoryLog { get; set; }
+ [Option("hashpack", Required = false, Default = 2,
+ HelpText = "The hash table packing; divide the number of keys by this to cause hash collisions")]
+ public int HashPacking { get; set; }
+
[Option("safectx", Required = false, Default = false,
HelpText = "Use 'safe' context (slower, per-operation epoch control) in experiment")]
public bool UseSafeContext { get; set; }
@@ -96,7 +99,7 @@ class Options
public string GetOptionsString()
{
static string boolStr(bool value) => value ? "y" : "n";
- return $"d: {DistributionName.ToLower()}; n: {NumaStyle}; r: {ReadPercent}; t: {ThreadCount}; z: {LockImpl}; i: {IterationCount};"
+ return $"d: {DistributionName.ToLower()}; n: {NumaStyle}; r: {ReadPercent}; t: {ThreadCount}; z: {LockingMode}; i: {IterationCount}; hp: {HashPacking}"
+ $" sd: {boolStr(UseSmallData)}; sm: {boolStr(UseSmallMemoryLog)}; sy: {boolStr(this.UseSyntheticData)}; safectx: {boolStr(this.UseSafeContext)};"
+ $" chkptms: {this.PeriodicCheckpointMilliseconds}; chkpttype: {(this.PeriodicCheckpointMilliseconds > 0 ? this.PeriodicCheckpointType.ToString() : "None")};"
+ $" chkptincr: {boolStr(this.PeriodicCheckpointTryIncremental)}";
diff --git a/cs/benchmark/TestLoader.cs b/cs/benchmark/TestLoader.cs
index 8d73284bb..9e5f73739 100644
--- a/cs/benchmark/TestLoader.cs
+++ b/cs/benchmark/TestLoader.cs
@@ -28,7 +28,7 @@ class TestLoader
internal KeySpanByte[] txn_span_keys = default;
internal readonly BenchmarkType BenchmarkType;
- internal readonly LockImpl LockImpl;
+ internal readonly LockingMode LockingMode;
internal readonly long InitCount;
internal readonly long TxnCount;
internal readonly int MaxKey;
@@ -60,13 +60,21 @@ static bool verifyOption(bool isValid, string name)
if (!verifyOption(Options.NumaStyle >= 0 && Options.NumaStyle <= 1, "NumaStyle"))
return;
- this.LockImpl = (LockImpl)Options.LockImpl;
- if (!verifyOption(Enum.IsDefined(typeof(LockImpl), this.LockImpl), "Lock Implementation"))
+ this.LockingMode = Options.LockingMode switch
+ {
+ 0 => LockingMode.None,
+ 1 => LockingMode.Standard,
+ _ => throw new InvalidOperationException($"Unknown Locking mode int: {Options.LockingMode}")
+ };
+ if (!verifyOption(Enum.IsDefined(typeof(LockingMode), this.LockingMode), "LockingMode"))
return;
if (!verifyOption(Options.IterationCount > 0, "Iteration Count"))
return;
+ if (!verifyOption(Options.HashPacking > 0, "Iteration Count"))
+ return;
+
if (!verifyOption(Options.ReadPercent >= -1 && Options.ReadPercent <= 100, "Read Percent"))
return;
diff --git a/cs/benchmark/YcsbConstants.cs b/cs/benchmark/YcsbConstants.cs
index 77a56b8c7..ab9a33c22 100644
--- a/cs/benchmark/YcsbConstants.cs
+++ b/cs/benchmark/YcsbConstants.cs
@@ -12,13 +12,6 @@ enum BenchmarkType : byte
ConcurrentDictionaryYcsb
};
- enum LockImpl : byte
- {
- None = 0,
- Ephemeral = 1,
- Manual = 2
- };
-
enum AddressLineNum : int
{
Before = 1,
diff --git a/cs/remote/samples/FixedLenServer/Program.cs b/cs/remote/samples/FixedLenServer/Program.cs
index bc9285d5a..727dc8e7d 100644
--- a/cs/remote/samples/FixedLenServer/Program.cs
+++ b/cs/remote/samples/FixedLenServer/Program.cs
@@ -8,6 +8,7 @@
using FASTER.server;
using System.Diagnostics;
using Microsoft.Extensions.Logging;
+using FASTER.core;
namespace FasterFixedLenServer
{
@@ -38,7 +39,7 @@ static void Main(string[] args)
builder.SetMinimumLevel(LogLevel.Error);
});
- using var server = new FixedLenServer(opts.GetServerOptions(), () => new Functions(), disableEphemeralLocking: true);
+ using var server = new FixedLenServer(opts.GetServerOptions(), () => new Functions(), lockingMode: LockingMode.Standard);
server.Start();
Console.WriteLine("Started server");
diff --git a/cs/remote/src/FASTER.server/Servers/FixedLenServer.cs b/cs/remote/src/FASTER.server/Servers/FixedLenServer.cs
index b0dee88de..e535b8dac 100644
--- a/cs/remote/src/FASTER.server/Servers/FixedLenServer.cs
+++ b/cs/remote/src/FASTER.server/Servers/FixedLenServer.cs
@@ -23,11 +23,11 @@ public sealed class FixedLenServer : Gener
///
///
///
- ///
+ ///
///
///
- public FixedLenServer(ServerOptions opts, Func functionsGen, bool disableEphemeralLocking, MaxSizeSettings maxSizeSettings = default, ILoggerFactory loggerFactory = null)
- : base(opts, functionsGen, new FixedLenSerializer(), new FixedLenKeySerializer(), disableEphemeralLocking, maxSizeSettings, loggerFactory)
+ public FixedLenServer(ServerOptions opts, Func functionsGen, LockingMode lockingMode, MaxSizeSettings maxSizeSettings = default, ILoggerFactory loggerFactory = null)
+ : base(opts, functionsGen, new FixedLenSerializer(), new FixedLenKeySerializer(), lockingMode: lockingMode, maxSizeSettings, loggerFactory)
{
}
}
diff --git a/cs/remote/src/FASTER.server/Servers/GenericServer.cs b/cs/remote/src/FASTER.server/Servers/GenericServer.cs
index ac28969b5..89bfb6207 100644
--- a/cs/remote/src/FASTER.server/Servers/GenericServer.cs
+++ b/cs/remote/src/FASTER.server/Servers/GenericServer.cs
@@ -31,11 +31,11 @@ public class GenericServer
///
///
- ///
+ ///
///
///
public GenericServer(ServerOptions opts, Func functionsGen, ParameterSerializer serializer, IKeyInputSerializer keyInputSerializer,
- bool disableEphemeralLocking, MaxSizeSettings maxSizeSettings = default, ILoggerFactory loggerFactory = null)
+ LockingMode lockingMode, MaxSizeSettings maxSizeSettings = default, ILoggerFactory loggerFactory = null)
{
this.opts = opts;
@@ -45,7 +45,7 @@ public GenericServer(ServerOptions opts, Func functionsGen, Parameter
if (!Directory.Exists(opts.CheckpointDir))
Directory.CreateDirectory(opts.CheckpointDir);
- store = new FasterKV(indexSize, logSettings, checkpointSettings, disableEphemeralLocking: disableEphemeralLocking, loggerFactory: loggerFactory);
+ store = new FasterKV(indexSize, logSettings, checkpointSettings, lockingMode: lockingMode, loggerFactory: loggerFactory);
if (opts.Recover)
{
diff --git a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs
index da251d239..2c12b374c 100644
--- a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs
+++ b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs
@@ -37,7 +37,7 @@ public VarLenServer(ServerOptions opts, ILoggerFactory loggerFactory = null)
if (!Directory.Exists(opts.CheckpointDir))
Directory.CreateDirectory(opts.CheckpointDir);
- store = new FasterKV(indexSize, logSettings, checkpointSettings, disableEphemeralLocking: false, loggerFactory: loggerFactory);
+ store = new FasterKV(indexSize, logSettings, checkpointSettings, lockingMode: LockingMode.Standard, loggerFactory: loggerFactory);
if (!opts.DisablePubSub)
{
diff --git a/cs/remote/test/FASTER.remote.test/TestUtils.cs b/cs/remote/test/FASTER.remote.test/TestUtils.cs
index 511a70a8d..de8126a63 100644
--- a/cs/remote/test/FASTER.remote.test/TestUtils.cs
+++ b/cs/remote/test/FASTER.remote.test/TestUtils.cs
@@ -34,7 +34,7 @@ public static FixedLenServer>(opts, () => new SimpleFunctions(merger), disableEphemeralLocking: true);
+ return new FixedLenServer>(opts, () => new SimpleFunctions(merger), lockingMode: LockingMode.Standard);
}
///
diff --git a/cs/samples/MemOnlyCache/CacheSizeTracker.cs b/cs/samples/MemOnlyCache/CacheSizeTracker.cs
deleted file mode 100644
index 93ee92cb7..000000000
--- a/cs/samples/MemOnlyCache/CacheSizeTracker.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT license.
-
-using FASTER.core;
-using System;
-using System.Threading;
-
-namespace MemOnlyCache
-{
- ///
- /// Cache size tracker
- ///
- public class CacheSizeTracker : IObserver>
- {
- readonly FasterKV store;
- long storeHeapSize;
-
- ///
- /// Target size request for FASTER
- ///
- public long TargetSizeBytes { get; private set; }
-
- ///
- /// Total size (bytes) used by FASTER including index and log
- ///
- public long TotalSizeBytes => storeHeapSize + store.IndexSize * 64 + store.Log.MemorySizeBytes + (store.ReadCache != null ? store.ReadCache.MemorySizeBytes : 0) + store.OverflowBucketCount * 64;
-
- ///
- /// Class to track and update cache size
- ///
- /// FASTER store instance
- /// Initial target memory size of FASTER in bytes
- public CacheSizeTracker(FasterKV store, long targetMemoryBytes = long.MaxValue)
- {
- this.store = store;
- if (targetMemoryBytes < long.MaxValue)
- Console.WriteLine("**** Setting initial target memory: {0,11:N2}KB", targetMemoryBytes / 1024.0);
- this.TargetSizeBytes = targetMemoryBytes;
-
- Console.WriteLine("Index size: {0}", store.IndexSize * 64);
- Console.WriteLine("Total store size: {0}", TotalSizeBytes);
-
- // Register subscriber to receive notifications of log evictions from memory
- store.Log.SubscribeEvictions(this);
-
- // Include the separate read cache, if enabled
- if (store.ReadCache != null)
- store.ReadCache.SubscribeEvictions(this);
- }
-
- ///
- /// Set target total memory size (in bytes) for the FASTER store
- ///
- /// Target size
- public void SetTargetSizeBytes(long newTargetSize)
- {
- if (newTargetSize < TargetSizeBytes)
- {
- TargetSizeBytes = newTargetSize;
- store.Log.EmptyPageCount++; // trigger eviction to start the memory reduction process
- }
- else
- TargetSizeBytes = newTargetSize;
- }
-
- ///
- /// Add to the tracked size of FASTER. This is called by IFunctions as well as the subscriber to evictions (OnNext)
- ///
- ///
- public void AddTrackedSize(int size) => Interlocked.Add(ref storeHeapSize, size);
-
- ///
- /// Subscriber to pages as they are getting evicted from main memory
- ///
- ///
- public void OnNext(IFasterScanIterator iter)
- {
- int size = 0;
- while (iter.GetNext(out RecordInfo info, out CacheKey key, out CacheValue value))
- {
- size += key.GetSize;
- if (!info.Tombstone) // ignore deleted values being evicted (they are accounted for by ConcurrentDeleter)
- size += value.GetSize;
- }
- AddTrackedSize(-size);
-
- // Adjust empty page count to drive towards desired memory utilization
- if (store.Log.PageAllocationStabilized())
- {
- if (TotalSizeBytes > TargetSizeBytes)
- store.Log.EmptyPageCount++;
- else
- store.Log.EmptyPageCount--;
- }
- }
-
- ///
- /// OnCompleted
- ///
- public void OnCompleted() { }
-
- ///
- /// OnError
- ///
- ///
- public void OnError(Exception error) { }
- }
-}
diff --git a/cs/samples/ResizableCacheStore/CacheSizeTracker.cs b/cs/samples/ResizableCacheStore/CacheSizeTracker.cs
new file mode 100644
index 000000000..a49bbe063
--- /dev/null
+++ b/cs/samples/ResizableCacheStore/CacheSizeTracker.cs
@@ -0,0 +1,101 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using System;
+
+namespace ResizableCacheStore
+{
+ ///
+ /// Cache size tracker
+ ///
+ public class CacheSizeTracker
+ {
+ readonly FasterKV store;
+
+ ///
+ /// Total size (bytes) used by FASTER including index and log
+ ///
+ public long TotalSizeBytes =>
+ IndexSizeBytes +
+ mainLog.TotalSizeBytes +
+ (readCache != null ? readCache.TotalSizeBytes : 0);
+
+ public long IndexSizeBytes =>
+ store.IndexSize * 64 +
+ store.OverflowBucketCount * 64;
+
+ public long LogSizeBytes => mainLog.TotalSizeBytes;
+ public long ReadCacheSizeBytes => readCache != null ? readCache.TotalSizeBytes : 0;
+
+ readonly LogSizeTracker mainLog;
+ readonly LogSizeTracker readCache;
+
+ public void PrintStats()
+ {
+ Console.WriteLine("Sizes: [store]: {0,8:N2}KB [index]: {1,9:N2}KB [hylog]: {2,8:N2}KB ({3,7} objs) [rcach]: {4,9:N2}KB ({5,7} objs)",
+ TotalSizeBytes / 1024.0,
+ IndexSizeBytes / 1024.0,
+ LogSizeBytes / 1024.0,
+ mainLog.NumRecords,
+ ReadCacheSizeBytes / 1024.0,
+ readCache != null ? readCache.NumRecords : 0
+ );
+ }
+
+ ///
+ /// Class to track and update cache size
+ ///
+ /// FASTER store instance
+ /// Initial target memory size of FASTER in bytes
+ public CacheSizeTracker(FasterKV store, long targetMemoryBytes = long.MaxValue)
+ {
+ this.store = store;
+ this.mainLog = new LogSizeTracker(store.Log, "mnlog");
+ if (store.ReadCache != null)
+ this.readCache = new LogSizeTracker(store.ReadCache, "readc");
+
+ if (targetMemoryBytes < long.MaxValue)
+ {
+ Console.WriteLine("**** Setting initial target memory: {0,11:N2}KB", targetMemoryBytes / 1024.0);
+ SetTargetSizeBytes(targetMemoryBytes);
+ }
+
+ PrintStats();
+ }
+
+ ///
+ /// Set target total memory size (in bytes) for the FASTER store
+ ///
+ /// Target size
+ public void SetTargetSizeBytes(long newTargetSize)
+ {
+ // In this sample, we split the residual space equally between the log and the read cache
+ long residual = newTargetSize - IndexSizeBytes;
+
+ if (residual > 0)
+ {
+ if (readCache == null)
+ mainLog.SetTargetSizeBytes(residual);
+ else
+ {
+ mainLog.SetTargetSizeBytes(residual / 2);
+ readCache.SetTargetSizeBytes(residual / 2);
+ }
+ }
+ }
+
+
+ ///
+ /// Add to the tracked size of FASTER. This is called by IFunctions as well as the subscriber to evictions (OnNext)
+ ///
+ ///
+ public void AddTrackedSize(int size, bool isReadCache = false)
+ {
+ if (isReadCache)
+ readCache.AddTrackedSize(size);
+ else
+ mainLog.AddTrackedSize(size);
+ }
+ }
+}
diff --git a/cs/samples/ResizableCacheStore/ISizeTracker.cs b/cs/samples/ResizableCacheStore/ISizeTracker.cs
new file mode 100644
index 000000000..97b89e09d
--- /dev/null
+++ b/cs/samples/ResizableCacheStore/ISizeTracker.cs
@@ -0,0 +1,10 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+namespace ResizableCacheStore
+{
+ public interface ISizeTracker
+ {
+ int GetSize { get; }
+ }
+}
diff --git a/cs/samples/ResizableCacheStore/LogSizeTracker.cs b/cs/samples/ResizableCacheStore/LogSizeTracker.cs
new file mode 100644
index 000000000..72e4b0385
--- /dev/null
+++ b/cs/samples/ResizableCacheStore/LogSizeTracker.cs
@@ -0,0 +1,115 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using System;
+using System.Threading;
+
+namespace ResizableCacheStore
+{
+ public class LogSizeTracker : IObserver>
+ where TCacheKey : ISizeTracker
+ where TCacheValue : ISizeTracker
+ {
+ readonly string name;
+
+ ///
+ /// Number of records in the log
+ ///
+ public int NumRecords;
+
+ ///
+ /// Total size occupied by log, including heap
+ ///
+ public long TotalSizeBytes => Log.MemorySizeBytes + heapSize;
+
+ ///
+ /// Target size request for FASTER
+ ///
+ public long TargetSizeBytes { get; private set; }
+
+
+ int heapSize;
+ readonly LogAccessor Log;
+
+ public LogSizeTracker(LogAccessor log, string name)
+ {
+ this.name = name;
+ Log = log;
+
+ // Register subscriber to receive notifications of log evictions from memory
+ Log.SubscribeEvictions(this);
+ }
+
+ ///
+ /// Add to the tracked size of FASTER. This is called by IFunctions as well as the subscriber to evictions (OnNext)
+ ///
+ ///
+ public void AddTrackedSize(int size)
+ {
+ Interlocked.Add(ref heapSize, size);
+ if (size > 0) Interlocked.Increment(ref NumRecords);
+ else Interlocked.Decrement(ref NumRecords);
+ }
+
+ ///
+ /// Set target total memory size (in bytes) for the FASTER store
+ ///
+ /// Target size
+ public void SetTargetSizeBytes(long newTargetSize)
+ {
+ TargetSizeBytes = newTargetSize;
+ AdjustAllocation();
+ }
+ public void OnNext(IFasterScanIterator iter)
+ {
+ int size = 0;
+ int count = 0;
+ while (iter.GetNext(out RecordInfo info, out TCacheKey key, out TCacheValue value))
+ {
+ size += key.GetSize;
+ count++;
+ if (!info.Tombstone) // ignore deleted values being evicted (they are accounted for by ConcurrentDeleter)
+ size += value.GetSize;
+ }
+ Interlocked.Add(ref heapSize, -size);
+ Interlocked.Add(ref NumRecords, -count);
+ AdjustAllocation();
+ }
+
+ public void OnCompleted() { }
+
+ public void OnError(Exception error) { }
+
+ void AdjustAllocation()
+ {
+ const long Delta = 1L << 15;
+ if (TotalSizeBytes > TargetSizeBytes + Delta)
+ {
+ while (TotalSizeBytes > TargetSizeBytes + Delta)
+ {
+ if (Log.AllocatedPageCount > Log.BufferSize - Log.EmptyPageCount + 1)
+ {
+ // Console.WriteLine($"{name}: {Log.EmptyPageCount} (wait++)");
+ return; // wait for allocation to stabilize
+ }
+ Log.EmptyPageCount++;
+ // Console.WriteLine($"{name}: {Log.EmptyPageCount} (++)");
+ }
+ }
+ else if (TotalSizeBytes < TargetSizeBytes - Delta)
+ {
+ while (TotalSizeBytes < TargetSizeBytes - Delta)
+ {
+ if (Log.AllocatedPageCount < Log.BufferSize - Log.EmptyPageCount - 1)
+ {
+ // Console.WriteLine($"{name}: {Log.EmptyPageCount} (wait--)");
+ return; // wait for allocation to stabilize
+ }
+ Log.EmptyPageCount--;
+ // Console.WriteLine($"{name}: {Log.EmptyPageCount} (--)");
+ }
+ }
+ }
+ }
+}
diff --git a/cs/samples/MemOnlyCache/Program.cs b/cs/samples/ResizableCacheStore/Program.cs
similarity index 96%
rename from cs/samples/MemOnlyCache/Program.cs
rename to cs/samples/ResizableCacheStore/Program.cs
index 3453b78fc..29622981e 100644
--- a/cs/samples/MemOnlyCache/Program.cs
+++ b/cs/samples/ResizableCacheStore/Program.cs
@@ -10,7 +10,7 @@
#pragma warning disable IDE0079 // Remove unnecessary suppression
#pragma warning disable CS0162 // Unreachable code detected
-namespace MemOnlyCache
+namespace ResizableCacheStore
{
class Program
{
@@ -85,7 +85,7 @@ class Program
const string UseUniformArg = "-u";
///
- /// If true, create a log file in the {tempdir}\MemOnlyCacheSample
+ /// If true, create a log file in the {tempdir}\ResizableCacheStoreSample
///
static bool UseLogFile = false;
const string UseLogFileArg = "-l";
@@ -97,13 +97,13 @@ class Program
const string QuietArg = "-q";
///
- /// Uniform random distribution (true) or Zipf distribution (false) of requests
+ /// Copy to tail on read
///
static bool UseReadCTT = true;
const string NoReadCTTArg = "--noreadctt";
///
- /// Uniform random distribution (true) or Zipf distribution (false) of requests
+ /// Copy to read cache on read
///
static bool UseReadCache = false;
const string UseReadCacheArg = "--readcache";
@@ -225,6 +225,11 @@ static bool GetArgs(string[] args)
MaxKeySize = int.Parse(val);
continue;
}
+ if (arg == MaxValueSizeArg)
+ {
+ MaxValueSize = int.Parse(val);
+ continue;
+ }
if (arg == MemorySizeBitsArg)
{
MemorySizeBits = int.Parse(val);
@@ -303,7 +308,7 @@ static bool GetArgs(string[] args)
return true;
}
- static string GetLogPath() => Path.GetTempPath() + "MemOnlyCacheSample\\";
+ static string GetLogPath() => Path.GetTempPath() + "ResizableCacheStoreSample\\";
static void Main(string[] args)
{
@@ -415,9 +420,11 @@ private static void ContinuousRandomWorkload()
var ts = TimeSpan.FromSeconds(currentTimeMs / 1000);
var totalElapsed = ts.ToString();
- Console.WriteLine("Throughput: {0,8:0.00}K ops/sec; Hit rate: {1:N2}; Memory footprint: {2,12:N2}KB, elapsed: {3:c}",
- (currentReads - _lastReads) / (double)(currentElapsed), statusFound / (double)(statusFound + statusNotFound),
- sizeTracker.TotalSizeBytes / 1024.0, totalElapsed);
+ Console.WriteLine("Throughput: {0,8:0.00}K ops/sec; Hit rate: {1:N2}; elapsed: {2:c}",
+ (currentReads - _lastReads) / (double)(currentElapsed),
+ statusFound / (double)(statusFound + statusNotFound),
+ totalElapsed);
+ sizeTracker.PrintStats();
Interlocked.Exchange(ref statusFound, 0);
Interlocked.Exchange(ref statusNotFound, 0);
diff --git a/cs/samples/MemOnlyCache/MemOnlyCache.csproj b/cs/samples/ResizableCacheStore/ResizableCacheStore.csproj
similarity index 100%
rename from cs/samples/MemOnlyCache/MemOnlyCache.csproj
rename to cs/samples/ResizableCacheStore/ResizableCacheStore.csproj
diff --git a/cs/samples/MemOnlyCache/Types.cs b/cs/samples/ResizableCacheStore/Types.cs
similarity index 94%
rename from cs/samples/MemOnlyCache/Types.cs
rename to cs/samples/ResizableCacheStore/Types.cs
index d2c4036b3..264e226d6 100644
--- a/cs/samples/MemOnlyCache/Types.cs
+++ b/cs/samples/ResizableCacheStore/Types.cs
@@ -5,9 +5,9 @@
using System;
using System.Threading;
-namespace MemOnlyCache
+namespace ResizableCacheStore
{
- public class CacheKey : IFasterEqualityComparer
+ public class CacheKey : IFasterEqualityComparer, ISizeTracker
{
public long key;
public byte[] extra;
@@ -47,7 +47,7 @@ public override void Serialize(ref CacheKey obj)
}
}
- public sealed class CacheValue
+ public sealed class CacheValue : ISizeTracker
{
public byte[] value;
@@ -104,7 +104,7 @@ public override bool ConcurrentWriter(ref CacheKey key, ref CacheValue input, re
public override void PostSingleWriter(ref CacheKey key, ref CacheValue input, ref CacheValue src, ref CacheValue dst, ref CacheValue output, ref UpsertInfo upsertInfo, WriteReason reason)
{
dst = src;
- sizeTracker.AddTrackedSize(key.GetSize + src.GetSize);
+ sizeTracker.AddTrackedSize(key.GetSize + src.GetSize, reason == WriteReason.CopyToReadCache);
}
public override bool ConcurrentDeleter(ref CacheKey key, ref CacheValue value, ref DeleteInfo deleteInfo)
diff --git a/cs/samples/MemOnlyCache/ZipfGenerator.cs b/cs/samples/ResizableCacheStore/ZipfGenerator.cs
similarity index 97%
rename from cs/samples/MemOnlyCache/ZipfGenerator.cs
rename to cs/samples/ResizableCacheStore/ZipfGenerator.cs
index 02932baa9..bd47d66d7 100644
--- a/cs/samples/MemOnlyCache/ZipfGenerator.cs
+++ b/cs/samples/ResizableCacheStore/ZipfGenerator.cs
@@ -3,7 +3,7 @@
using System;
-namespace MemOnlyCache
+namespace ResizableCacheStore
{
public class ZipfGenerator
{
diff --git a/cs/samples/StoreVarLenTypes/AsciiSumSample.cs b/cs/samples/StoreVarLenTypes/AsciiSumSample.cs
index 31b3551c0..31788e995 100644
--- a/cs/samples/StoreVarLenTypes/AsciiSumSample.cs
+++ b/cs/samples/StoreVarLenTypes/AsciiSumSample.cs
@@ -28,7 +28,7 @@ public static void Run()
// For this test we require record-level locking
using var store = new FasterKV(
size: 1L << 20,
- logSettings: new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 12 }, disableEphemeralLocking: false);
+ logSettings: new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 12 }, lockingMode: LockingMode.Standard);
// Create session for ASCII sums. We require two callback function types to be provided:
// AsciiSumSpanByteFunctions implements RMW callback functions
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 402419d02..257244513 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -141,12 +141,14 @@ public abstract partial class AllocatorBase : IDisposable
public long SafeReadOnlyAddress;
///
- /// Head address
+ /// The lowest in-memory address in the log. While we hold the epoch this may be changed by other threads as part of ShiftHeadAddress,
+ /// but as long as an address was >= HeadAddress while we held the epoch, it cannot be actually evicted until we release the epoch.
///
public long HeadAddress;
///
- /// Safe head address
+ /// The lowest reliable in-memory address. This is set by OnPagesClosed as the highest address of the range it is starting to close;
+ /// thus it leads . As long as we hold the epoch, records above this address will not be evicted.
///
public long SafeHeadAddress;
@@ -156,12 +158,13 @@ public abstract partial class AllocatorBase : IDisposable
public long FlushedUntilAddress;
///
- /// Flushed until address
+ /// The highest address that has been closed by . It will catch up to
+ /// when a region is closed.
///
public long ClosedUntilAddress;
///
- /// Begin address
+ /// The lowest valid address in the log
///
public long BeginAddress;
@@ -251,11 +254,6 @@ public override string ToString()
///
internal IObserver> OnEvictionObserver;
- ///
- /// Observer for locked records getting evicted from memory (page closed)
- ///
- internal IObserver> OnLockEvictionObserver;
-
///
/// The "event" to be waited on for flush completion by the initiator of an operation
///
@@ -574,8 +572,7 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long
// Clean up temporary bits when applying the delta log
ref var destInfo = ref GetInfo(destination);
- destInfo.ClearLocks();
- destInfo.Unseal();
+ destInfo.ClearBitsForDiskImages();
}
physicalAddress += size;
}
@@ -1377,8 +1374,6 @@ private void OnPagesClosedWorker()
long start = closeStartAddress > closePageAddress ? closeStartAddress : closePageAddress;
long end = closeEndAddress < closePageAddress + PageSize ? closeEndAddress : closePageAddress + PageSize;
- if (OnLockEvictionObserver is not null)
- MemoryPageScan(start, end, OnLockEvictionObserver);
if (OnEvictionObserver is not null)
MemoryPageScan(start, end, OnEvictionObserver);
@@ -1948,7 +1943,7 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje
{
if (errorCode != 0)
{
- logger?.LogError("AsyncGetFromDiskCallback error: {errorCode}", errorCode);
+ logger?.LogError($"AsyncGetFromDiskCallback error: {errorCode}");
}
var result = (AsyncGetFromDiskResult>)context;
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 3f02085d0..fe1479ba4 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -424,7 +424,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
if (!src[i].info.Invalid)
{
var address = (flushPage << LogPageSizeBits) + i * recordSize;
- if (address < fuzzyStartLogicalAddress || !src[i].info.InNewVersion)
+ if (address < fuzzyStartLogicalAddress || !src[i].info.IsInNewVersion)
{
if (KeyHasObjects())
{
diff --git a/cs/src/core/Allocator/LockEvictionObserver.cs b/cs/src/core/Allocator/LockEvictionObserver.cs
deleted file mode 100644
index e50008533..000000000
--- a/cs/src/core/Allocator/LockEvictionObserver.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT license.
-
-using System;
-
-namespace FASTER.core
-{
- ///
- /// Observer for page-lock evictions
- ///
- public class LockEvictionObserver : IObserver>
- {
- readonly FasterKV store;
-
- ///
- /// Class to manage lock eviction transfers to LockTable
- ///
- /// FASTER store instance
- public LockEvictionObserver(FasterKV store) => this.store = store;
-
- ///
- /// Subscriber to pages as they are getting evicted from main memory
- ///
- ///
- public void OnNext(IFasterScanIterator iter)
- {
- while (iter.GetNext(out RecordInfo info))
- {
- // Note: we do not have to worry about conflicts with other threads, because other operations
- // (data operations and lock and unlock) stop at HeadAddress.
- if (info.IsLocked)
- this.store.LockTable.TransferFromLogRecord(ref iter.GetKey(), info);
- }
- }
-
- ///
- /// OnCompleted
- ///
- public void OnCompleted() { }
-
- ///
- /// OnError
- ///
- ///
- public void OnError(Exception error) { }
- }
-}
diff --git a/cs/src/core/Allocator/MemoryPageScanIterator.cs b/cs/src/core/Allocator/MemoryPageScanIterator.cs
index e3aa62573..e0a026369 100644
--- a/cs/src/core/Allocator/MemoryPageScanIterator.cs
+++ b/cs/src/core/Allocator/MemoryPageScanIterator.cs
@@ -88,5 +88,8 @@ public ref Value GetValue()
{
return ref page[offset].value;
}
+
+ ///
+ public override string ToString() => $"BA {BeginAddress}, EA {EndAddress}, CA {CurrentAddress}, NA {NextAddress}, start {start}, end {end}, recSize {recordSize}, pageSA {pageStartAddress}";
}
}
diff --git a/cs/src/core/Async/AsyncOperationInternal.cs b/cs/src/core/Async/AsyncOperationInternal.cs
index d5e2cdc78..dbea18bac 100644
--- a/cs/src/core/Async/AsyncOperationInternal.cs
+++ b/cs/src/core/Async/AsyncOperationInternal.cs
@@ -33,22 +33,20 @@ internal interface IAsyncOperation
/// The instance the async call was made on
/// The for the pending operation
/// The for this operation
- /// The for this operation
/// The output to be populated by this operation
///
Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, out Output output);
+ out Output output);
///
/// Performs the asynchronous operation. This may be a wait for either a page-flush or a disk-read IO.
///
/// The instance the async call was made on
/// The for this operation
- /// The for this operation
/// The for the pending operation
/// The cancellation token, if any
///
ValueTask DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, CancellationToken token);
+ PendingContext pendingContext, CancellationToken token);
///
/// For RMW only, indicates whether there is a pending IO; no-op for other implementations.
@@ -64,19 +62,16 @@ internal sealed class AsyncOperationInternal _fasterKV;
readonly IFasterSession _fasterSession;
- readonly FasterExecutionContext _currentCtx;
TAsyncOperation _asyncOperation;
PendingContext _pendingContext;
int CompletionComputeStatus;
internal AsyncOperationInternal(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext,
- ExceptionDispatchInfo exceptionDispatchInfo, TAsyncOperation asyncOperation)
+ PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo, TAsyncOperation asyncOperation)
{
_exception = exceptionDispatchInfo;
_fasterKV = fasterKV;
_fasterSession = fasterSession;
- _currentCtx = currentCtx;
_pendingContext = pendingContext;
_asyncOperation = asyncOperation;
CompletionComputeStatus = Pending;
@@ -94,7 +89,7 @@ internal ValueTask CompleteAsync(CancellationToken token = default
_exception.Throw();
// DoSlowOperation returns a new XxxAsyncResult, which contains a new UpdateAsyncInternal with a pendingContext with a default flushEvent
- return _asyncOperation.DoSlowOperation(_fasterKV, _fasterSession, _currentCtx, _pendingContext, token);
+ return _asyncOperation.DoSlowOperation(_fasterKV, _fasterSession, _pendingContext, token);
}
internal TAsyncResult CompleteSync()
@@ -139,9 +134,9 @@ private bool TryCompleteAsyncState(out TAsyncResult asyncResult)
{
if (hasPendingIO)
{
- _currentCtx.ioPendingRequests.Remove(pendingId);
- _currentCtx.asyncPendingCount--;
- _currentCtx.pendingReads.Remove();
+ _fasterSession.Ctx.ioPendingRequests.Remove(pendingId);
+ _fasterSession.Ctx.asyncPendingCount--;
+ _fasterSession.Ctx.pendingReads.Remove();
}
}
}
@@ -156,7 +151,7 @@ private bool TryCompleteSync(out TAsyncResult asyncResult)
_fasterSession.UnsafeResumeThread();
try
{
- Status status = _asyncOperation.DoFastOperation(_fasterKV, ref _pendingContext, _fasterSession, _currentCtx, out Output output);
+ Status status = _asyncOperation.DoFastOperation(_fasterKV, ref _pendingContext, _fasterSession, out Output output);
if (!status.IsPending)
{
@@ -244,7 +239,7 @@ private static async ValueTask WaitForFlushCompletionAsyn
// This takes flushEvent as a parameter because we can't pass by ref to an async method.
private static async ValueTask<(AsyncIOContext diskRequest, ExceptionDispatchInfo edi)> WaitForFlushOrIOCompletionAsync(
- FasterKV @this, FasterExecutionContext currentCtx,
+ FasterKV @this, FasterExecutionContext sessionCtx,
CompletionEvent flushEvent, AsyncIOContext diskRequest, CancellationToken token)
{
ExceptionDispatchInfo exceptionDispatchInfo = default;
@@ -264,8 +259,8 @@ private static async ValueTask WaitForFlushCompletionAsyn
else
{
Debug.Assert(flushEvent.IsDefault());
- currentCtx.asyncPendingCount++;
- currentCtx.pendingReads.Add();
+ sessionCtx.asyncPendingCount++;
+ sessionCtx.pendingReads.Add();
using (token.Register(() => diskRequest.asyncOperation.TrySetCanceled()))
diskRequest = await diskRequest.asyncOperation.Task.WithCancellationAsync(token).ConfigureAwait(false);
diff --git a/cs/src/core/Async/CompletePendingAsync.cs b/cs/src/core/Async/CompletePendingAsync.cs
index 23802bfe0..237b67e58 100644
--- a/cs/src/core/Async/CompletePendingAsync.cs
+++ b/cs/src/core/Async/CompletePendingAsync.cs
@@ -16,38 +16,38 @@ public partial class FasterKV : FasterBase, IFasterKV
///
/// Check if at least one (sync) request is ready for CompletePending to operate on
///
- ///
+ ///
///
///
- internal static ValueTask ReadyToCompletePendingAsync(FasterExecutionContext currentCtx, CancellationToken token = default)
- => currentCtx.WaitPendingAsync(token);
+ internal static ValueTask ReadyToCompletePendingAsync(FasterExecutionContext sessionCtx, CancellationToken token = default)
+ => sessionCtx.WaitPendingAsync(token);
///
/// Complete outstanding pending operations that were issued synchronously
/// Async operations (e.g., ReadAsync) need to be completed individually
///
///
- internal async ValueTask CompletePendingAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, CancellationToken token,
- CompletedOutputIterator completedOutputs)
+ internal async ValueTask CompletePendingAsync(FasterSession fasterSession,
+ CancellationToken token, CompletedOutputIterator completedOutputs)
+ where FasterSession : IFasterSession
{
while (true)
{
fasterSession.UnsafeResumeThread();
try
{
- InternalCompletePendingRequests(currentCtx, currentCtx, fasterSession, completedOutputs);
+ InternalCompletePendingRequests(fasterSession, completedOutputs);
}
finally
{
fasterSession.UnsafeSuspendThread();
}
- await currentCtx.WaitPendingAsync(token).ConfigureAwait(false);
+ await fasterSession.Ctx.WaitPendingAsync(token).ConfigureAwait(false);
- if (currentCtx.HasNoPendingRequests) return;
+ if (fasterSession.Ctx.HasNoPendingRequests) return;
- InternalRefresh(currentCtx, fasterSession);
+ InternalRefresh(fasterSession);
Thread.Yield();
}
diff --git a/cs/src/core/Async/DeleteAsync.cs b/cs/src/core/Async/DeleteAsync.cs
index edd267f3b..63e602ccd 100644
--- a/cs/src/core/Async/DeleteAsync.cs
+++ b/cs/src/core/Async/DeleteAsync.cs
@@ -18,21 +18,21 @@ internal struct DeleteAsyncOperation : IAsyncOperation
public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, out Output output)
+ out Output output)
{
OperationStatus internalStatus;
do
{
- internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum);
- } while (fasterKV.HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pendingContext));
+ internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
+ } while (fasterKV.HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
output = default;
return TranslateStatus(internalStatus);
}
///
public ValueTask> DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, CancellationToken token)
- => SlowDeleteAsync(fasterKV, fasterSession, currentCtx, pendingContext, token);
+ PendingContext pendingContext, CancellationToken token)
+ => SlowDeleteAsync(fasterKV, fasterSession, pendingContext, token);
///
public bool HasPendingIO => false;
@@ -55,11 +55,11 @@ internal DeleteAsyncResult(Status status)
}
internal DeleteAsyncResult(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
+ PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
{
this.Status = new(StatusCode.Pending);
updateAsyncInternal = new AsyncOperationInternal, DeleteAsyncResult>(
- fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new ());
+ fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new ());
}
/// Complete the Delete operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.
@@ -75,25 +75,20 @@ public ValueTask> CompleteAsync(Cancel
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal ValueTask> DeleteAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref Key key, Context userContext, long serialNo, CancellationToken token = default)
+ internal ValueTask> DeleteAsync(FasterSession fasterSession,
+ ref Key key, Context userContext, long serialNo, CancellationToken token = default)
+ where FasterSession : IFasterSession
{
var pcontext = new PendingContext { IsAsync = true };
- return DeleteAsync(fasterSession, currentCtx, ref pcontext, ref key, userContext, serialNo, token);
- }
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal ValueTask> DeleteAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, Context userContext, long serialNo, CancellationToken token)
- {
fasterSession.UnsafeResumeThread();
try
{
OperationStatus internalStatus;
do
{
- internalStatus = InternalDelete(ref key, ref userContext, ref pcontext, fasterSession, currentCtx, serialNo);
- } while (HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pcontext));
+ internalStatus = InternalDelete(ref key, ref userContext, ref pcontext, fasterSession, serialNo);
+ } while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
if (OperationStatusUtils.TryConvertToCompletedStatusCode(internalStatus, out Status status))
return new ValueTask>(new DeleteAsyncResult(status));
@@ -101,23 +96,22 @@ internal ValueTask> DeleteAsync= currentCtx.serialNum, "Operation serial numbers must be non-decreasing");
- currentCtx.serialNum = serialNo;
+ Debug.Assert(serialNo >= fasterSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
+ fasterSession.Ctx.serialNum = serialNo;
fasterSession.UnsafeSuspendThread();
}
- return SlowDeleteAsync(this, fasterSession, currentCtx, pcontext, token);
+ return SlowDeleteAsync(this, fasterSession, pcontext, token);
}
private static async ValueTask> SlowDeleteAsync(
FasterKV @this,
IFasterSession fasterSession,
- FasterExecutionContext currentCtx,
PendingContext pcontext, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, pcontext.flushEvent, token).ConfigureAwait(false);
pcontext.flushEvent = default;
- return new DeleteAsyncResult(@this, fasterSession, currentCtx, pcontext, exceptionDispatchInfo);
+ return new DeleteAsyncResult(@this, fasterSession, pcontext, exceptionDispatchInfo);
}
}
}
diff --git a/cs/src/core/Async/RMWAsync.cs b/cs/src/core/Async/RMWAsync.cs
index 8d7b7e448..f0dfae835 100644
--- a/cs/src/core/Async/RMWAsync.cs
+++ b/cs/src/core/Async/RMWAsync.cs
@@ -21,11 +21,11 @@ internal struct RmwAsyncOperation : IAsyncOperation
public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, out Output output)
+ out Output output)
{
Status status = !this.diskRequest.IsDefault()
- ? fasterKV.InternalCompletePendingRequestFromContext(currentCtx, currentCtx, fasterSession, this.diskRequest, ref pendingContext, out AsyncIOContext newDiskRequest)
- : fasterKV.CallInternalRMW(fasterSession, currentCtx, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, pendingContext.userContext,
+ ? fasterKV.InternalCompletePendingRequestFromContext(fasterSession, this.diskRequest, ref pendingContext, out AsyncIOContext newDiskRequest)
+ : fasterKV.CallInternalRMW(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, pendingContext.userContext,
pendingContext.serialNum, out newDiskRequest);
output = pendingContext.output;
this.diskRequest = newDiskRequest;
@@ -34,8 +34,8 @@ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext<
///
public ValueTask> DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, CancellationToken token)
- => SlowRmwAsync(fasterKV, fasterSession, currentCtx, pendingContext, diskRequest, token);
+ PendingContext pendingContext, CancellationToken token)
+ => SlowRmwAsync(fasterKV, fasterSession, pendingContext, diskRequest, token);
///
public bool HasPendingIO => !this.diskRequest.IsDefault();
@@ -67,14 +67,13 @@ internal RmwAsyncResult(Status status, TOutput output, RecordMetadata recordMeta
}
internal RmwAsyncResult(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext,
- AsyncIOContext diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
+ PendingContext pendingContext, AsyncIOContext diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
{
Status = new(StatusCode.Pending);
this.Output = default;
this.RecordMetadata = default;
updateAsyncInternal = new AsyncOperationInternal, RmwAsyncResult>(
- fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new (diskRequest));
+ fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (diskRequest));
}
/// Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this.
@@ -105,8 +104,9 @@ public ValueTask> CompleteAsync(Cancella
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal ValueTask> RmwAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref Key key, ref Input input, Context context, long serialNo, CancellationToken token = default)
+ internal ValueTask> RmwAsync(FasterSession fasterSession,
+ ref Key key, ref Input input, Context context, long serialNo, CancellationToken token = default)
+ where FasterSession : IFasterSession
{
var pcontext = new PendingContext { IsAsync = true };
var diskRequest = default(AsyncIOContext);
@@ -115,43 +115,40 @@ internal ValueTask> RmwAsync>(new RmwAsyncResult(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
finally
{
- Debug.Assert(serialNo >= currentCtx.serialNum, "Operation serial numbers must be non-decreasing");
- currentCtx.serialNum = serialNo;
+ Debug.Assert(serialNo >= fasterSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
+ fasterSession.Ctx.serialNum = serialNo;
fasterSession.UnsafeSuspendThread();
}
- return SlowRmwAsync(this, fasterSession, currentCtx, pcontext, diskRequest, token);
+ return SlowRmwAsync(this, fasterSession, pcontext, diskRequest, token);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private Status CallInternalRMW(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, ref Output output, Context context, long serialNo,
- out AsyncIOContext diskRequest)
+ private Status CallInternalRMW(IFasterSession fasterSession, ref PendingContext pcontext,
+ ref Key key, ref Input input, ref Output output, Context context, long serialNo, out AsyncIOContext diskRequest)
{
- diskRequest = default;
OperationStatus internalStatus;
do
- internalStatus = InternalRMW(ref key, ref input, ref output, ref context, ref pcontext, fasterSession, currentCtx, serialNo);
- while (HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pcontext));
+ internalStatus = InternalRMW(ref key, ref input, ref output, ref context, ref pcontext, fasterSession, serialNo);
+ while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
- return HandleOperationStatus(currentCtx, ref pcontext, internalStatus, out diskRequest);
+ return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);
}
private static async ValueTask> SlowRmwAsync(
FasterKV @this, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pcontext,
- AsyncIOContext diskRequest, CancellationToken token = default)
+ PendingContext pcontext, AsyncIOContext diskRequest, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo;
- (diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, currentCtx, pcontext.flushEvent, diskRequest, token);
+ (diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, fasterSession.Ctx, pcontext.flushEvent, diskRequest, token);
pcontext.flushEvent = default;
- return new RmwAsyncResult(@this, fasterSession, currentCtx, pcontext, diskRequest, exceptionDispatchInfo);
+ return new RmwAsyncResult(@this, fasterSession, pcontext, diskRequest, exceptionDispatchInfo);
}
}
}
diff --git a/cs/src/core/Async/ReadAsync.cs b/cs/src/core/Async/ReadAsync.cs
index 8aa84636a..656dcd4f5 100644
--- a/cs/src/core/Async/ReadAsync.cs
+++ b/cs/src/core/Async/ReadAsync.cs
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
-using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
@@ -27,12 +26,12 @@ internal ReadAsyncOperation(AsyncIOContext diskRequest, ref ReadOpti
public ReadAsyncResult CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new(status, output, recordMetadata);
///
- public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, out Output output)
+ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext,
+ IFasterSession fasterSession, out Output output)
{
Status status = !this.diskRequest.IsDefault()
- ? fasterKV.InternalCompletePendingRequestFromContext(currentCtx, currentCtx, fasterSession, this.diskRequest, ref pendingContext, out var newDiskRequest)
- : fasterKV.CallInternalRead(fasterSession, currentCtx, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output,
+ ? fasterKV.InternalCompletePendingRequestFromContext(fasterSession, this.diskRequest, ref pendingContext, out var newDiskRequest)
+ : fasterKV.CallInternalRead(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output,
ref this.readOptions, pendingContext.userContext, pendingContext.serialNum, out newDiskRequest);
output = pendingContext.output;
this.diskRequest = newDiskRequest;
@@ -41,8 +40,8 @@ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext<
///
public ValueTask> DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, CancellationToken token)
- => SlowReadAsync(fasterKV, fasterSession, currentCtx, pendingContext, this.readOptions, this.diskRequest, token);
+ PendingContext pendingContext, CancellationToken token)
+ => SlowReadAsync(fasterKV, fasterSession, pendingContext, this.readOptions, this.diskRequest, token);
///
public bool HasPendingIO => !this.diskRequest.IsDefault();
@@ -72,15 +71,14 @@ internal ReadAsyncResult(Status status, TOutput output, RecordMetadata recordMet
this.updateAsyncInternal = default;
}
- internal ReadAsyncResult(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext,
- ref ReadOptions readOptions, AsyncIOContext diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
+ internal ReadAsyncResult(FasterKV fasterKV, IFasterSession fasterSession, PendingContext pendingContext,
+ ref ReadOptions readOptions, AsyncIOContext diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
{
Status = new(StatusCode.Pending);
this.Output = default;
this.RecordMetadata = default;
updateAsyncInternal = new AsyncOperationInternal, ReadAsyncResult>(
- fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new ReadAsyncOperation(diskRequest, ref readOptions));
+ fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new ReadAsyncOperation(diskRequest, ref readOptions));
}
/// Complete the RMW operation, issuing additional (rare) I/O synchronously if needed.
@@ -105,11 +103,10 @@ internal ReadAsyncResult(FasterKV fasterKV, IFasterSession> ReadAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx,
ref Key key, ref Input input, ref ReadOptions readOptions, Context context, long serialNo, CancellationToken token, bool noKey = false)
{
var pcontext = new PendingContext { IsAsync = true };
- var operationFlags = PendingContext.GetOperationFlags(MergeReadFlags(currentCtx.ReadFlags, readOptions.ReadFlags), noKey);
+ var operationFlags = PendingContext.GetOperationFlags(MergeReadFlags(fasterSession.Ctx.ReadFlags, readOptions.ReadFlags), noKey);
pcontext.SetOperationFlags(operationFlags, readOptions.StopAddress);
var diskRequest = default(AsyncIOContext);
@@ -117,42 +114,41 @@ internal ValueTask> ReadAsync>(new ReadAsyncResult(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
}
finally
{
- Debug.Assert(serialNo >= currentCtx.serialNum, "Operation serial numbers must be non-decreasing");
- currentCtx.serialNum = serialNo;
+ Debug.Assert(serialNo >= fasterSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
+ fasterSession.Ctx.serialNum = serialNo;
fasterSession.UnsafeSuspendThread();
}
- return SlowReadAsync(this, fasterSession, currentCtx, pcontext, readOptions, diskRequest, token);
+ return SlowReadAsync(this, fasterSession, pcontext, readOptions, diskRequest, token);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private Status CallInternalRead(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, Context context, long serialNo,
- out AsyncIOContext diskRequest)
+ ref PendingContext pcontext, ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, Context context, long serialNo,
+ out AsyncIOContext diskRequest)
{
OperationStatus internalStatus;
do
- internalStatus = InternalRead(ref key, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, currentCtx, serialNo);
- while (HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pcontext));
+ internalStatus = InternalRead(ref key, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo);
+ while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
- return HandleOperationStatus(currentCtx, ref pcontext, internalStatus, out diskRequest);
+ return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);
}
private static async ValueTask> SlowReadAsync(
FasterKV @this, IFasterSession fasterSession,
- FasterExecutionContext currentCtx,
PendingContext pcontext, ReadOptions readOptions, AsyncIOContext diskRequest, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo;
- (diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, currentCtx, pcontext.flushEvent, diskRequest, token);
+ (diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, fasterSession.Ctx, pcontext.flushEvent, diskRequest, token);
pcontext.flushEvent = default;
- return new ReadAsyncResult(@this, fasterSession, currentCtx, pcontext, ref readOptions, diskRequest, exceptionDispatchInfo);
+ return new ReadAsyncResult(@this, fasterSession, pcontext, ref readOptions, diskRequest, exceptionDispatchInfo);
}
}
}
diff --git a/cs/src/core/Async/UpsertAsync.cs b/cs/src/core/Async/UpsertAsync.cs
index f94c0e751..f4b175796 100644
--- a/cs/src/core/Async/UpsertAsync.cs
+++ b/cs/src/core/Async/UpsertAsync.cs
@@ -18,22 +18,22 @@ internal struct UpsertAsyncOperation : IAsyncOperation
public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, out Output output)
+ out Output output)
{
output = default;
OperationStatus internalStatus;
do
{
internalStatus = fasterKV.InternalUpsert(ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.value.Get(), ref output, ref pendingContext.userContext, ref pendingContext,
- fasterSession, currentCtx, pendingContext.serialNum);
- } while (fasterKV.HandleImmediateRetryStatus(internalStatus, currentCtx, currentCtx, fasterSession, ref pendingContext));
+ fasterSession, pendingContext.serialNum);
+ } while (fasterKV.HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
return TranslateStatus(internalStatus);
}
///
public ValueTask> DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, CancellationToken token)
- => SlowUpsertAsync(fasterKV, fasterSession, currentCtx, pendingContext, token);
+ PendingContext pendingContext, CancellationToken token)
+ => SlowUpsertAsync(fasterKV, fasterSession, pendingContext, token);
///
public bool HasPendingIO => false;
@@ -64,13 +64,13 @@ internal UpsertAsyncResult(Status status, TOutput output, RecordMetadata recordM
}
internal UpsertAsyncResult(FasterKV fasterKV, IFasterSession fasterSession,
- FasterExecutionContext currentCtx, PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
+ PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
{
this.Status = new(StatusCode.Pending);
this.Output = default;
this.RecordMetadata = default;
updateAsyncInternal = new AsyncOperationInternal, UpsertAsyncResult>(
- fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new ());
+ fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new ());
}
/// Complete the Upsert operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.
@@ -100,17 +100,11 @@ public ValueTask> CompleteAsync(Cance
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal ValueTask> UpsertAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref Key key, ref Input input, ref Value value, Context userContext, long serialNo, CancellationToken token = default)
+ internal ValueTask> UpsertAsync(FasterSession fasterSession,
+ ref Key key, ref Input input, ref Value value, Context userContext, long serialNo, CancellationToken token = default)
+ where FasterSession : IFasterSession
{
var pcontext = new PendingContext { IsAsync = true };
- return UpsertAsync(fasterSession, currentCtx, ref pcontext, ref key, ref input, ref value, userContext, serialNo, token);
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private ValueTask> UpsertAsync(IFasterSession fasterSession,
- FasterExecutionContext currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, ref Value value, Context userContext, long serialNo, CancellationToken token)
- {
Output output = default;
fasterSession.UnsafeResumeThread();
@@ -119,8 +113,8 @@ private ValueTask> UpsertAsync>(new UpsertAsyncResult(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
@@ -128,23 +122,21 @@ private ValueTask> UpsertAsync= currentCtx.serialNum, "Operation serial numbers must be non-decreasing");
- currentCtx.serialNum = serialNo;
+ Debug.Assert(serialNo >= fasterSession.Ctx.serialNum, "Operation serial numbers must be non-decreasing");
+ fasterSession.Ctx.serialNum = serialNo;
fasterSession.UnsafeSuspendThread();
}
- return SlowUpsertAsync(this, fasterSession, currentCtx, pcontext, token);
+ return SlowUpsertAsync(this, fasterSession, pcontext, token);
}
private static async ValueTask> SlowUpsertAsync(
- FasterKV @this,
- IFasterSession fasterSession,
- FasterExecutionContext currentCtx,
+ FasterKV @this, IFasterSession fasterSession,
PendingContext pcontext, CancellationToken token = default)
{
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, pcontext.flushEvent, token).ConfigureAwait(false);
pcontext.flushEvent = default;
- return new UpsertAsyncResult(@this, fasterSession, currentCtx, pcontext, exceptionDispatchInfo);
+ return new UpsertAsyncResult(@this, fasterSession, pcontext, exceptionDispatchInfo);
}
}
}
diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs
index 781e6a472..39ad23159 100644
--- a/cs/src/core/ClientSession/ClientSession.cs
+++ b/cs/src/core/ClientSession/ClientSession.cs
@@ -106,11 +106,15 @@ internal ClientSession(
SessionVariableLengthStructSettings sessionVariableLengthStructSettings,
ILoggerFactory loggerFactory = null)
{
- this.lContext = new(this);
this.bContext = new(this);
- this.luContext = new(this);
this.uContext = new(this);
+ if (fht.LockTable.IsEnabled)
+ {
+ this.lContext = new(this);
+ this.luContext = new(this);
+ }
+
this.loggerFactory = loggerFactory;
this.logger = loggerFactory?.CreateLogger($"ClientSession-{GetHashCode():X8}");
this.fht = fht;
@@ -228,12 +232,28 @@ public void Dispose()
///
/// Return a new interface to Faster operations that supports manual locking and epoch control.
///
- public LockableUnsafeContext LockableUnsafeContext => luContext;
+ public LockableUnsafeContext LockableUnsafeContext
+ {
+ get
+ {
+ if (!this.fht.LockTable.IsEnabled)
+ throw new FasterException($"LockableUnsafeContext requires {nameof(LockingMode.Standard)}");
+ return luContext;
+ }
+ }
///
/// Return a session wrapper that supports manual locking.
///
- public LockableContext LockableContext => lContext;
+ public LockableContext LockableContext
+ {
+ get
+ {
+ if (!this.fht.LockTable.IsEnabled)
+ throw new FasterException($"LockableContext requires {nameof(LockingMode.Standard)}");
+ return lContext;
+ }
+ }
///
/// Return a session wrapper struct that passes through to client session
@@ -248,7 +268,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
UnsafeResumeThread();
try
{
- return fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo);
}
finally
{
@@ -297,7 +317,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOpti
UnsafeResumeThread();
try
{
- return fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -312,7 +332,7 @@ public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions
UnsafeResumeThread();
try
{
- return fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo);
}
finally
{
@@ -325,7 +345,7 @@ public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions
public ValueTask.ReadAsyncResult> ReadAsync(ref Key key, ref Input input, Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
{
ReadOptions readOptions = default;
- return fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ return fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
}
///
@@ -333,7 +353,7 @@ public ValueTask.ReadAsyncResult> R
public ValueTask.ReadAsyncResult> ReadAsync(Key key, Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
{
ReadOptions readOptions = default;
- return fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
@@ -342,7 +362,7 @@ public ValueTask.ReadAsyncResult> R
{
Input input = default;
ReadOptions readOptions = default;
- return fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, userContext, serialNo, token);
+ return fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, token);
}
///
@@ -351,14 +371,14 @@ public ValueTask.ReadAsyncResult> R
{
Input input = default;
ReadOptions readOptions = default;
- return fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask.ReadAsyncResult> ReadAsync(ref Key key, ref Input input, ref ReadOptions readOptions,
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
- => fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ => fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -366,7 +386,7 @@ public ValueTask.ReadAsyncResult> R
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
{
Key key = default;
- return fht.ReadAsync(this.FasterSession, this.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
+ return fht.ReadAsync(this.FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
}
///
@@ -385,7 +405,7 @@ public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref O
UnsafeResumeThread();
try
{
- return fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo);
}
finally
{
@@ -400,7 +420,7 @@ public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref O
UnsafeResumeThread();
try
{
- return fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -429,7 +449,7 @@ public ValueTask.UpsertAsyncResult>
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask.UpsertAsyncResult> UpsertAsync(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
- => fht.UpsertAsync(this.FasterSession, this.ctx, ref key, ref input, ref desiredValue, userContext, serialNo, token);
+ => fht.UpsertAsync(this.FasterSession, ref key, ref input, ref desiredValue, userContext, serialNo, token);
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -453,7 +473,7 @@ public Status RMW(ref Key key, ref Input input, ref Output output, out RecordMet
UnsafeResumeThread();
try
{
- return fht.ContextRMW(ref key, ref input, ref output, out recordMetadata, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextRMW(ref key, ref input, ref output, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -488,7 +508,7 @@ public Status RMW(Key key, Input input, Context userContext = default, long seri
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask.RmwAsyncResult> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
- => fht.RmwAsync(this.FasterSession, this.ctx, ref key, ref input, context, serialNo, token);
+ => fht.RmwAsync(this.FasterSession, ref key, ref input, context, serialNo, token);
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -502,7 +522,7 @@ public Status Delete(ref Key key, Context userContext = default, long serialNo =
UnsafeResumeThread();
try
{
- return fht.ContextDelete(ref key, userContext, FasterSession, serialNo, ctx);
+ return fht.ContextDelete(ref key, userContext, FasterSession, serialNo);
}
finally
{
@@ -518,7 +538,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask.DeleteAsyncResult> DeleteAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
- => fht.DeleteAsync(this.FasterSession, this.ctx, ref key, userContext, serialNo, token);
+ => fht.DeleteAsync(this.FasterSession, ref key, userContext, serialNo, token);
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -529,10 +549,49 @@ public ValueTask.DeleteAsyncResult>
public void Refresh()
{
UnsafeResumeThread();
- fht.InternalRefresh(ctx, FasterSession);
+ fht.InternalRefresh(FasterSession);
UnsafeSuspendThread();
}
+ ///
+ public void ResetModified(ref Key key)
+ {
+ UnsafeResumeThread();
+ try
+ {
+ UnsafeResetModified(ref key);
+ }
+ finally
+ {
+ UnsafeSuspendThread();
+ }
+ }
+
+ ///
+ public bool NeedKeyLockCode => this.fht.LockTable.IsEnabled && this.fht.LockTable.NeedKeyLockCode;
+
+ ///
+ public long GetLockCode(ref Key key, out long keyHash)
+ {
+ keyHash = this.fht.comparer.GetHashCode64(ref key);
+ return this.fht.LockTable.IsEnabled ? this.fht.LockTable.GetLockCode(ref key, keyHash) : keyHash;
+ }
+
+ ///
+ public long GetLockCode(ref Key key, long keyHash) => this.fht.LockTable.IsEnabled ? this.fht.LockTable.GetLockCode(ref key, keyHash) : keyHash;
+
+ ///
+ public int CompareLockCodes(TLockableKey key1, TLockableKey key2) where TLockableKey : ILockableKey => fht.LockTable.CompareLockCodes(key1, key2);
+
+ ///
+ public int CompareLockCodes(ref TLockableKey key1, ref TLockableKey key2) where TLockableKey : ILockableKey => fht.LockTable.CompareLockCodes(ref key1, ref key2);
+
+ ///
+ public void SortLockCodes(TLockableKey[] keys) where TLockableKey : ILockableKey => fht.LockTable.SortLockCodes(keys);
+
+ ///
+ public void SortLockCodes(TLockableKey[] keys, int start, int count) where TLockableKey : ILockableKey => fht.LockTable.SortLockCodes(keys, start, count);
+
#endregion IFasterContext
#region Pending Operations
@@ -601,7 +660,7 @@ internal bool UnsafeCompletePending(FasterSession fasterSession,
where FasterSession : IFasterSession
{
var requestedOutputs = getOutputs ? this.completedOutputs : default;
- var result = fht.InternalCompletePending(ctx, fasterSession, wait, requestedOutputs);
+ var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
if (spinWaitForCommit)
{
if (wait != true)
@@ -610,10 +669,10 @@ internal bool UnsafeCompletePending(FasterSession fasterSession,
}
do
{
- fht.InternalCompletePending(ctx, fasterSession, wait, requestedOutputs);
+ fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
if (fht.InRestPhase())
{
- fht.InternalCompletePending(ctx, fasterSession, wait, requestedOutputs);
+ fht.InternalCompletePending(fasterSession, wait, requestedOutputs);
return true;
}
} while (wait);
@@ -641,7 +700,7 @@ private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit
throw new NotSupportedException("Async operations not supported over protected epoch");
// Complete all pending operations on session
- await fht.CompletePendingAsync(this.FasterSession, this.ctx, token, getOutputs ? this.completedOutputs : null).ConfigureAwait(false);
+ await fht.CompletePendingAsync(this.FasterSession, token, getOutputs ? this.completedOutputs : null).ConfigureAwait(false);
// Wait for commit if necessary
if (waitForCommit)
@@ -668,26 +727,12 @@ public async ValueTask ReadyToCompletePendingAsync(CancellationToken token = def
#region Other Operations
- ///
- public void ResetModified(ref Key key)
- {
- UnsafeResumeThread();
- try
- {
- UnsafeResetModified(ref key);
- }
- finally
- {
- UnsafeSuspendThread();
- }
- }
-
internal void UnsafeResetModified(ref Key key)
{
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out _);
- while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
+ while (fht.HandleImmediateNonPendingRetryStatus(status, FasterSession));
}
///
@@ -713,7 +758,7 @@ internal bool UnsafeIsModified(ref Key key)
OperationStatus status;
do
status = fht.InternalModifiedBitOperation(ref key, out modifiedInfo, false);
- while (fht.HandleImmediateNonPendingRetryStatus(status, ctx, FasterSession));
+ while (fht.HandleImmediateNonPendingRetryStatus(status, FasterSession));
return modifiedInfo.Modified;
}
@@ -823,7 +868,7 @@ internal OperationStatus CompactionCopyToTail(ref Key key, ref Input input, ref
UnsafeResumeThread();
try
{
- return fht.InternalCopyToTailForCompaction(ref key, ref input, ref desiredValue, ref output, untilAddress, actualAddress, FasterSession, ctx);
+ return fht.InternalCopyToTailForCompaction(ref key, ref input, ref desiredValue, ref output, untilAddress, actualAddress, FasterSession);
}
finally
{
@@ -846,7 +891,7 @@ internal Status ContainsKeyInMemory(ref Key key, out long logicalAddress, long f
UnsafeResumeThread();
try
{
- return fht.InternalContainsKeyInMemory(ref key, ctx, FasterSession, out logicalAddress, fromAddress);
+ return fht.InternalContainsKeyInMemory(ref key, FasterSession, out logicalAddress, fromAddress);
}
finally
{
@@ -877,7 +922,7 @@ internal void UnsafeResumeThread()
// We do not track any "acquired" state here; if someone mixes calls between safe and unsafe contexts, they will
// get the "trying to acquire already-acquired epoch" error.
fht.epoch.Resume();
- fht.InternalRefresh(ctx, FasterSession);
+ fht.InternalRefresh(FasterSession);
}
///
@@ -908,7 +953,7 @@ internal bool IsInPreparePhase()
#region IFasterSession
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal bool InPlaceUpdater(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status)
+ internal bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status)
{
recordInfo.SetDirty();
@@ -931,8 +976,8 @@ internal bool InPlaceUpdater(ref Key key, ref Input input, ref Output output, re
if (rmwInfo.Action == RMWAction.ExpireAndResume)
{
// This inserts the tombstone if appropriate
- return this.fht.ReinitializeExpiredRecord(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo,
- rmwInfo.Address, this.ctx, this.FasterSession, isIpu: true, out status);
+ return this.fht.ReinitializeExpiredRecord(ref key, ref input, ref value, ref output, ref recordInfo,
+ ref rmwInfo, rmwInfo.Address, this.FasterSession, isIpu: true, out status);
}
if (rmwInfo.Action == RMWAction.ExpireAndStop)
{
@@ -956,26 +1001,34 @@ public InternalFasterSession(ClientSession _clientSession.fht.DisableEphemeralLocking;
-
public bool IsManualLocking => false;
- public SessionType SessionType => SessionType.BasicContext;
- #endregion IFunctions - Optional features supported
-
#region IFunctions - Reads
public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo)
=> _clientSession.functions.SingleReader(ref key, ref input, ref value, ref dst, ref readInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo)
+ public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo, out EphemeralLockResult lockResult)
{
- if (_clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref readInfo))
- return true;
- if (readInfo.Action == ReadAction.Expire)
- recordInfo.Tombstone = true;
- return false;
+ lockResult = EphemeralLockResult.Success;
+ return _clientSession.fht.DoEphemeralLocking
+ ? ConcurrentReaderLockEphemeral(ref key, ref input, ref value, ref dst, ref recordInfo, ref readInfo, out lockResult)
+ : _clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref readInfo);
+ }
+
+ public bool ConcurrentReaderLockEphemeral(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo, out EphemeralLockResult lockResult)
+ {
+ lockResult = recordInfo.TryLockShared() ? EphemeralLockResult.Success : EphemeralLockResult.Failed;
+ if (lockResult == EphemeralLockResult.Failed)
+ return false;
+ try
+ {
+ return _clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref readInfo);
+ }
+ finally
+ {
+ recordInfo.UnlockShared();
+ }
}
public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata)
@@ -989,19 +1042,72 @@ public bool SingleWriter(ref Key key, ref Input input, ref Value src, ref Value
=> _clientSession.functions.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason)
+ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason)
+ {
+ if (_clientSession.fht.DoEphemeralLocking)
+ PostSingleWriterLockEphemeral(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo, reason);
+ else
+ PostSingleWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo, reason);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostSingleWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason);
}
+ public void PostSingleWriterLockEphemeral(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, WriteReason reason)
+ {
+ try
+ {
+ PostSingleWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo, reason);
+ }
+ finally
+ {
+ if (reason == WriteReason.Upsert)
+ recordInfo.UnlockExclusive();
+ else if (recordInfo.IsLockedShared) // readcache records are not locked
+ recordInfo.UnlockShared();
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out EphemeralLockResult lockResult)
+ {
+ lockResult = EphemeralLockResult.Success;
+ return _clientSession.fht.DoEphemeralLocking
+ ? ConcurrentWriterLockEphemeral(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo, out lockResult)
+ : ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo);
+ }
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo)
+ private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo)
{
+ if (!_clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo))
+ return false;
recordInfo.SetDirtyAndModified();
+ return true;
+ }
- // Note: KeyIndexes do not need notification of in-place updates because the key does not change.
- return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo);
+ public bool ConcurrentWriterLockEphemeral(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out EphemeralLockResult lockResult)
+ {
+ lockResult = recordInfo.TryLockExclusive() ? EphemeralLockResult.Success : EphemeralLockResult.Failed;
+ if (lockResult == EphemeralLockResult.Failed)
+ return false;
+ try
+ {
+ if (ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, ref upsertInfo))
+ return true;
+ if (upsertInfo.Action != UpsertAction.CancelOperation)
+ lockResult = EphemeralLockResult.HoldForSeal;
+ return false;
+ }
+ finally
+ {
+ if (lockResult != EphemeralLockResult.HoldForSeal)
+ recordInfo.UnlockExclusive();
+ }
}
#endregion IFunctions - Upserts
@@ -1016,11 +1122,33 @@ public bool InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou
=> _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ {
+ if (_clientSession.fht.DoEphemeralLocking)
+ PostInitialUpdaterLockEphemeral(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo);
+ else
+ PostInitialUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostInitialUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref rmwInfo);
}
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostInitialUpdaterLockEphemeral(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ {
+ try
+ {
+ PostInitialUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo);
+ }
+ finally
+ {
+ recordInfo.UnlockExclusive();
+ }
+ }
#endregion InitialUpdater
#region CopyUpdater
@@ -1033,19 +1161,78 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ {
+ if (_clientSession.fht.DoEphemeralLocking)
+ PostCopyUpdaterLockEphemeral(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, ref rmwInfo);
+ else
+ PostCopyUpdaterNoLock(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, ref rmwInfo);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
}
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostCopyUpdaterLockEphemeral(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ {
+ try
+ {
+ PostCopyUpdaterNoLock(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, ref rmwInfo);
+ }
+ finally
+ {
+ recordInfo.UnlockExclusive();
+ }
+ }
#endregion CopyUpdater
#region InPlaceUpdater
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status)
+ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status,
+ out EphemeralLockResult lockResult)
{
+ lockResult = EphemeralLockResult.Success;
+ return _clientSession.fht.DoEphemeralLocking
+ ? InPlaceUpdaterLockEphemeral(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo, out status, out lockResult)
+ : InPlaceUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo, out status);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status)
+ {
+ if (!_clientSession.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo, out status))
+ return false;
recordInfo.SetDirtyAndModified();
- return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status);
+ return true;
+ }
+
+ public bool InPlaceUpdaterLockEphemeral(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status,
+ out EphemeralLockResult lockResult)
+ {
+ lockResult = recordInfo.TryLockExclusive() ? EphemeralLockResult.Success : EphemeralLockResult.Failed;
+ if (lockResult == EphemeralLockResult.Failed)
+ {
+ status = OperationStatus.RETRY_LATER;
+ return false;
+ }
+ try
+ {
+ if (InPlaceUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo, out status))
+ return true;
+ // Expiration sets additional bits beyond SUCCESS, and Cancel does not set SUCCESS.
+ if (status == OperationStatus.SUCCESS)
+ lockResult = EphemeralLockResult.HoldForSeal;
+ return false;
+ }
+ finally
+ {
+ if (lockResult != EphemeralLockResult.HoldForSeal)
+ recordInfo.UnlockExclusive();
+ }
}
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata)
@@ -1060,18 +1247,65 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf
=> _clientSession.functions.SingleDeleter(ref key, ref value, ref deleteInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ {
+ if (_clientSession.fht.DoEphemeralLocking)
+ PostSingleDeleterLockEphemeral(ref key, ref recordInfo, ref deleteInfo);
+ else
+ PostSingleDeleterNoLock(ref key, ref recordInfo, ref deleteInfo);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void PostSingleDeleterNoLock(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ public void PostSingleDeleterLockEphemeral(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
{
+ try
+ {
+ PostSingleDeleterNoLock(ref key, ref recordInfo, ref deleteInfo);
+ }
+ finally
+ {
+ recordInfo.UnlockExclusive();
+ }
+ }
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo, out EphemeralLockResult lockResult)
+ {
+ lockResult = EphemeralLockResult.Success;
+ return _clientSession.fht.DoEphemeralLocking
+ ? ConcurrentDeleterLockEphemeral(ref key, ref value, ref recordInfo, ref deleteInfo, out lockResult)
+ : ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, ref deleteInfo);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ {
+ if (!_clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo))
+ return false;
recordInfo.SetDirtyAndModified();
recordInfo.SetTombstone();
- return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo);
+ return true;
+ }
+
+ public bool ConcurrentDeleterLockEphemeral(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo, out EphemeralLockResult lockResult)
+ {
+ lockResult = recordInfo.TryLockExclusive() ? EphemeralLockResult.Success : EphemeralLockResult.Failed;
+ if (lockResult == EphemeralLockResult.Failed)
+ return false;
+ try
+ {
+ return ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, ref deleteInfo);
+ }
+ finally
+ {
+ recordInfo.UnlockExclusive();
+ }
}
#endregion IFunctions - Deletes
@@ -1096,16 +1330,41 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm
}
#endregion IFunctions - Checkpointing
- #region Ephemeral locking
- public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo) => _clientSession.fht.DisableEphemeralLocking || recordInfo.TryLockExclusive();
- public bool TryLockEphemeralShared(ref RecordInfo recordInfo) => _clientSession.fht.DisableEphemeralLocking || recordInfo.TryLockShared();
- public void UnlockEphemeralExclusive(ref RecordInfo recordInfo)
+ #region Transient locking
+ public bool TryLockTransientExclusive(ref Key key, ref OperationStackContext stackCtx)
{
- if (!_clientSession.fht.DisableEphemeralLocking)
- recordInfo.UnlockExclusive();
+ if (!_clientSession.fht.DoTransientLocking)
+ return true;
+ if (!_clientSession.fht.LockTable.TryLockTransientExclusive(ref key, ref stackCtx.hei))
+ return false;
+ return stackCtx.recSrc.HasTransientLock = true;
}
- public bool TryUnlockEphemeralShared(ref RecordInfo recordInfo) => _clientSession.fht.DisableEphemeralLocking || recordInfo.TryUnlockShared();
- #endregion Ephemeral locking
+
+ public bool TryLockTransientShared(ref Key key, ref OperationStackContext stackCtx)
+ {
+ if (!_clientSession.fht.DoTransientLocking)
+ return true;
+ if (!_clientSession.fht.LockTable.TryLockTransientShared(ref key, ref stackCtx.hei))
+ return false;
+ return stackCtx.recSrc.HasTransientLock = true;
+ }
+
+ public void UnlockTransientExclusive(ref Key key, ref OperationStackContext stackCtx)
+ {
+ if (!_clientSession.fht.DoTransientLocking)
+ return;
+ _clientSession.fht.LockTable.UnlockExclusive(ref key, ref stackCtx.hei);
+ stackCtx.recSrc.HasTransientLock = false;
+ }
+
+ public void UnlockTransientShared(ref Key key, ref OperationStackContext stackCtx)
+ {
+ if (!_clientSession.fht.DoTransientLocking)
+ return;
+ _clientSession.fht.LockTable.UnlockShared(ref key, ref stackCtx.hei);
+ stackCtx.recSrc.HasTransientLock = false;
+ }
+ #endregion Transient locking
#region Internal utilities
public int GetInitialLength(ref Input input)
@@ -1127,6 +1386,8 @@ public IHeapContainer GetHeapContainer(ref Input input)
public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);
+
+ public FasterKV.FasterExecutionContext Ctx => this._clientSession.ctx;
#endregion Internal utilities
}
#endregion IFasterSession
diff --git a/cs/src/core/ClientSession/IFasterContext.cs b/cs/src/core/ClientSession/IFasterContext.cs
index d1806b011..164cf3c4f 100644
--- a/cs/src/core/ClientSession/IFasterContext.cs
+++ b/cs/src/core/ClientSession/IFasterContext.cs
@@ -510,7 +510,6 @@ ValueTask.ReadAsyncResult> ReadAtAd
/// to complete the Upsert operation. Failure to complete the operation will result in leaked allocations.
ValueTask.DeleteAsyncResult> DeleteAsync(Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default);
-
///
/// Reset the modified bit of a record (for in memory records)
///
diff --git a/cs/src/core/ClientSession/ILockableContext.cs b/cs/src/core/ClientSession/ILockableContext.cs
index 2a7b11b9e..2ffc102bc 100644
--- a/cs/src/core/ClientSession/ILockableContext.cs
+++ b/cs/src/core/ClientSession/ILockableContext.cs
@@ -20,43 +20,119 @@ public interface ILockableContext
void EndLockable();
///
- /// Lock the key with the specified , waiting until it is acquired
+ /// If true, then keys must use one of the overloads to obtain a code by which groups of keys will be sorted for manual locking, to avoid deadlocks.
///
- /// The key to lock
- /// The type of lock to take
- void Lock(ref TKey key, LockType lockType);
+ /// Whether this returns true depends on the on , or passed to the FasterKV constructor.
+ bool NeedKeyLockCode { get; }
///
- /// Lock the key with the specified , waiting until it is acquired
+ /// Obtain a code by which groups of keys will be sorted for manual locking, to avoid deadlocks.
+ /// The key to obtain a code for
+ /// The hashcode of the key; created and returned by .
///
- /// The key to lock
- /// The type of lock to take
- void Lock(TKey key, LockType lockType);
+ /// If is true, this code is obtained by FASTER on method calls and is used in its locking scheme.
+ /// In that case the app must ensure that the keys in a group are sorted by this value, to avoid deadlock.
+ long GetLockCode(TKey key, out long keyHash);
///
- /// Lock the key with the specified
+ /// Obtain a code by which groups of keys will be sorted for manual locking, to avoid deadlocks.
+ /// The key to obtain a code for
+ /// The hashcode of the key; created and returned by .
///
- /// The key to lock
- /// The type of lock to release
- void Unlock(ref TKey key, LockType lockType);
+ /// If is true, this code is obtained by FASTER on method calls and is used in its locking scheme.
+ /// In that case the app must ensure that the keys in a group are sorted by this value, to avoid deadlock.
+ long GetLockCode(ref TKey key, out long keyHash);
///
- /// Unlock the key with the specified
+ /// Obtain a code by which groups of keys will be sorted for manual locking, to avoid deadlocks.
+ /// The key to obtain a code for
+ /// The hashcode of the key; must be the value returned by .
///
- /// The key to lock
- /// The type of lock to release
- void Unlock(TKey key, LockType lockType);
+ /// If is true, this code is obtained by FASTER on method calls and is used in its locking scheme.
+ /// In that case the app must ensure that the keys in a group are sorted by this value, to avoid deadlock.
+ long GetLockCode(TKey key, long keyHash);
///
- /// Determines if the key is locked. Note this value may be obsolete as soon as it returns.
+ /// Obtain a code by which groups of keys will be sorted for manual locking, to avoid deadlocks.
+ /// The key to obtain a code for
+ /// The hashcode of the key; must be the value returned by .
///
- /// The key to lock
- (bool exclusive, byte shared) IsLocked(ref TKey key);
+ /// If is true, this code is obtained by FASTER on method calls and is used in its locking scheme.
+ /// In that case the app must ensure that the keys in a group are sorted by this value, to avoid deadlock.
+ long GetLockCode(ref TKey key, long keyHash);
///
- /// Determines if the key is locked. Note this value may be obsolete as soon as it returns.
+ /// Compare two structures that implement ILockableKey.
///
- /// The key to lock
- (bool exclusive, byte shared) IsLocked(TKey key);
+ /// The type of the app data struct or class containing key info
+ /// The first key to compare
+ /// The first key to compare
+ /// The result of key1.CompareTo(key2)
+ int CompareLockCodes(TLockableKey key1, TLockableKey key2)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Compare two structures that implement ILockableKey.
+ ///
+ /// The type of the app data struct or class containing key info
+ /// The first key to compare
+ /// The first key to compare
+ /// The result of key1.CompareTo(key2)
+ int CompareLockCodes(ref TLockableKey key1, ref TLockableKey key2)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Sort an array of app data structures (or classes) by lock code and lock type; these will be passed to Lockable*Session.Lock
+ ///
+ /// The type of the app data struct or class containing key info
+ /// The array of app key data
+ void SortLockCodes(TLockableKey[] keys)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Sort an array of app data structures (or classes) by lock code and lock type; these will be passed to Lockable*Session.Lock
+ ///
+ /// The type of the app data struct or class containing key info
+ /// The array of app key data
+ /// The starting index to sort
+ /// The number of keys to sort
+ void SortLockCodes(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Locks the keys identified in the passed array.
+ ///
+ ///
+ /// keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by .
+ void Lock(TLockableKey[] keys)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Locks the keys identified in the passed array.
+ ///
+ ///
+ /// keyCodes to be locked, and whether that locking is shared or exclusive; must be sorted by .
+ /// The starting index to Lock
+ /// The number of keys to Lock
+ void Lock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Unlocks the keys identified in the passed array.
+ ///
+ ///
+ /// keyCodes to be unlocked, and whether that unlocking is shared or exclusive; must be sorted by .
+ void Unlock(TLockableKey[] keys)
+ where TLockableKey : ILockableKey;
+
+ ///
+ /// Unlocks the keys identified in the passed array.
+ ///
+ ///
+ /// keyCodes to be unlocked, and whether that unlocking is shared or exclusive; must be sorted by .
+ /// The starting index to Unlock
+ /// The number of keys to Unlock
+ void Unlock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey;
}
}
diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs
index 89351cec5..39a2c34fa 100644
--- a/cs/src/core/ClientSession/LockableContext.cs
+++ b/cs/src/core/ClientSession/LockableContext.cs
@@ -39,55 +39,99 @@ internal LockableContext(ClientSession
- public unsafe void Lock(ref Key key, LockType lockType)
+ public bool NeedKeyLockCode => clientSession.NeedKeyLockCode;
+
+ ///
+ public long GetLockCode(Key key, out long keyHash) => clientSession.GetLockCode(ref key, out keyHash);
+
+ ///
+ public long GetLockCode(ref Key key, out long keyHash) => clientSession.GetLockCode(ref key, out keyHash);
+
+ ///
+ public long GetLockCode(Key key, long keyHash) => clientSession.GetLockCode(ref key, keyHash);
+
+ ///
+ public long GetLockCode(ref Key key, long keyHash) => clientSession.GetLockCode(ref key, keyHash);
+
+ ///
+ public int CompareLockCodes(TLockableKey key1, TLockableKey key2) where TLockableKey : ILockableKey => clientSession.CompareLockCodes(key1, key2);
+
+ ///
+ public int CompareLockCodes(ref TLockableKey key1, ref TLockableKey key2) where TLockableKey : ILockableKey => clientSession.CompareLockCodes(ref key1, ref key2);
+
+ ///
+ public void SortLockCodes(TLockableKey[] keys) where TLockableKey : ILockableKey => clientSession.SortLockCodes(keys);
+
+ ///
+ public void SortLockCodes(TLockableKey[] keys, int start, int count) where TLockableKey : ILockableKey => clientSession.SortLockCodes(keys, start, count);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal static unsafe void DoInternalLockOp(FasterSession fasterSession, ClientSession clientSession,
+ TLockableKey[] keys, int start, int count, LockOperationType lockOpType)
+ where FasterSession : IFasterSession
+ where TLockableKey : ILockableKey
{
- clientSession.CheckIsAcquiredLockable();
- Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- clientSession.UnsafeResumeThread();
- try
+ // The key codes are sorted, but there may be duplicates; the sorting is such that exclusive locks come first for each key code,
+ // which of course allows the session to do shared operations as well, so we take the first occurrence of each key code.
+ // Unlock has to be done in the reverse order of locking, so we take the *last* occurrence of each key there.
+ var end = start + count - 1;
+ if (lockOpType == LockOperationType.Lock)
{
- LockOperation lockOp = new(LockOperationType.Lock, lockType);
-
- OperationStatus status;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out _);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
+ for (int ii = start; ii <= end; ++ii)
+ {
+ var lockType = DoLockOp(fasterSession, clientSession, keys, start, lockOpType, ii);
+ if (lockType == LockType.Exclusive)
+ ++clientSession.exclusiveLockCount;
+ else if (lockType == LockType.Shared)
+ ++clientSession.sharedLockCount;
+ }
+ return;
+ }
+ // LockOperationType.Unlock; go through the keys in reverse.
+ for (int ii = end; ii >= start; --ii)
+ {
+ var lockType = DoLockOp(fasterSession, clientSession, keys, start, lockOpType, ii);
if (lockType == LockType.Exclusive)
- ++clientSession.exclusiveLockCount;
- else
- ++clientSession.sharedLockCount;
+ --clientSession.exclusiveLockCount;
+ else if (lockType == LockType.Shared)
+ --clientSession.sharedLockCount;
}
- finally
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static unsafe LockType DoLockOp(FasterSession fasterSession, ClientSession clientSession,
+ TLockableKey[] keys, int start, LockOperationType lockOpType, int idx)
+ where FasterSession : IFasterSession
+ where TLockableKey : ILockableKey
+ {
+ ref var key = ref keys[idx];
+ if (idx == start || clientSession.fht.LockTable.GetBucketIndex(key.LockCode) != clientSession.fht.LockTable.GetBucketIndex(keys[idx - 1].LockCode))
{
- clientSession.UnsafeSuspendThread();
+ OperationStatus status;
+ do
+ status = clientSession.fht.InternalLock(key.LockCode, new(lockOpType, key.LockType));
+ while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, fasterSession));
+ Debug.Assert(status == OperationStatus.SUCCESS);
+ return key.LockType;
}
+ return LockType.None;
}
///
- public unsafe void Lock(Key key, LockType lockType) => Lock(ref key, lockType);
-
+ public void Lock(TLockableKey[] keys) where TLockableKey : ILockableKey => Lock(keys, 0, keys.Length);
+
///
- public void Unlock(ref Key key, LockType lockType)
+ public void Lock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey
{
clientSession.CheckIsAcquiredLockable();
- Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
+ Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected(), "Trying to protect an already-protected epoch for LockableUnsafeContext.Lock()");
+
clientSession.UnsafeResumeThread();
try
{
- LockOperation lockOp = new(LockOperationType.Unlock, lockType);
-
- OperationStatus status;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out _);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
-
- if (lockType == LockType.Exclusive)
- --clientSession.exclusiveLockCount;
- else
- --clientSession.sharedLockCount;
+ DoInternalLockOp(FasterSession, clientSession, keys, start, count, LockOperationType.Lock);
}
finally
{
@@ -96,25 +140,19 @@ public void Unlock(ref Key key, LockType lockType)
}
///
- public void Unlock(Key key, LockType lockType) => Unlock(ref key, lockType);
+ public void Unlock(TLockableKey[] keys) where TLockableKey : ILockableKey => Unlock(keys, 0, keys.Length);
///
- public (bool exclusive, byte shared) IsLocked(ref Key key)
+ public void Unlock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey
{
clientSession.CheckIsAcquiredLockable();
- Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
+ Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected(), "Trying to protect an already-protected epoch for LockableUnsafeContext.Unlock()");
+
clientSession.UnsafeResumeThread();
try
{
- LockOperation lockOp = new(LockOperationType.IsLocked, LockType.None);
-
- OperationStatus status;
- RecordInfo lockInfo;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
- return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared);
+ DoInternalLockOp(FasterSession, clientSession, keys, start, count, LockOperationType.Unlock);
}
finally
{
@@ -122,9 +160,6 @@ public void Unlock(ref Key key, LockType lockType)
}
}
- ///
- public (bool exclusive, byte shared) IsLocked(Key key) => IsLocked(ref key);
-
///
/// The session id of FasterSession
///
@@ -180,7 +215,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo);
}
finally
{
@@ -230,7 +265,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOpti
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -246,7 +281,7 @@ public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo);
}
finally
{
@@ -260,7 +295,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
}
///
@@ -269,7 +304,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
@@ -279,7 +314,7 @@ public ValueTask.ReadAsyncResult> R
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Input input = default;
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, token);
}
///
@@ -289,7 +324,7 @@ public ValueTask.ReadAsyncResult> R
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Input input = default;
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
@@ -298,7 +333,7 @@ public ValueTask.ReadAsyncResult> R
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
}
///
@@ -308,7 +343,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Key key = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
}
///
@@ -329,7 +364,7 @@ public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref O
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo);
}
finally
{
@@ -345,7 +380,7 @@ public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref O
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -376,7 +411,7 @@ public ValueTask.UpsertAsyncResult>
public ValueTask.UpsertAsyncResult> UpsertAsync(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0, CancellationToken token = default)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.UpsertAsync(FasterSession, clientSession.ctx, ref key, ref input, ref desiredValue, userContext, serialNo, token);
+ return clientSession.fht.UpsertAsync(FasterSession, ref key, ref input, ref desiredValue, userContext, serialNo, token);
}
///
@@ -402,7 +437,7 @@ public Status RMW(ref Key key, ref Input input, ref Output output, out RecordMet
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextRMW(ref key, ref input, ref output, out recordMetadata, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextRMW(ref key, ref input, ref output, out recordMetadata, userContext, FasterSession, serialNo);
}
finally
{
@@ -439,7 +474,7 @@ public Status RMW(Key key, Input input, Context userContext = default, long seri
public ValueTask.RmwAsyncResult> RMWAsync(ref Key key, ref Input input, Context context = default, long serialNo = 0, CancellationToken token = default)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.RmwAsync(FasterSession, clientSession.ctx, ref key, ref input, context, serialNo, token);
+ return clientSession.fht.RmwAsync(FasterSession, ref key, ref input, context, serialNo, token);
}
///
@@ -455,7 +490,7 @@ public Status Delete(ref Key key, Context userContext = default, long serialNo =
clientSession.UnsafeResumeThread();
try
{
- return clientSession.fht.ContextDelete(ref key, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextDelete(ref key, userContext, FasterSession, serialNo);
}
finally
{
@@ -473,7 +508,7 @@ public Status Delete(Key key, Context userContext = default, long serialNo = 0)
public ValueTask.DeleteAsyncResult> DeleteAsync(ref Key key, Context userContext = default, long serialNo = 0, CancellationToken token = default)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.DeleteAsync(FasterSession, clientSession.ctx, ref key, userContext, serialNo, token);
+ return clientSession.fht.DeleteAsync(FasterSession, ref key, userContext, serialNo, token);
}
///
@@ -498,7 +533,7 @@ public void Refresh()
clientSession.UnsafeResumeThread();
try
{
- clientSession.fht.InternalRefresh(clientSession.ctx, FasterSession);
+ clientSession.fht.InternalRefresh(FasterSession);
}
finally
{
@@ -520,27 +555,20 @@ public InternalFasterSession(ClientSession true; // We only lock in Lock/Unlock, explicitly; these are longer-duration locks.
+ public bool DisableTransientLocking => true; // We only lock in Lock/Unlock, explicitly; these are longer-duration locks.
public bool IsManualLocking => true;
- public SessionType SessionType => SessionType.LockableContext;
- #endregion IFunctions - Optional features supported
-
#region IFunctions - Reads
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo)
=> _clientSession.functions.SingleReader(ref key, ref input, ref value, ref dst, ref readInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo)
+ public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, ref ReadInfo readInfo, out EphemeralLockResult lockResult)
{
- if (_clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref readInfo))
- return true;
- if (readInfo.Action == ReadAction.Expire)
- recordInfo.Tombstone = true;
- return false;
+ lockResult = EphemeralLockResult.Success; // Ephemeral locking is not used with Lockable contexts
+ return _clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref readInfo);
}
public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata)
@@ -561,12 +589,13 @@ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Va
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo)
+ public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, ref UpsertInfo upsertInfo, out EphemeralLockResult lockResult)
{
+ lockResult = EphemeralLockResult.Success; // Ephemeral locking is not used with Lockable contexts
+ if (!_clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo))
+ return false;
recordInfo.SetDirtyAndModified();
-
- // Note: KeyIndexes do not need notification of in-place updates because the key does not change.
- return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo);
+ return true;
}
#endregion IFunctions - Upserts
@@ -598,7 +627,7 @@ public bool CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
+ public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref rmwInfo);
@@ -607,10 +636,13 @@ public void PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, re
#region InPlaceUpdater
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status)
+ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, ref RMWInfo rmwInfo, out OperationStatus status, out EphemeralLockResult lockResult)
{
+ lockResult = EphemeralLockResult.Success; // Ephemeral locking is not used with Lockable contexts
+ if (!_clientSession.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, ref rmwInfo, out status))
+ return false;
recordInfo.SetDirtyAndModified();
- return _clientSession.InPlaceUpdater(ref key, ref input, ref output, ref value, ref recordInfo, ref rmwInfo, out status);
+ return true;
}
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata)
@@ -625,18 +657,21 @@ public bool SingleDeleter(ref Key key, ref Value value, ref RecordInfo recordInf
=> _clientSession.functions.SingleDeleter(ref key, ref value, ref deleteInfo);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
{
recordInfo.SetDirtyAndModified();
_clientSession.functions.PostSingleDeleter(ref key, ref deleteInfo);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo)
+ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, ref DeleteInfo deleteInfo, out EphemeralLockResult lockResult)
{
+ lockResult = EphemeralLockResult.Success; // Ephemeral locking is not used with Lockable contexts
+ if (!_clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo))
+ return false;
recordInfo.SetDirtyAndModified();
recordInfo.SetTombstone();
- return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref deleteInfo);
+ return true;
}
#endregion IFunctions - Deletes
@@ -661,21 +696,40 @@ public void CheckpointCompletionCallback(int sessionID, string sessionName, Comm
}
#endregion IFunctions - Checkpointing
- #region Ephemeral locking
- public bool TryLockEphemeralExclusive(ref RecordInfo recordInfo)
+ #region Transient locking
+ public bool TryLockTransientExclusive(ref Key key, ref OperationStackContext stackCtx)
{
- Debug.Assert(recordInfo.IsLockedExclusive, $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
+ Debug.Assert(_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei),
+ $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock):"
+ + $" XLocked {_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei)},"
+ + $" Slocked {_clientSession.fht.LockTable.IsLockedShared(ref key, ref stackCtx.hei)}");
return true;
}
- public bool TryLockEphemeralShared(ref RecordInfo recordInfo)
+ public bool TryLockTransientShared(ref Key key, ref OperationStackContext stackCtx)
{
- Debug.Assert(recordInfo.IsLocked, $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock): XLocked {recordInfo.IsLockedExclusive}, Slocked {recordInfo.NumLockedShared}");
+ Debug.Assert(_clientSession.fht.LockTable.IsLocked(ref key, ref stackCtx.hei),
+ $"Attempting to use a non-Locked (S or X) key in a Lockable context (requesting SLock):"
+ + $" XLocked {_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei)},"
+ + $" Slocked {_clientSession.fht.LockTable.IsLockedShared(ref key, ref stackCtx.hei)}");
return true;
}
- public void UnlockEphemeralExclusive(ref RecordInfo recordInfo) { }
- public bool TryUnlockEphemeralShared(ref RecordInfo recordInfo) => true;
+ public void UnlockTransientExclusive(ref Key key, ref OperationStackContext stackCtx)
+ {
+ Debug.Assert(_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei),
+ $"Attempting to unlock a non-XLocked key in a Lockable context (requesting XLock):"
+ + $" XLocked {_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei)},"
+ + $" Slocked {_clientSession.fht.LockTable.IsLockedShared(ref key, ref stackCtx.hei)}");
+ }
+
+ public void UnlockTransientShared(ref Key key, ref OperationStackContext stackCtx)
+ {
+ Debug.Assert(_clientSession.fht.LockTable.IsLockedShared(ref key, ref stackCtx.hei),
+ $"Attempting to use a non-XLocked key in a Lockable context (requesting XLock):"
+ + $" XLocked {_clientSession.fht.LockTable.IsLockedExclusive(ref key, ref stackCtx.hei)},"
+ + $" Slocked {_clientSession.fht.LockTable.IsLockedShared(ref key, ref stackCtx.hei)}");
+ }
#endregion
#region Internal utilities
@@ -698,6 +752,8 @@ public IHeapContainer GetHeapContainer(ref Input input)
public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false)
=> _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit);
+
+ public FasterKV.FasterExecutionContext Ctx => this._clientSession.ctx;
#endregion Internal utilities
}
#endregion IFasterSession
diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs
index 85bb87cdf..4ffc23493 100644
--- a/cs/src/core/ClientSession/LockableUnsafeContext.cs
+++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs
@@ -15,7 +15,7 @@ namespace FASTER.core
where Functions : IFunctions
{
readonly ClientSession clientSession;
- internal readonly InternalFasterSession FasterSession;
+ internal readonly LockableContext.InternalFasterSession FasterSession;
/// Indicates whether this struct has been initialized
public bool IsNull => this.clientSession is null;
@@ -23,7 +23,7 @@ namespace FASTER.core
internal LockableUnsafeContext(ClientSession clientSession)
{
this.clientSession = clientSession;
- FasterSession = new InternalFasterSession(clientSession);
+ FasterSession = new LockableContext.InternalFasterSession(clientSession);
}
#region Begin/EndUnsafe
@@ -48,72 +48,59 @@ internal LockableUnsafeContext(ClientSession
- public unsafe void Lock(ref Key key, LockType lockType)
- {
- clientSession.CheckIsAcquiredLockable();
- Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for Lock()");
+ public bool NeedKeyLockCode => clientSession.NeedKeyLockCode;
- LockOperation lockOp = new(LockOperationType.Lock, lockType);
+ ///
+ public long GetLockCode(Key key, out long keyHash) => clientSession.GetLockCode(ref key, out keyHash);
- OperationStatus status;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out _);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
+ ///
+ public long GetLockCode(ref Key key, out long keyHash) => clientSession.GetLockCode(ref key, out keyHash);
- if (lockType == LockType.Exclusive)
- ++clientSession.exclusiveLockCount;
- else
- ++clientSession.sharedLockCount;
- }
+ ///
+ public long GetLockCode(Key key, long keyHash) => clientSession.GetLockCode(ref key, keyHash);
///
- public unsafe void Lock(Key key, LockType lockType) => Lock(ref key, lockType);
+ public long GetLockCode(ref Key key, long keyHash) => clientSession.GetLockCode(ref key, keyHash);
///
- public void Unlock(ref Key key, LockType lockType)
- {
- clientSession.CheckIsAcquiredLockable();
- Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for Unlock()");
+ public int CompareLockCodes(TLockableKey key1, TLockableKey key2) where TLockableKey : ILockableKey => clientSession.CompareLockCodes(key1, key2);
- LockOperation lockOp = new(LockOperationType.Unlock, lockType);
+ ///
+ public int CompareLockCodes(ref TLockableKey key1, ref TLockableKey key2) where TLockableKey : ILockableKey => clientSession.CompareLockCodes(ref key1, ref key2);
- OperationStatus status;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out _);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
+ ///
+ public void SortLockCodes(TLockableKey[] keys) where TLockableKey : ILockableKey => clientSession.SortLockCodes(keys);
- if (lockType == LockType.Exclusive)
- --clientSession.exclusiveLockCount;
- else
- --clientSession.sharedLockCount;
- }
+ ///
+ public void SortLockCodes(TLockableKey[] keys, int start, int count) where TLockableKey : ILockableKey => clientSession.SortLockCodes(keys, start, count);
///
- public void Unlock(Key key, LockType lockType) => Unlock(ref key, lockType);
+ public void Lock(TLockableKey[] keys) where TLockableKey : ILockableKey => Lock(keys, 0, keys.Length);
///
- public (bool exclusive, byte shared) IsLocked(ref Key key)
+ public void Lock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey
{
clientSession.CheckIsAcquiredLockable();
- Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for IsLocked()");
-
- LockOperation lockOp = new(LockOperationType.IsLocked, LockType.None);
+ Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for LockableUnsafeContext.Lock()");
- OperationStatus status;
- RecordInfo lockInfo;
- do
- status = clientSession.fht.InternalLock(ref key, lockOp, out lockInfo);
- while (clientSession.fht.HandleImmediateNonPendingRetryStatus(status, clientSession.ctx, FasterSession));
- Debug.Assert(status == OperationStatus.SUCCESS);
- return (lockInfo.IsLockedExclusive, lockInfo.NumLockedShared);
+ LockableContext.DoInternalLockOp(FasterSession, clientSession, keys, start, count, LockOperationType.Lock);
}
///
- public (bool exclusive, byte shared) IsLocked(Key key) => IsLocked(ref key);
+ public void Unlock(TLockableKey[] keys) where TLockableKey : ILockableKey => Unlock(keys, 0, keys.Length);
+
+ ///
+ public void Unlock(TLockableKey[] keys, int start, int count)
+ where TLockableKey : ILockableKey
+ {
+ clientSession.CheckIsAcquiredLockable();
+ Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected(), "Epoch protection required for LockableUnsafeContext.Unlock()");
+
+ LockableContext.DoInternalLockOp(FasterSession, clientSession, keys, start, count, LockOperationType.Unlock);
+ }
///
/// The session id of FasterSession
@@ -150,7 +137,7 @@ public ValueTask> Co
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextRead(ref key, ref input, ref output, userContext, FasterSession, serialNo);
}
///
@@ -192,7 +179,7 @@ public Status Read(Key key, out Output output, Context userContext = default, lo
public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOptions readOptions, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextRead(ref key, ref input, ref output, ref readOptions, out recordMetadata, userContext, FasterSession, serialNo);
}
///
@@ -200,7 +187,7 @@ public Status Read(ref Key key, ref Input input, ref Output output, ref ReadOpti
public Status ReadAtAddress(ref Input input, ref Output output, ref ReadOptions readOptions, Context userContext = default, long serialNo = 0)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextReadAtAddress(ref input, ref output, ref readOptions, userContext, FasterSession, serialNo);
}
///
@@ -209,7 +196,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
}
///
@@ -218,7 +205,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
@@ -228,7 +215,7 @@ public ValueTask.ReadAsyncResult> R
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Input input = default;
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, token);
}
///
@@ -238,7 +225,7 @@ public ValueTask.ReadAsyncResult> R
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Input input = default;
ReadOptions readOptions = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, context, serialNo, token);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, context, serialNo, token);
}
///
@@ -247,7 +234,7 @@ public ValueTask.ReadAsyncResult> R
Context userContext = default, long serialNo = 0, CancellationToken cancellationToken = default)
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken);
}
///
@@ -257,7 +244,7 @@ public ValueTask.ReadAsyncResult> R
{
Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected());
Key key = default;
- return clientSession.fht.ReadAsync(FasterSession, clientSession.ctx, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
+ return clientSession.fht.ReadAsync(FasterSession, ref key, ref input, ref readOptions, userContext, serialNo, cancellationToken, noKey: true);
}
///
@@ -275,7 +262,7 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext =
public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, Context userContext = default, long serialNo = 0)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, userContext, FasterSession, serialNo);
}
///
@@ -283,7 +270,7 @@ public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref O
public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
- return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo, clientSession.ctx);
+ return clientSession.fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo);
}
///
@@ -309,7 +296,8 @@ public ValueTask.UpsertAsyncResult