diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index cab049e11..0eeda11a3 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -51,7 +51,7 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys numaStyle = testLoader.Options.NumaStyle; readPercent = testLoader.Options.ReadPercent; var lockImpl = testLoader.LockImpl; - functions = new FunctionsSB(lockImpl != LockImpl.None); + functions = new FunctionsSB(lockImpl != LockImpl.None, testLoader.Options.PostOps); #if DASHBOARD statsWritten = new AutoResetEvent[threadCount]; diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 9852f0ef3..9df09fc4d 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -48,7 +48,7 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade numaStyle = testLoader.Options.NumaStyle; readPercent = testLoader.Options.ReadPercent; var lockImpl = testLoader.LockImpl; - functions = new Functions(lockImpl != LockImpl.None); + functions = new Functions(lockImpl != LockImpl.None, testLoader.Options.PostOps); #if DASHBOARD statsWritten = new AutoResetEvent[threadCount]; diff --git a/cs/benchmark/Functions.cs b/cs/benchmark/Functions.cs index ac3701706..b52bb9b87 100644 --- a/cs/benchmark/Functions.cs +++ b/cs/benchmark/Functions.cs @@ -12,7 +12,7 @@ public struct Functions : IFunctions readonly bool locking; readonly bool postOps; - public Functions(bool locking, bool postOps = false) + public Functions(bool locking, bool postOps) { this.locking = locking; this.postOps = postOps; @@ -60,13 +60,13 @@ public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref // Upsert functions [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { dst = src; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { dst = src; return true; @@ -94,23 +94,32 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => true; + public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) => true; + + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { } + + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; + + public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) { } + + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { } + public bool SupportsLocking => locking; - public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) - { - if (lockType == LockType.Exclusive) - recordInfo.LockExclusive(); - else - recordInfo.LockShared(); - } + public void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) => recordInfo.LockExclusive(); + + public void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) => recordInfo.UnlockExclusive(); + + public bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockExclusive(spinCount); - public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) + public void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) => recordInfo.LockShared(); + + public bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) { - if (lockType == LockType.Exclusive) - recordInfo.UnlockExclusive(); - else - recordInfo.UnlockShared(); + recordInfo.UnlockShared(); return true; } + + public bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockShared(spinCount); } } diff --git a/cs/benchmark/FunctionsSB.cs b/cs/benchmark/FunctionsSB.cs index 6cdb27eb0..9ff3382a0 100644 --- a/cs/benchmark/FunctionsSB.cs +++ b/cs/benchmark/FunctionsSB.cs @@ -7,6 +7,6 @@ namespace FASTER.benchmark { public sealed class FunctionsSB : SpanByteFunctions { - public FunctionsSB(bool locking) : base(locking: locking) { } + public FunctionsSB(bool locking, bool postOps) : base(locking: locking) { } } } diff --git a/cs/benchmark/Options.cs b/cs/benchmark/Options.cs index c138ed222..b97731938 100644 --- a/cs/benchmark/Options.cs +++ b/cs/benchmark/Options.cs @@ -74,6 +74,10 @@ class Options HelpText = "Do not use thread affinitization in experiment")] public bool NoThreadAffinity { get; set; } + [Option("post", Required = false, Default = false, + HelpText = "Support post-append operations")] + public bool PostOps { get; set; } + [Option("chkptms", Required = false, Default = 0, HelpText = "If > 0, the number of milliseconds between checkpoints in experiment (else checkpointing is not done")] public int PeriodicCheckpointMilliseconds { get; set; } @@ -96,7 +100,7 @@ public string GetOptionsString() { static string boolStr(bool value) => value ? "y" : "n"; return $"d: {DistributionName.ToLower()}; n: {NumaStyle}; r: {ReadPercent}; t: {ThreadCount}; z: {LockImpl}; i: {IterationCount};" - + $" sd: {boolStr(UseSmallData)}; sm: {boolStr(UseSmallMemoryLog)}; sy: {boolStr(this.UseSyntheticData)}; noaff: {boolStr(this.NoThreadAffinity)};" + + $" sd: {boolStr(UseSmallData)}; sm: {boolStr(UseSmallMemoryLog)}; sy: {boolStr(this.UseSyntheticData)}; noaff: {boolStr(this.NoThreadAffinity)}; post: {boolStr(this.PostOps)};" + $" chkptms: {this.PeriodicCheckpointMilliseconds}; chkpttype: {(this.PeriodicCheckpointMilliseconds > 0 ? this.PeriodicCheckpointType.ToString() : "None")}; chkptincr: {boolStr(this.PeriodicCheckpointTryIncremental)}"; } } diff --git a/cs/benchmark/scripts/compare_runs.ps1 b/cs/benchmark/scripts/compare_runs.ps1 index 9547312e8..376e32009 100644 --- a/cs/benchmark/scripts/compare_runs.ps1 +++ b/cs/benchmark/scripts/compare_runs.ps1 @@ -57,6 +57,7 @@ class Result : System.IComparable, System.IEquatable[Object] { [int]$ReadPercent [uint]$ThreadCount [uint]$LockMode + [uint]$PostOpsMode [uint]$Iterations [bool]$SmallData [bool]$SmallMemory @@ -81,6 +82,7 @@ class Result : System.IComparable, System.IEquatable[Object] { "r" { $this.ReadPercent = $value } "t" { $this.ThreadCount = $value } "z" { $this.LockMode = $value } + "post" { $this.PostOpsMode = $value -eq "y" } "i" { $this.Iterations = $value } "sd" { $this.SmallData = $value -eq "y" } "sm" { $this.SmallMemory = $value -eq "y" } @@ -99,6 +101,7 @@ class Result : System.IComparable, System.IEquatable[Object] { $this.ReadPercent = $other.ReadPercent $this.ThreadCount = $other.ThreadCount $this.LockMode = $other.LockMode + $this.PostOpsMode = $other.PostOpsMode $this.Iterations = $other.Iterations $this.SmallData = $other.SmallData $this.SmallMemory = $other.SmallMemory @@ -166,6 +169,7 @@ class Result : System.IComparable, System.IEquatable[Object] { -and $this.ReadPercent -eq $other.ReadPercent -and $this.ThreadCount -eq $other.ThreadCount -and $this.LockMode -eq $other.LockMode + -and $this.PostOpsMode -eq $other.PostOpsMode -and $this.Iterations -eq $other.Iterations -and $this.SmallData -eq $other.SmallData -and $this.SmallMemory -eq $other.SmallMemory @@ -177,7 +181,7 @@ class Result : System.IComparable, System.IEquatable[Object] { } [int] GetHashCode() { - return ($this.Numa, $this.Distribution, $this.ReadPercent, $this.ThreadCount, $this.LockMode, + return ($this.Numa, $this.Distribution, $this.ReadPercent, $this.ThreadCount, $this.LockMode, $this.PostOpsMode, $this.Iterations, $this.SmallData, $this.SmallMemory, $this.SyntheticData, $this.NoAff, $this.ChkptMs, $this.ChkptType, $this.ChkptIncr).GetHashCode(); } @@ -280,6 +284,7 @@ function RenameProperties([System.Object[]]$results) { ReadPercent, ThreadCount, LockMode, + PostOpsMode, Iterations, SmallData, SmallMemory, diff --git a/cs/benchmark/scripts/run_benchmark.ps1 b/cs/benchmark/scripts/run_benchmark.ps1 index d94171d90..8b84696ff 100644 --- a/cs/benchmark/scripts/run_benchmark.ps1 +++ b/cs/benchmark/scripts/run_benchmark.ps1 @@ -34,12 +34,17 @@ Locking mode to use: 0 = No locking, 1 = RecordInfo locking Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. +.PARAMETER PostOpsMode + Post-append operations mode to use: 0 = No post ops, 1 = do post ops + Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. + .PARAMETER ReadPercentages Keys the Operation to perform: An array of one or more of: 0 = No read (Upsert workload only) 100 = All reads Between 0 and 100 = mix of reads and upserts -1 = All RMWs + The default is 0,100: one pass with all upserts, and one pass with all reads .PARAMETER UseRecover Recover the FasterKV from a checkpoint of a previous run rather than loading it from data. @@ -78,12 +83,19 @@ pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild " Clones the master branch to the .\master folder, the branch_with_my_changes to the branch_with_my_changes folder, and runs those with any specified. + +.EXAMPLE + pwsh -c "./run_benchmark.ps1 master,branch_with_my_changes -CloneAndBuild -LockMode 0 -PostOpsMode 0" + + Clones the master branch to the .\master folder, the branch_with_my_changes to the branch_with_my_changes folder, and runs those with no locking or post-append operations; + this is for best performance. #> param ( [Parameter(Mandatory=$true)] [string[]]$ExeDirs, [Parameter(Mandatory=$false)] [int]$RunSeconds = 30, [Parameter(Mandatory=$false)] [int]$ThreadCount = -1, [Parameter(Mandatory=$false)] [int]$LockMode = -1, + [Parameter(Mandatory=$false)] [int]$PostOpsMode = -1, [Parameter(Mandatory=$false)] [int[]]$ReadPercentages, [Parameter(Mandatory=$false)] [switch]$UseRecover, [Parameter(Mandatory=$false)] [switch]$CloneAndBuild, @@ -133,6 +145,7 @@ $distributions = ("uniform", "zipf") $readPercents = (0, 100) $threadCounts = (1, 20, 40, 60, 80) $lockModes = (0, 1) +$postOpsModes = (0, 1) $smallDatas = (0) #, 1) $smallMemories = (0) #, 1) $syntheticDatas = (0) #, 1) @@ -144,6 +157,9 @@ if ($ThreadCount -ge 0) { if ($LockMode -ge 0) { $lockModes = ($LockMode) } +if ($PostOpsMode -ge 0) { + $postOpsModes = ($PostOpsMode) +} if ($ReadPercentages) { $readPercents = $ReadPercentages } @@ -156,6 +172,7 @@ $permutations = $distributions.Count * $readPercents.Count * $threadCounts.Count * $lockModes.Count * + $postOpsModes.Count * $smallDatas.Count * $smallMemories.Count * $syntheticDatas.Count @@ -165,26 +182,29 @@ foreach ($d in $distributions) { foreach ($r in $readPercents) { foreach ($t in $threadCounts) { foreach ($z in $lockModes) { - foreach ($sd in $smallDatas) { - foreach ($sm in $smallMemories) { - foreach ($sy in $syntheticDatas) { - Write-Host - Write-Host "Permutation $permutation of $permutations" + foreach ($p in $postOpsModes) { + foreach ($sd in $smallDatas) { + foreach ($sm in $smallMemories) { + foreach ($sy in $syntheticDatas) { + Write-Host + Write-Host "Permutation $permutation of $permutations" - # Only certain combinations of Numa/Threads are supported - $n = ($t -lt 48) ? 0 : 1; + # Only certain combinations of Numa/Threads are supported + $n = ($t -lt 48) ? 0 : 1; - for($ii = 0; $ii -lt $exeNames.Count; ++$ii) { - $exeName = $exeNames[$ii] - $resultDir = $resultDirs[$ii] + for($ii = 0; $ii -lt $exeNames.Count; ++$ii) { + $exeName = $exeNames[$ii] + $resultDir = $resultDirs[$ii] - Write-Host - Write-Host "Permutation $permutation/$permutations generating results $($ii + 1)/$($exeNames.Count) to $resultDir for: -n $n -d $d -r $r -t $t -z $z -i $iterations --runsec $RunSeconds $k" + Write-Host + Write-Host "Permutation $permutation/$permutations generating results $($ii + 1)/$($exeNames.Count) to $resultDir for: -n $n -d $d -r $r -t $t -z $z -post $p -i $iterations --runsec $RunSeconds $k" - # RunSec and Recover are for one-off operations and are not recorded in the filenames. - & "$exeName" -b 0 -n $n -d $d -r $r -t $t -z $z -i $iterations --runsec $RunSeconds $k | Tee-Object "$resultDir/results_n-$($n)_d-$($d)_r-$($r)_t-$($t)_z-$($z).txt" + # RunSec and Recover are for one-off operations and are not recorded in the filenames. + $post = $p -eq 0 ? "" : "--post" + & "$exeName" -b 0 -n $n -d $d -r $r -t $t -z $z $post -i $iterations --runsec $RunSeconds $k | Tee-Object "$resultDir/results_n-$($n)_d-$($d)_r-$($r)_t-$($t)_z-$($z)_post-$($p).txt" + } + ++$permutation } - ++$permutation } } } diff --git a/cs/remote/samples/FixedLenServer/Types.cs b/cs/remote/samples/FixedLenServer/Types.cs index f24b59b1e..c9e26f025 100644 --- a/cs/remote/samples/FixedLenServer/Types.cs +++ b/cs/remote/samples/FixedLenServer/Types.cs @@ -87,10 +87,10 @@ public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref // Upsert functions [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => dst = src; + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) => dst = src; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { dst = src; return true; @@ -119,12 +119,27 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va output.value = newValue; } + public bool SupportsPostOperations => false; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => true; - public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { } + public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) => true; + + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { } + + public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; + + public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) { } + + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { } - public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true; + public void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) { } + public void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) { } + public bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => true; + public void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) { } + public bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) => true; + public bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => true; public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) => true; } diff --git a/cs/remote/src/FASTER.client/FASTER.client.csproj b/cs/remote/src/FASTER.client/FASTER.client.csproj index 936955e16..d6b7c92f4 100644 --- a/cs/remote/src/FASTER.client/FASTER.client.csproj +++ b/cs/remote/src/FASTER.client/FASTER.client.csproj @@ -1,7 +1,19 @@  + + + + net6.0;net5.0;netstandard2.1;netstandard2.0 + + + + + net5.0;netstandard2.1;netstandard2.0 + + + + - net5.0;netstandard2.1;netstandard2.0;net461 true AnyCPU latest @@ -24,10 +36,6 @@ bin\$(Platform)\Release\ - - - - diff --git a/cs/remote/src/FASTER.common/FASTER.common.csproj b/cs/remote/src/FASTER.common/FASTER.common.csproj index 33dde3b9a..0b833dfda 100644 --- a/cs/remote/src/FASTER.common/FASTER.common.csproj +++ b/cs/remote/src/FASTER.common/FASTER.common.csproj @@ -1,7 +1,19 @@  + + + + net6.0;net5.0;netstandard2.1;netstandard2.0 + + + + + net5.0;netstandard2.1;netstandard2.0 + + + + - net5.0;netstandard2.1;netstandard2.0;net461 true AnyCPU latest @@ -29,8 +41,4 @@ - - - - diff --git a/cs/remote/src/FASTER.server/FASTER.server.csproj b/cs/remote/src/FASTER.server/FASTER.server.csproj index 3cc22c43b..c4eab12b6 100644 --- a/cs/remote/src/FASTER.server/FASTER.server.csproj +++ b/cs/remote/src/FASTER.server/FASTER.server.csproj @@ -1,7 +1,19 @@  + + + + net6.0;net5.0;netstandard2.1;netstandard2.0 + + + + + net5.0;netstandard2.1;netstandard2.0 + + + + - net5.0;netstandard2.1;netstandard2.0;net461 true AnyCPU latest diff --git a/cs/remote/src/FASTER.server/ServerKVFunctions.cs b/cs/remote/src/FASTER.server/ServerKVFunctions.cs index 5b6011f4c..fc22ae638 100644 --- a/cs/remote/src/FASTER.server/ServerKVFunctions.cs +++ b/cs/remote/src/FASTER.server/ServerKVFunctions.cs @@ -32,8 +32,8 @@ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) => functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref recordInfo, address); - public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) - => functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); + public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + => functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) => functions.NeedInitialUpdate(ref key, ref input, ref output); @@ -73,18 +73,35 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) => functions.SingleReader(ref key, ref input, ref value, ref dst, ref recordInfo, address); - public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) - => functions.SingleWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + => functions.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); - public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { } + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { } public void UpsertCompletionCallback(ref Key key, ref Input input, ref Value value, long ctx) => functions.UpsertCompletionCallback(ref key, ref input, ref value, ctx); - public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) - => functions.Lock(ref recordInfo, ref key, ref value, lockType, ref lockContext); + public void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) + => functions.LockExclusive(ref recordInfo, ref key, ref value, ref lockContext); - public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) - => functions.Unlock(ref recordInfo, ref key, ref value, lockType, lockContext); + /// + public void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) + => functions.UnlockExclusive(ref recordInfo, ref key, ref value, lockContext); + + /// + public bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) + => functions.TryLockExclusive(ref recordInfo, ref key, ref value, ref lockContext, spinCount); + + /// + public void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) + => functions.LockShared(ref recordInfo, ref key, ref value, ref lockContext); + + /// + public bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) + => functions.UnlockShared(ref recordInfo, ref key, ref value, lockContext); + + /// + public bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) + => functions.TryLockShared(ref recordInfo, ref key, ref value, ref lockContext, spinCount); } } diff --git a/cs/remote/test/FASTER.remote.test/FASTER.remote.test.csproj b/cs/remote/test/FASTER.remote.test/FASTER.remote.test.csproj index 619528a48..7e7324be1 100644 --- a/cs/remote/test/FASTER.remote.test/FASTER.remote.test.csproj +++ b/cs/remote/test/FASTER.remote.test/FASTER.remote.test.csproj @@ -1,8 +1,19 @@  + + + + net6.0;net5.0;netcoreapp3.1 + + + + + net5.0;netcoreapp3.1 + + + + - net5.0;netcoreapp3.1;netcoreapp2.1;net461 - net5.0;netcoreapp3.1 AnyCPU;x64 latest true diff --git a/cs/samples/MemOnlyCache/Types.cs b/cs/samples/MemOnlyCache/Types.cs index 880fdc021..460daaae2 100644 --- a/cs/samples/MemOnlyCache/Types.cs +++ b/cs/samples/MemOnlyCache/Types.cs @@ -52,14 +52,14 @@ public CacheFunctions(CacheSizeTracker sizeTracker) this.sizeTracker = sizeTracker; } - public override bool ConcurrentWriter(ref CacheKey key, ref CacheValue input, ref CacheValue src, ref CacheValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref CacheKey key, ref CacheValue input, ref CacheValue src, ref CacheValue dst, ref CacheValue output, ref RecordInfo recordInfo, long address) { var old = Interlocked.Exchange(ref dst, src); sizeTracker.AddTrackedSize(dst.GetSize - old.GetSize); return true; } - public override void SingleWriter(ref CacheKey key, ref CacheValue input, ref CacheValue src, ref CacheValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref CacheKey key, ref CacheValue input, ref CacheValue src, ref CacheValue dst, ref CacheValue output, ref RecordInfo recordInfo, long address) { dst = src; sizeTracker.AddTrackedSize(key.GetSize + src.GetSize); diff --git a/cs/samples/ReadAddress/Types.cs b/cs/samples/ReadAddress/Types.cs index b145c08f9..34b1505ff 100644 --- a/cs/samples/ReadAddress/Types.cs +++ b/cs/samples/ReadAddress/Types.cs @@ -37,7 +37,7 @@ public struct Value public class Functions : SimpleFunctions { // Return false to force a chain of values. - public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => false; + public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref Value output, ref RecordInfo recordInfo, long address) => false; public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) => false; } diff --git a/cs/src/core/Async/RMWAsync.cs b/cs/src/core/Async/RMWAsync.cs index f9284b383..ec41be657 100644 --- a/cs/src/core/Async/RMWAsync.cs +++ b/cs/src/core/Async/RMWAsync.cs @@ -73,7 +73,6 @@ public void DecrementPending(FasterExecutionContext curr public struct RmwAsyncResult { internal readonly UpdateAsyncInternal, RmwAsyncResult> updateAsyncInternal; - readonly RecordMetadata recordMetadata; /// Current status of the RMW operation public Status Status { get; } @@ -81,11 +80,14 @@ public struct RmwAsyncResult /// Output of the RMW operation if current status is not public TOutput Output { get; } + /// Metadata of the updated record + public RecordMetadata RecordMetadata { get; } + internal RmwAsyncResult(Status status, TOutput output, RecordMetadata recordMetadata) { this.Status = status; this.Output = output; - this.recordMetadata = recordMetadata; + this.RecordMetadata = recordMetadata; this.updateAsyncInternal = default; } @@ -95,7 +97,7 @@ internal RmwAsyncResult(FasterKV fasterKV, IFasterSession, RmwAsyncResult>( fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new RmwAsyncOperation(diskRequest)); } @@ -104,7 +106,7 @@ internal RmwAsyncResult(FasterKV fasterKV, IFasterSessionValueTask for RMW result. User needs to await again if result status is . public ValueTask> CompleteAsync(CancellationToken token = default) => this.Status != Status.PENDING - ? new ValueTask>(new RmwAsyncResult(this.Status, this.Output, this.recordMetadata)) + ? new ValueTask>(new RmwAsyncResult(this.Status, this.Output, this.RecordMetadata)) : updateAsyncInternal.CompleteAsync(token); /// Complete the RMW operation, issuing additional (rare) I/O synchronously if needed. @@ -118,11 +120,11 @@ public ValueTask> CompleteAsync(Cancella { if (this.Status != Status.PENDING) { - recordMetadata = this.recordMetadata; + recordMetadata = this.RecordMetadata; return (this.Status, this.Output); } var rmwAsyncResult = updateAsyncInternal.Complete(); - recordMetadata = rmwAsyncResult.recordMetadata; + recordMetadata = rmwAsyncResult.RecordMetadata; return (rmwAsyncResult.Status, rmwAsyncResult.Output); } } diff --git a/cs/src/core/Async/ReadAsync.cs b/cs/src/core/Async/ReadAsync.cs index 7503ed7f6..63afbf913 100644 --- a/cs/src/core/Async/ReadAsync.cs +++ b/cs/src/core/Async/ReadAsync.cs @@ -96,8 +96,8 @@ internal ReadAsyncInternal(FasterKV fasterKV, IFasterSession public struct ReadAsyncResult { - internal readonly Status status; - internal readonly Output output; + private readonly Status status; + private readonly Output output; readonly RecordMetadata recordMetadata; internal readonly ReadAsyncInternal readAsyncInternal; diff --git a/cs/src/core/Async/UpsertAsync.cs b/cs/src/core/Async/UpsertAsync.cs index 33d132d24..c590e03d0 100644 --- a/cs/src/core/Async/UpsertAsync.cs +++ b/cs/src/core/Async/UpsertAsync.cs @@ -14,17 +14,18 @@ public partial class FasterKV : FasterBase, IFasterKV internal struct UpsertAsyncOperation : IUpdateAsyncOperation> { /// - public UpsertAsyncResult CreateResult(Status status, Output output, RecordMetadata recordMetadata) => new UpsertAsyncResult(status); + public UpsertAsyncResult CreateResult(Status status, Output output, RecordMetadata recordMetadata) => new UpsertAsyncResult(status, output, recordMetadata); /// public Status DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession, FasterExecutionContext currentCtx, bool asyncOp, out CompletionEvent flushEvent, out Output output) { + output = default; OperationStatus internalStatus; do { flushEvent = fasterKV.hlog.FlushEvent; - internalStatus = fasterKV.InternalUpsert(ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.value.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); + internalStatus = fasterKV.InternalUpsert(ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.value.Get(), ref output, ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); } while (internalStatus == OperationStatus.RETRY_NOW); output = default; return TranslateStatus(internalStatus); @@ -45,37 +46,61 @@ public void DecrementPending(FasterExecutionContext curr /// /// State storage for the completion of an async Upsert, or the result if the Upsert was completed synchronously /// - public struct UpsertAsyncResult + public struct UpsertAsyncResult { - internal readonly UpdateAsyncInternal, UpsertAsyncResult> updateAsyncInternal; + internal readonly UpdateAsyncInternal, UpsertAsyncResult> updateAsyncInternal; /// Current status of the Upsert operation public Status Status { get; } - internal UpsertAsyncResult(Status status) + /// Output of the Upsert operation + public TOutput Output { get; } + + /// Metadata of the updated record + public RecordMetadata RecordMetadata { get; } + + internal UpsertAsyncResult(Status status, TOutput output, RecordMetadata recordMetadata) { this.Status = status; + this.Output = output; + this.RecordMetadata = recordMetadata; this.updateAsyncInternal = default; } - internal UpsertAsyncResult(FasterKV fasterKV, IFasterSession fasterSession, - FasterExecutionContext currentCtx, PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo) + internal UpsertAsyncResult(FasterKV fasterKV, IFasterSession fasterSession, + FasterExecutionContext currentCtx, PendingContext pendingContext, ExceptionDispatchInfo exceptionDispatchInfo) { this.Status = Status.PENDING; - updateAsyncInternal = new UpdateAsyncInternal, UpsertAsyncResult>( - fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new UpsertAsyncOperation()); + this.Output = default; + this.RecordMetadata = default; + updateAsyncInternal = new UpdateAsyncInternal, UpsertAsyncResult>( + fasterKV, fasterSession, currentCtx, pendingContext, exceptionDispatchInfo, new UpsertAsyncOperation()); } /// Complete the Upsert operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this. /// ValueTask for Upsert result. User needs to await again if result status is Status.PENDING. - public ValueTask> CompleteAsync(CancellationToken token = default) + public ValueTask> CompleteAsync(CancellationToken token = default) => this.Status != Status.PENDING - ? new ValueTask>(new UpsertAsyncResult(this.Status)) + ? new ValueTask>(new UpsertAsyncResult(this.Status, this.Output, this.RecordMetadata)) : updateAsyncInternal.CompleteAsync(token); /// Complete the Upsert operation, issuing additional I/O synchronously if needed. /// Status of Upsert operation public Status Complete() => this.Status != Status.PENDING ? this.Status : updateAsyncInternal.Complete().Status; + + /// Complete the Upsert operation, issuing additional I/O synchronously if needed. + /// Status and Output of Upsert operation + public (Status status, TOutput output) Complete(out RecordMetadata recordMetadata) + { + if (this.Status != Status.PENDING) + { + recordMetadata = this.RecordMetadata; + return (this.Status, this.Output); + } + var upsertAsyncResult = updateAsyncInternal.Complete(); + recordMetadata = upsertAsyncResult.RecordMetadata; + return (upsertAsyncResult.Status, upsertAsyncResult.Output); + } } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -91,6 +116,7 @@ private ValueTask> UpsertAsync currentCtx, ref PendingContext pcontext, ref Key key, ref Input input, ref Value value, Context userContext, long serialNo, CancellationToken token) { CompletionEvent flushEvent; + Output output = default; fasterSession.UnsafeResumeThread(); try @@ -99,11 +125,11 @@ private ValueTask> UpsertAsync>(new UpsertAsyncResult((Status)internalStatus)); + return new ValueTask>(new UpsertAsyncResult((Status)internalStatus, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress))); Debug.Assert(internalStatus == OperationStatus.ALLOCATE_FAILED); } finally diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 9eb8e99fb..24fe1a08e 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -464,7 +464,8 @@ public ValueTask.ReadAsyncResult> R public Status Upsert(ref Key key, ref Value desiredValue, Context userContext = default, long serialNo = 0) { Input input = default; - return Upsert(ref key, ref input, ref desiredValue, userContext, serialNo); + Output output = default; + return Upsert(ref key, ref input, ref desiredValue, ref output, out _, userContext, serialNo); } /// @@ -473,16 +474,32 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext = /// /// /// + /// /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, Context userContext = default, long serialNo = 0) + public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, Context userContext = default, long serialNo = 0) + => Upsert(ref key, ref input, ref desiredValue, ref output, out _, userContext, serialNo); + + /// + /// Upsert operation + /// + /// + /// + /// + /// + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Status Upsert(ref Key key, ref Input input, ref Value desiredValue, ref Output output, out RecordMetadata recordMetadata, Context userContext = default, long serialNo = 0) { if (SupportAsync) UnsafeResumeThread(); try { - return fht.ContextUpsert(ref key, ref input, ref desiredValue, userContext, FasterSession, serialNo, ctx); + return fht.ContextUpsert(ref key, ref input, ref desiredValue, ref output, out recordMetadata, userContext, FasterSession, serialNo, ctx); } finally { @@ -508,12 +525,13 @@ public Status Upsert(Key key, Value desiredValue, Context userContext = default, /// /// /// + /// /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public Status Upsert(Key key, Input input, Value desiredValue, Context userContext = default, long serialNo = 0) - => Upsert(ref key, ref input, ref desiredValue, userContext, serialNo); + public Status Upsert(Key key, Input input, Value desiredValue, ref Output output, Context userContext = default, long serialNo = 0) + => Upsert(ref key, ref input, ref desiredValue, ref output, out _, userContext, serialNo); /// /// Async Upsert operation @@ -1037,15 +1055,16 @@ public long Compact(long untilAddress, bool shiftBeginAddre /// /// /// + /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void CopyToTail(ref Key key, ref Input input, ref Value desiredValue, long foundLogicalAddress) + internal void CopyToTail(ref Key key, ref Input input, ref Value desiredValue, ref Output output, long foundLogicalAddress) { if (SupportAsync) UnsafeResumeThread(); try { - fht.InternalCopyToTail(ref key, ref input, ref desiredValue, foundLogicalAddress, FasterSession, ctx, noReadCache: true); + fht.InternalCopyToTail(ref key, ref input, ref desiredValue, ref output, foundLogicalAddress, FasterSession, ctx, noReadCache: true); } finally { @@ -1104,13 +1123,15 @@ public InternalFasterSession(ClientSession _clientSession.functions.SupportsLocking; + public bool SupportsPostOperations => _clientSession.functions.SupportsPostOperations; + #endregion IFunctions - Optional features supported - public void CheckpointCompletionCallback(string guid, CommitPoint commitPoint) - { - _clientSession.functions.CheckpointCompletionCallback(guid, commitPoint); - _clientSession.LatestCommitPoint = commitPoint; - } + #region IFunctions - Reads + public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) + => _clientSession.functions.SingleReader(ref key, ref input, ref value, ref dst, ref recordInfo, address); [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) @@ -1125,93 +1146,199 @@ public bool ConcurrentReaderLock(ref Key key, ref Input input, ref Value value, { success = false; long context = 0; - this.Lock(ref recordInfo, ref key, ref value, LockType.Shared, ref context); + this.LockShared(ref recordInfo, ref key, ref value, ref context); try { success = _clientSession.functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref recordInfo, address); } finally { - retry = !this.Unlock(ref recordInfo, ref key, ref value, LockType.Shared, context); + retry = !this.UnlockShared(ref recordInfo, ref key, ref value, context); } } return success; } + public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata) + => _clientSession.functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status, recordMetadata); + + #endregion IFunctions - Reads + + #region IFunctions - Upserts + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + => _clientSession.functions.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext) + { + lockContext = 0; + this.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + if (this.SupportsPostOperations) + { + // Lock must be taken after the value is initialized. Unlocked in PostSingleWriterLock. + this.LockExclusive(ref recordInfo, ref key, ref dst, ref lockContext); + } + } + + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + => throw new FasterException("The lockContext form of PostSingleWriter should always be called"); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address, long lockContext) + { + if (!this.SupportsPostOperations) + return; + if (!this.SupportsLocking) + PostSingleWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + else + PostSingleWriterLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address, lockContext); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void PostSingleWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + { + recordInfo.Version = _clientSession.ctx.version; + _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void PostSingleWriterLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address, long lockContext) + { + // Lock was taken in SingleWriterLock + try + { + PostSingleWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + } + finally + { + this.UnlockExclusive(ref recordInfo, ref key, ref dst, lockContext); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) => !this.SupportsLocking - ? ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref recordInfo, address) - : ConcurrentWriterLock(ref key, ref input, ref src, ref dst, ref recordInfo, address); + ? ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address) + : ConcurrentWriterLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; // Note: KeyIndexes do not need notification of in-place updates because the key does not change. - return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); + return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool ConcurrentWriterLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + private bool ConcurrentWriterLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { long context = 0; - this.Lock(ref recordInfo, ref key, ref dst, LockType.Exclusive, ref context); + this.LockExclusive(ref recordInfo, ref key, ref dst, ref context); try { - return !recordInfo.Tombstone && ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref recordInfo, address); + return !recordInfo.Tombstone && ConcurrentWriterNoLock(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); } finally { - this.Unlock(ref recordInfo, ref key, ref dst, LockType.Exclusive, context); + this.UnlockExclusive(ref recordInfo, ref key, ref dst, context); } } + public void UpsertCompletionCallback(ref Key key, ref Input input, ref Value value, Context ctx) + => _clientSession.functions.UpsertCompletionCallback(ref key, ref input, ref value, ctx); + #endregion IFunctions - Upserts + + #region IFunctions - RMWs + #region InitialUpdater [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) - => _clientSession.functions.PostSingleDeleter(ref key, ref recordInfo, address); + public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) + => _clientSession.functions.NeedInitialUpdate(ref key, ref input, ref output); + + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + => throw new FasterException("The lockContext form of InitialUpdater should always be called"); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) - => (!this.SupportsLocking) - ? ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address) - : ConcurrentDeleterLock(ref key, ref value, ref recordInfo, address); + public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext) + { + lockContext = 0; + _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + if (this.SupportsPostOperations) + { + // Lock must be taken after the value is initialized. Unlocked in PostInitialUpdaterLock. + this.LockExclusive(ref recordInfo, ref key, ref value, ref lockContext); + } + } + + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + => throw new FasterException("The lockContext form of PostInitialUpdater should always be called"); [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address, long lockContext) + { + if (!this.SupportsPostOperations) + return; + if (!this.SupportsLocking) + PostInitialUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, address); + else + PostInitialUpdaterLock(ref key, ref input, ref value, ref output, ref recordInfo, address, lockContext); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void PostInitialUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - recordInfo.Tombstone = true; - return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref recordInfo, address); + _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); } - private bool ConcurrentDeleterLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void PostInitialUpdaterLock(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address, long lockContext) { - long context = 0; - this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); + // Lock was taken in InitialUpdaterLock try { - return ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address); + PostInitialUpdaterNoLock(ref key, ref input, ref value, ref output, ref recordInfo, address); } finally { - this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context); + this.UnlockExclusive(ref recordInfo, ref key, ref value, lockContext); } } + #endregion InitialUpdater - public bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) - => _clientSession.functions.NeedInitialUpdate(ref key, ref input, ref output); - + #region CopyUpdater + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => _clientSession.functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output); - public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) - => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) + => throw new FasterException("The lockContext form of CopyUpdater should always be called"); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => !this.SupportsLocking - ? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref oldValue, ref newValue, ref recordInfo, address) - : PostCopyUpdaterLock(ref key, ref input, ref output, ref oldValue, ref newValue, ref recordInfo, address); + public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext) + { + lockContext = 0; + _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); + if (this.SupportsPostOperations) + { + // Lock must be taken after the value is initialized. Unlocked in PostInitialUpdaterLock. + this.LockExclusive(ref recordInfo, ref key, ref newValue, ref lockContext); + } + } + + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) + => throw new FasterException("The lockContext form of PostCopyUpdater should always be called"); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address, long lockContext) + { + if (!this.SupportsPostOperations) + return true; + return !this.SupportsLocking + ? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref oldValue, ref newValue, ref recordInfo, address) + : PostCopyUpdaterLock(ref key, ref input, ref output, ref oldValue, ref newValue, ref recordInfo, address, lockContext); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value oldValue, ref Value newValue, ref RecordInfo recordInfo, long address) @@ -1220,10 +1347,10 @@ private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output outp return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); } - private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value oldValue, ref Value newValue, ref RecordInfo recordInfo, long address) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value oldValue, ref Value newValue, ref RecordInfo recordInfo, long address, long lockContext) { - long context = 0; - this.Lock(ref recordInfo, ref key, ref newValue, LockType.Exclusive, ref context); + // Lock was taken in CopyUpdaterLock try { // KeyIndexes do not need notification of in-place updates because the key does not change. @@ -1231,25 +1358,12 @@ private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output } finally { - this.Unlock(ref recordInfo, ref key, ref newValue, LockType.Exclusive, context); + this.UnlockExclusive(ref recordInfo, ref key, ref newValue, lockContext); } } + #endregion CopyUpdater - public void DeleteCompletionCallback(ref Key key, Context ctx) - => _clientSession.functions.DeleteCompletionCallback(ref key, ctx); - - public int GetInitialLength(ref Input input) - => _clientSession.variableLengthStruct.GetInitialLength(ref input); - - public int GetLength(ref Value t, ref Input input) - => _clientSession.variableLengthStruct.GetLength(ref t, ref input); - - public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) - => _clientSession.functions.InitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); - - public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) - => _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); - + #region InPlaceUpdater [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => !this.SupportsLocking @@ -1267,44 +1381,105 @@ private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output outpu private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { long context = 0; - this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); + this.LockExclusive(ref recordInfo, ref key, ref value, ref context); try { return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address); } finally { - this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context); + this.UnlockExclusive(ref recordInfo, ref key, ref value, context); } } - public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata) - => _clientSession.functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status, recordMetadata); - - public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata) + public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, Context ctx, Status status, RecordMetadata recordMetadata) => _clientSession.functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status, recordMetadata); - public bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) - => _clientSession.functions.SingleReader(ref key, ref input, ref value, ref dst, ref recordInfo, address); + #endregion InPlaceUpdater + #endregion IFunctions - RMWs - public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) - => _clientSession.functions.SingleWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); + #region IFunctions - Deletes + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) + { + if (!this.SupportsPostOperations) + return; - public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) - => _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); + // There is no value to lock here, so we take a RecordInfo lock in InternalDelete and release it here. + recordInfo.Version = _clientSession.ctx.version; + _clientSession.functions.PostSingleDeleter(ref key, ref recordInfo, address); + if (this.SupportsLocking) + recordInfo.UnlockExclusive(); + } - public void UpsertCompletionCallback(ref Key key, ref Input input, ref Value value, Context ctx) - => _clientSession.functions.UpsertCompletionCallback(ref key, ref input, ref value, ctx); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + => (!this.SupportsLocking) + ? ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address) + : ConcurrentDeleterLock(ref key, ref value, ref recordInfo, address); - public void UnsafeResumeThread() => _clientSession.UnsafeResumeThread(); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + { + recordInfo.Version = _clientSession.ctx.version; + recordInfo.Tombstone = true; + return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref recordInfo, address); + } - public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread(); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool ConcurrentDeleterLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + { + long context = 0; + this.LockExclusive(ref recordInfo, ref key, ref value, ref context); + try + { + return ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address); + } + finally + { + this.UnlockExclusive(ref recordInfo, ref key, ref value, context); + } + } - public bool SupportsLocking => _clientSession.functions.SupportsLocking; + public void DeleteCompletionCallback(ref Key key, Context ctx) + => _clientSession.functions.DeleteCompletionCallback(ref key, ctx); + #endregion IFunctions - Deletes - public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) => _clientSession.functions.Lock(ref recordInfo, ref key, ref value, lockType, ref lockContext); + #region IFunctions - Locking - public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => _clientSession.functions.Unlock(ref recordInfo, ref key, ref value, lockType, lockContext); + public void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) + => _clientSession.functions.LockExclusive(ref recordInfo, ref key, ref value, ref lockContext); + + public void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) + => _clientSession.functions.UnlockExclusive(ref recordInfo, ref key, ref value, lockContext); + + public bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) + => _clientSession.functions.TryLockExclusive(ref recordInfo, ref key, ref value, ref lockContext, spinCount); + + public void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) + => _clientSession.functions.LockShared(ref recordInfo, ref key, ref value, ref lockContext); + + public bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) + => _clientSession.functions.UnlockShared(ref recordInfo, ref key, ref value, lockContext); + + public bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) + => _clientSession.functions.TryLockShared(ref recordInfo, ref key, ref value, ref lockContext, spinCount); + #endregion IFunctions - Locking + + #region IFunctions - Checkpointing + public void CheckpointCompletionCallback(string guid, CommitPoint commitPoint) + { + _clientSession.functions.CheckpointCompletionCallback(guid, commitPoint); + _clientSession.LatestCommitPoint = commitPoint; + } + #endregion IFunctions - Checkpointing + + #region Internal utilities + public int GetInitialLength(ref Input input) + => _clientSession.variableLengthStruct.GetInitialLength(ref input); + + public int GetLength(ref Value t, ref Input input) + => _clientSession.variableLengthStruct.GetLength(ref t, ref input); public IHeapContainer GetHeapContainer(ref Input input) { @@ -1313,8 +1488,13 @@ public IHeapContainer GetHeapContainer(ref Input input) return new VarLenHeapContainer(ref input, _clientSession.inputVariableLengthStruct, _clientSession.fht.hlog.bufferPool); } + public void UnsafeResumeThread() => _clientSession.UnsafeResumeThread(); + + public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread(); + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - => throw new NotImplementedException(); + => _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + #endregion Internal utilities } } } diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index 1de1ae936..cad821bd6 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -29,13 +29,11 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = fal { IDevice logDevice; -#if NETSTANDARD || NET if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity, recoverDevice); } else -#endif { logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice, useIoCompletionPort); } diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 3468ec242..cafd4d1d8 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -101,12 +101,11 @@ protected internal LocalStorageDevice(string filename, bool useIoCompletionPort = true) : base(filename, GetSectorSize(filename), capacity) { -#if NETSTANDARD || NET if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { throw new FasterException("Cannot use LocalStorageDevice from non-Windows OS platform, use ManagedLocalStorageDevice instead."); } -#endif + ThrottleLimit = 120; this.useIoCompletionPort = useIoCompletionPort; this._disposed = false; diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 496aa5617..188aaa1ad 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -471,13 +471,12 @@ private string GetSegmentName(int segmentId) private static uint GetSectorSize(string filename) { -#if NETSTANDARD || NET - if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { Debug.WriteLine("Assuming 512 byte sector alignment for disk with file " + filename); return 512; } -#endif + if (!Native32.GetDiskFreeSpace(filename.Substring(0, 3), out uint lpSectorsPerCluster, out uint _sectorSize, diff --git a/cs/src/core/FASTER.core.csproj b/cs/src/core/FASTER.core.csproj index c3f3080c6..33fc13c17 100644 --- a/cs/src/core/FASTER.core.csproj +++ b/cs/src/core/FASTER.core.csproj @@ -1,7 +1,19 @@  + + + + net6.0;net5.0;netstandard2.1;netstandard2.0 + + + + + net5.0;netstandard2.1;netstandard2.0 + + + + - net5.0;netstandard2.1;netstandard2.0;net461 AnyCPU;x64 latest @@ -33,22 +45,19 @@ bin\$(Platform)\Release\ + + ;NU1605 + + - - - - - - - diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index ff54f490e..afff26c1a 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -613,24 +613,26 @@ internal Status ContextReadAtAddress(long } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status ContextUpsert(ref Key key, ref Input input, ref Value value, Context context, FasterSession fasterSession, long serialNo, - FasterExecutionContext sessionCtx) + internal Status ContextUpsert(ref Key key, ref Input input, ref Value value, ref Output output, out RecordMetadata recordMetadata, + Context context, FasterSession fasterSession, long serialNo, FasterExecutionContext sessionCtx) where FasterSession : IFasterSession { var pcontext = default(PendingContext); OperationStatus internalStatus; do - internalStatus = InternalUpsert(ref key, ref input, ref value, ref context, ref pcontext, fasterSession, sessionCtx, serialNo); + internalStatus = InternalUpsert(ref key, ref input, ref value, ref output, ref context, ref pcontext, fasterSession, sessionCtx, serialNo); while (internalStatus == OperationStatus.RETRY_NOW); Status status; if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { + recordMetadata = new(pcontext.recordInfo, pcontext.logicalAddress); status = (Status)internalStatus; } else { + recordMetadata = default; status = HandleOperationStatus(sessionCtx, sessionCtx, ref pcontext, fasterSession, internalStatus, false, out _); } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 33f59592f..c5e5cc123 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -219,7 +219,7 @@ internal OperationStatus InternalRead( if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail) { var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress)); - InternalTryCopyToTail(ref key, ref input, ref container.Get(), logicalAddress, fasterSession, sessionCtx); + InternalTryCopyToTail(ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx); container.Dispose(); } return OperationStatus.SUCCESS; @@ -311,6 +311,7 @@ private enum LatchDestination /// key of the record. /// input used to update the value. /// value to be updated to (or inserted if key does not exist). + /// output where the result of the update can be placed /// User context for the operation, in case it goes pending. /// Pending context used internally to store the context of the operation. /// Callback functions. @@ -338,7 +339,7 @@ private enum LatchDestination /// [MethodImpl(MethodImplOptions.AggressiveInlining)] internal OperationStatus InternalUpsert( - ref Key key, ref Input input, ref Value value, + ref Key key, ref Input input, ref Value value, ref Output output, ref Context userContext, ref PendingContext pendingContext, FasterSession fasterSession, @@ -388,9 +389,11 @@ internal OperationStatus InternalUpsert( { ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone - && fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress)) + && fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { hlog.MarkPage(logicalAddress, sessionCtx.version); + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = logicalAddress; return OperationStatus.SUCCESS; } goto CreateNewRecord; @@ -413,10 +416,12 @@ internal OperationStatus InternalUpsert( { ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone - && fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress)) + && fasterSession.ConcurrentWriter(ref key, ref input, ref value, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version); else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version); + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = logicalAddress; status = OperationStatus.SUCCESS; goto LatchRelease; // Release shared latch (if acquired) } @@ -431,7 +436,7 @@ internal OperationStatus InternalUpsert( if (latchDestination != LatchDestination.CreatePendingContext) { // Immutable region or new record - status = CreateNewRecordUpsert(ref key, ref input, ref value, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, latestLogicalAddress); + status = CreateNewRecordUpsert(ref key, ref input, ref value, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, latestLogicalAddress); if (status != OperationStatus.ALLOCATE_FAILED) goto LatchRelease; latchDestination = LatchDestination.CreatePendingContext; @@ -445,6 +450,11 @@ internal OperationStatus InternalUpsert( if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); if (pendingContext.value == default) pendingContext.value = hlog.GetValueContainer(ref value); + + pendingContext.output = output; + if (pendingContext.output is IHeapConvertible heapConvertible) + heapConvertible.ConvertToHeap(); + pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; @@ -549,7 +559,7 @@ private LatchDestination AcquireLatchUpsert(FasterExecut return LatchDestination.NormalProcessing; } - private OperationStatus CreateNewRecordUpsert(ref Key key, ref Input input, ref Value value, ref PendingContext pendingContext, FasterSession fasterSession, + private OperationStatus CreateNewRecordUpsert(ref Key key, ref Input input, ref Value value, ref Output output, ref PendingContext pendingContext, FasterSession fasterSession, FasterExecutionContext sessionCtx, HashBucket* bucket, int slot, ushort tag, HashBucketEntry entry, long latestLogicalAddress) where FasterSession : IFasterSession @@ -566,7 +576,7 @@ private OperationStatus CreateNewRecordUpsert( && fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { hlog.MarkPage(logicalAddress, sessionCtx.version); + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = logicalAddress; return OperationStatus.SUCCESS; } goto CreateNewRecord; @@ -727,6 +742,8 @@ internal OperationStatus InternalRMW( { if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version); else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version); + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = logicalAddress; status = OperationStatus.SUCCESS; goto LatchRelease; // Release shared latch (if acquired) } @@ -943,23 +960,24 @@ private OperationStatus CreateNewRecordRMW= hlog.HeadAddress) { if (hlog.GetInfo(physicalAddress).Tombstone) { - fasterSession.InitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output, ref recordInfo, newLogicalAddress); + fasterSession.InitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output, ref recordInfo, newLogicalAddress, out lockContext); status = OperationStatus.NOTFOUND; } else { fasterSession.CopyUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), - ref output, ref hlog.GetInfo(physicalAddress), newLogicalAddress); + ref output, ref recordInfo, newLogicalAddress, out lockContext); status = OperationStatus.SUCCESS; } } @@ -986,7 +1004,8 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), Debug.Assert(OperationStatus.NOTFOUND == status); fasterSession.PostInitialUpdater(ref key, ref input, ref hlog.GetValue(newPhysicalAddress), - ref output, ref recordInfo, newLogicalAddress); + ref output, ref recordInfo, newLogicalAddress, lockContext); + pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; return status; } @@ -995,8 +1014,9 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), if (fasterSession.PostCopyUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref hlog.GetValue(newPhysicalAddress), - ref output, ref hlog.GetInfo(physicalAddress), newLogicalAddress)) + ref output, ref recordInfo, newLogicalAddress, lockContext)) { + pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; return status; } @@ -1004,6 +1024,8 @@ ref hlog.GetValue(newPhysicalAddress), else { // CAS failed + if (fasterSession.SupportsLocking) + fasterSession.UnlockExclusive(ref recordInfo, ref key, ref hlog.GetValue(newPhysicalAddress), lockContext); hlog.GetInfo(newPhysicalAddress).Invalid = true; } status = OperationStatus.RETRY_NOW; @@ -1222,11 +1244,15 @@ internal OperationStatus InternalDelete( goto CreatePendingContext; } var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), + ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress); + RecordInfo.WriteInfo(ref recordInfo, sessionCtx.version, tombstone:true, invalidBit:false, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); + // There is no Value to lock, so we lock the RecordInfo directly. TODO: Updaters must honor this lock as well + recordInfo.LockExclusive(); + var updatedEntry = default(HashBucketEntry); updatedEntry.Tag = tag; updatedEntry.Address = newLogicalAddress & Constants.kAddressMask; @@ -1242,14 +1268,17 @@ internal OperationStatus InternalDelete( { // Note that this is the new logicalAddress; we have not retrieved the old one if it was below HeadAddress, and thus // we do not know whether 'logicalAddress' belongs to 'key' or is a collision. - fasterSession.PostSingleDeleter(ref key, ref hlog.GetInfo(newPhysicalAddress), newLogicalAddress); + fasterSession.PostSingleDeleter(ref key, ref recordInfo, newLogicalAddress); + recordInfo.UnlockExclusive(); + pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; status = OperationStatus.SUCCESS; goto LatchRelease; } else { - hlog.GetInfo(newPhysicalAddress).Invalid = true; + recordInfo.UnlockExclusive(); + recordInfo.Invalid = true; status = OperationStatus.RETRY_NOW; goto LatchRelease; } @@ -1445,7 +1474,8 @@ internal void InternalContinuePendingReadCopyToTail @@ -1563,11 +1593,12 @@ internal OperationStatus InternalContinuePendingRMW( - ref Key key, ref Input input, ref Value value, + ref Key key, ref Input input, ref Value value, ref Output output, long expectedLogicalAddress, FasterSession fasterSession, FasterExecutionContext currentCtx, @@ -1918,7 +1952,7 @@ internal OperationStatus InternalCopyToTail( - ref Key key, ref Input input, ref Value value, + ref Key key, ref Input input, ref Value value, ref Output output, long foundLogicalAddress, FasterSession fasterSession, FasterExecutionContext currentCtx, bool noReadCache = false) where FasterSession : IFasterSession - => InternalTryCopyToTail(currentCtx, ref key, ref input, ref value, foundLogicalAddress, fasterSession, currentCtx, noReadCache); + => InternalTryCopyToTail(currentCtx, ref key, ref input, ref value, ref output, foundLogicalAddress, fasterSession, currentCtx, noReadCache); /// /// Helper function for trying to copy existing immutable records (at foundLogicalAddress) to the tail, /// used in /// , - /// and + /// and /// /// Succeed only if the record for the same key hasn't changed. /// @@ -1953,6 +1987,7 @@ internal OperationStatus InternalTryCopyToTail /// /// + /// /// /// The expected address of the record being copied. /// @@ -1970,7 +2005,7 @@ internal OperationStatus InternalTryCopyToTail internal OperationStatus InternalTryCopyToTail( FasterExecutionContext opCtx, - ref Key key, ref Input input, ref Value value, + ref Key key, ref Input input, ref Value value, ref Output output, long expectedLogicalAddress, FasterSession fasterSession, FasterExecutionContext currentCtx, @@ -2028,7 +2063,7 @@ internal OperationStatus InternalTryCopyToTail(Func if (untilAddress < scanUntil) LogScanForValidity(ref untilAddress, scanUntil, tempKvSession); - Input input = default; + Input input = default; + Output output = default; using var iter3 = tempKv.Log.Scan(tempKv.Log.BeginAddress, tempKv.Log.TailAddress); while (iter3.GetNext(out var recordInfo)) { @@ -396,7 +397,7 @@ public long Compact(Func // Note: we use untilAddress as expectedAddress here. // As long as there's no record of the same key whose address is greater than untilAddress, // i.e., the last address that this compact covers, we are safe to copy the old record to the tail. - fhtSession.CopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), expectedAddress); + fhtSession.CopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), ref output, expectedAddress); } } } diff --git a/cs/src/core/Index/FASTER/LogCompactionFunctions.cs b/cs/src/core/Index/FASTER/LogCompactionFunctions.cs index 102252282..23490a735 100644 --- a/cs/src/core/Index/FASTER/LogCompactionFunctions.cs +++ b/cs/src/core/Index/FASTER/LogCompactionFunctions.cs @@ -34,7 +34,7 @@ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long addre /// For compaction, we never perform concurrent writes as rolled over data defers to /// newly inserted data for the same key. /// - public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => true; + public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) => true; public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { } @@ -63,13 +63,18 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu /// /// Write compacted live value to store /// - public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => _functions.SingleWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); - public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { } + public void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) + => _functions.SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { } public void UpsertCompletionCallback(ref Key key, ref Input input, ref Value value, Context ctx) { } public bool SupportsLocking => false; - public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { } - public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true; + public void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) { } + public void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) { } + public bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => true; + public void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) { } + public bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) => true; + public bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => true; } } \ No newline at end of file diff --git a/cs/src/core/Index/Interfaces/FunctionsBase.cs b/cs/src/core/Index/Interfaces/FunctionsBase.cs index b0b86f462..29508f1fd 100644 --- a/cs/src/core/Index/Interfaces/FunctionsBase.cs +++ b/cs/src/core/Index/Interfaces/FunctionsBase.cs @@ -35,11 +35,11 @@ protected FunctionsBase(bool locking = false, bool postOps = false) public virtual bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address) => true; /// - public virtual bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { dst = src; return true; } + public virtual bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { dst = src; return true; } /// - public virtual void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => dst = src; + public virtual void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) => dst = src; /// - public virtual void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { } + public virtual void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { } /// public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { } @@ -75,23 +75,26 @@ public virtual void CheckpointCompletionCallback(string sessionId, CommitPoint c public virtual bool SupportsLocking => locking; /// - public virtual void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) - { - if (lockType == LockType.Exclusive) - recordInfo.LockExclusive(); - else - recordInfo.LockShared(); - } + public virtual void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) => recordInfo.LockExclusive(); + + /// + public virtual void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) => recordInfo.UnlockExclusive(); + + /// + public virtual bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockExclusive(spinCount); + + /// + public virtual void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext) => recordInfo.LockShared(); /// - public virtual bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) + public virtual bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext) { - if (lockType == LockType.Exclusive) - recordInfo.UnlockExclusive(); - else - recordInfo.UnlockShared(); + recordInfo.UnlockShared(); return true; } + + /// + public virtual bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockShared(spinCount); } /// @@ -123,9 +126,9 @@ public override bool SingleReader(ref Key key, ref Value input, ref Value value, } /// - public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { dst = src; return true; } + public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref Value output, ref RecordInfo recordInfo, long address) { dst = src; return true; } /// - public override void SingleWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => dst = src; + public override void SingleWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref Value output, ref RecordInfo recordInfo, long address) => dst = src; /// public override void InitialUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) => value = input; diff --git a/cs/src/core/Index/Interfaces/IFasterSession.cs b/cs/src/core/Index/Interfaces/IFasterSession.cs index da361efa2..33df4c8c1 100644 --- a/cs/src/core/Index/Interfaces/IFasterSession.cs +++ b/cs/src/core/Index/Interfaces/IFasterSession.cs @@ -24,6 +24,15 @@ internal interface IFasterSession /// internal interface IFasterSession : IFunctions, IFasterSession, IVariableLengthStruct { + // Overloads for locking. Except for readcache/copy-to-tail usage of SingleWriter, all operations that append a record must lock in the () call and unlock + // in the Post call; otherwise another session can try to access the record as soon as it's CAS'd and before Post is called. + void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext); + void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address, long lockContext); + void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext); + void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address, long lockContext); + void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address, out long lockContext); + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address, long lockContext); + bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); IHeapContainer GetHeapContainer(ref Input input); diff --git a/cs/src/core/Index/Interfaces/IFunctions.cs b/cs/src/core/Index/Interfaces/IFunctions.cs index 7b581f4ed..4d9ab1b07 100644 --- a/cs/src/core/Index/Interfaces/IFunctions.cs +++ b/cs/src/core/Index/Interfaces/IFunctions.cs @@ -15,15 +15,11 @@ public interface IFunctions { #region Optional features supported by this implementation /// - /// Whether this Functions instance supports locking. Iff so, FASTER will call - /// and . + /// Whether this Functions instance supports locking. Iff so, FASTER will call + /// or to lock the record as appropriate, and + /// or to match. /// - bool SupportsLocking -#if NETSTANDARD2_1 || NET - => false; -#else - { get; } -#endif + bool SupportsLocking { get; } /// /// Whether this Functions instance supports operations on records after they have been successfully appended to the log. For example, @@ -31,12 +27,7 @@ bool SupportsLocking /// can add items to it. /// /// Once the record has been appended it is visible to other sessions, so locking will be done per - bool SupportsPostOperations -#if NETSTANDARD2_1 || NET - => false; -#else - { get; } -#endif + bool SupportsPostOperations { get; } #endregion Optional features supported by this implementation #region Reads @@ -47,7 +38,7 @@ bool SupportsPostOperations /// The user input for computing from /// The value for the record being read /// The location where is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being read from, or zero if this was a readcache write; used as a RecordId by indexing if nonzero /// True if the value was available, else false (e.g. the value was expired) bool SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address); @@ -59,7 +50,7 @@ bool SupportsPostOperations /// The user input for computing from /// The value for the record being read /// The location where is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being copied to; used as a RecordId by indexing /// True if the value was available, else false (e.g. the value was expired) bool ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address); @@ -85,9 +76,10 @@ bool SupportsPostOperations /// The user input to be used for computing /// The previous value to be copied/updated /// The destination to be updated; because this is an copy to a new location, there is no previous value there. - /// A reference to the header of the record; may be used by + /// The location where the result of the update may be placed + /// A reference to the header of the record /// The logical address of the record being copied to; used as a RecordId by indexing - void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address); + void SingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address); /// /// Called after a record containing an upsert of a new key has been successfully inserted at the tail of the log. @@ -96,14 +88,10 @@ bool SupportsPostOperations /// The user input that was used to compute /// The previous value to be copied/updated /// The destination to be updated; because this is an copy to a new location, there is no previous value there. - /// A reference to the header of the record; may be used by + /// The location where the result of the update may be placed + /// A reference to the header of the record /// The logical address of the record being written; used as a RecordId by indexing - void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) -#if NETSTANDARD2_1 || NET - { } -#else - ; -#endif + void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address); /// /// Concurrent writer; called on an Upsert that finds the record in the mutable range. @@ -112,10 +100,11 @@ void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst /// The user input to be used for computing /// The value to be copied to /// The location where is to be copied; because this method is called only for in-place updates, there is a previous value there. - /// A reference to the header of the record; may be used by + /// The location where the result of the update may be placed + /// A reference to the header of the record /// The logical address of the record being copied to; used as a RecordId by indexing"/> /// True if the value was written, else false - bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address); + bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address); /// /// Upsert completion @@ -135,11 +124,7 @@ void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Value dst /// The key for this record /// The user input to be used for computing the updated value /// The location where the result of the operation is to be copied - bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) -#if NETSTANDARD2_1 || NET - => true -#endif - ; + bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output); /// /// Initial update for RMW (insert at the tail of the log). @@ -148,7 +133,7 @@ bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) /// The user input to be used for computing the updated /// The destination to be updated; because this is an insert, there is no previous value there. /// The location where the result of the operation on is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being updated; used as a RecordId by indexing void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address); @@ -159,14 +144,9 @@ bool NeedInitialUpdate(ref Key key, ref Input input, ref Output output) /// The user input to be used for computing the updated /// The destination to be updated; because this is an insert, there is no previous value there. /// The location where the result of the operation on is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being written; used as a RecordId by indexing - void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) -#if NETSTANDARD2_1 || NET - { } -#else - ; -#endif + void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address); #endregion InitialUpdater #region CopyUpdater @@ -177,11 +157,7 @@ void PostInitialUpdater(ref Key key, ref Input input, ref Value value, ref Outpu /// The user input to be used for computing the updated value /// The existing value that would be copied. /// The location where the result of the operation on is to be copied - bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) -#if NETSTANDARD2_1 || NET - => true -#endif - ; + bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output); /// /// Copy-update for RMW (RCU (Read-Copy-Update) to the tail of the log) @@ -191,7 +167,7 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output /// The previous value to be copied/updated /// The destination to be updated; because this is an copy to a new location, there is no previous value there. /// The location where is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being updated; used as a RecordId by indexing void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address); @@ -203,14 +179,10 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output /// The previous value to be copied/updated; may also be disposed here if appropriate /// The destination to be updated; because this is an copy to a new location, there is no previous value there. /// The location where is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being copied into; used as a RecordId by indexing /// True if the value was successfully updated, else false (e.g. the value was expired) - bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) -#if NETSTANDARD2_1 || NET - => true -#endif - ; + bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address); #endregion CopyUpdater #region InPlaceUpdater @@ -221,7 +193,7 @@ bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value /// The user input to be used for computing the updated /// The destination to be updated; because this is an in-place update, there is a previous value there. /// The location where the result of the operation on is to be copied - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being updated; used as a RecordId by indexing /// True if the value was successfully updated, else false (e.g. the value was expired) bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address); @@ -244,23 +216,18 @@ bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value /// Called after a record marking a Delete (with Tombstone set) has been successfully inserted at the tail of the log. /// /// The key for the record that was deleted - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record that was inserted with its tombstone set; used as a RecordId by indexing /// This does not have the address of the record that contains the value at 'key'; Delete does not retrieve records below HeadAddress, so /// the last record we have in the 'key' chain may belong to 'key' or may be a collision. - void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) -#if NETSTANDARD2_1 || NET - { } -#else - ; -#endif + void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address); /// /// Concurrent deleter; called on an Delete that finds the record in the mutable range. /// /// The key for the record to be deleted /// The value for the record being deleted; because this method is called only for in-place updates, there is a previous value there. Usually this is ignored or assigned 'default'. - /// A reference to the header of the record; may be used by + /// A reference to the header of the record /// The logical address of the record being deleted; used as a RecordId by indexing /// For Object Value types, Dispose() can be called here. If recordInfo.Invalid is true, this is called after the record was allocated and populated, but could not be appended at the end of the log. /// True if the value was successfully deleted, else false (e.g. the record was sealed) @@ -276,26 +243,64 @@ void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) #region Locking /// - /// User-provided lock call, defaulting to no-op. A default implementation is available via and . + /// User-provided exclusive-lock call, defaulting to no-op. A default implementation is available via . /// /// The header for the current record /// The key for the current record /// The value for the current record - /// The type of lock being taken - /// Context-specific information; will be passed to + /// Context-specific information; will be passed to /// /// This is called only for records guaranteed to be in the mutable range. /// - void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext); + void LockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext); /// - /// User-provided unlock call, defaulting to no-op. A default exclusive implementation is available via and . + /// User-provided exclusive unlock call, defaulting to no-op. A default exclusive implementation is available via . /// /// The header for the current record /// The key for the current record /// The value for the current record - /// The type of lock being released, as passed to - /// The context returned from + /// The context returned from + /// + /// This is called only for records guaranteed to be in the mutable range. + /// + void UnlockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext); + + /// + /// User-provided try-exclusive-lock call, defaulting to no-op. A default implementation is available via . + /// + /// The header for the current record + /// The key for the current record + /// The value for the current record + /// Context-specific information; will be passed to + /// The number of times to spin in a try/yield loop until giving up; default is once + /// + /// This is called only for records guaranteed to be in the mutable range. + /// + /// + /// True if the lock was acquired, else false. + /// + bool TryLockExclusive(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1); + + /// + /// User-provided shared-lock call, defaulting to no-op. A default implementation is available via . + /// + /// The header for the current record + /// The key for the current record + /// The value for the current record + /// Context-specific information; will be passed to + /// + /// This is called only for records guaranteed to be in the mutable range. + /// + void LockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext); + + /// + /// User-provided shared-unlock call, defaulting to no-op. A default exclusive implementation is available via . + /// + /// The header for the current record + /// The key for the current record + /// The value for the current record + /// The context returned from /// /// This is called only for records guaranteed to be in the mutable range. /// @@ -303,17 +308,33 @@ void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long address) /// True if no inconsistencies detected. Otherwise, the lock and user's callback are reissued. /// Currently this is handled only for . /// - bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext); + bool UnlockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, long lockContext); + + /// + /// User-provided try-shared-lock call, defaulting to no-op. A default implementation is available via . + /// + /// The header for the current record + /// The key for the current record + /// The value for the current record + /// Context-specific information; will be passed to + /// The number of times to spin in a try/yield loop until giving up; default is once + /// + /// This is called only for records guaranteed to be in the mutable range. + /// + /// + /// True if the lock was acquired, else false. + /// + bool TryLockShared(ref RecordInfo recordInfo, ref Key key, ref Value value, ref long lockContext, int spinCount = 1); #endregion Locking - #region Other + #region Checkpointing /// /// Checkpoint completion callback (called per client session) /// /// Session ID reporting persistence /// Commit point descriptor void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint); - #endregion Other + #endregion Checkpointing } /// diff --git a/cs/src/core/Utilities/LockType.cs b/cs/src/core/Utilities/LockType.cs deleted file mode 100644 index 9bc3345a8..000000000 --- a/cs/src/core/Utilities/LockType.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT license. - -namespace FASTER.core -{ - /// - /// Type of lock taken by FASTER on Read, Upsert, RMW, or Delete operations, either directly or within concurrent callback operations - /// - public enum LockType - { - /// - /// Shared lock, taken on Read - /// - Shared, - - /// - /// Exclusive lock, taken on Upsert, RMW, or Delete - /// - Exclusive - } -} diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs index ecba29d51..6c192d633 100644 --- a/cs/src/core/Utilities/Native32.cs +++ b/cs/src/core/Utilities/Native32.cs @@ -305,10 +305,9 @@ public static void AffinitizeThreadShardedNuma(uint threadIdx, ushort nrOfProces /// public static bool EnableProcessPrivileges() { -#if NETSTANDARD || NET if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) return false; -#endif + if (processPrivilegeEnabled.HasValue) return processPrivilegeEnabled.Value; TOKEN_PRIVILEGES token_privileges = default(TOKEN_PRIVILEGES); @@ -351,10 +350,9 @@ private static uint CTL_CODE(uint DeviceType, uint Function, uint Method, uint A internal static bool EnableVolumePrivileges(string filename, SafeFileHandle handle) { -#if NETSTANDARD || NET if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) return false; -#endif + if (processPrivilegeEnabled == false) return false; @@ -391,10 +389,8 @@ internal static bool EnableVolumePrivileges(string filename, SafeFileHandle hand /// public static bool SetFileSize(SafeFileHandle file_handle, long file_size) { -#if NETSTANDARD || NET if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) return false; -#endif if (!SetFilePointerEx(file_handle, file_size, out long newFilePtr, 0)) { diff --git a/cs/src/core/VarLen/MemoryFunctions.cs b/cs/src/core/VarLen/MemoryFunctions.cs index 0c16134bc..e87c7bf87 100644 --- a/cs/src/core/VarLen/MemoryFunctions.cs +++ b/cs/src/core/VarLen/MemoryFunctions.cs @@ -23,13 +23,13 @@ public MemoryFunctions(MemoryPool memoryPool = default, bool locking = false) } /// - public override void SingleWriter(ref Key key, ref Memory input, ref Memory src, ref Memory dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref Key key, ref Memory input, ref Memory src, ref Memory dst, ref (IMemoryOwner, int) output, ref RecordInfo recordInfo, long address) { src.CopyTo(dst); } /// - public override bool ConcurrentWriter(ref Key key, ref Memory input, ref Memory src, ref Memory dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref Key key, ref Memory input, ref Memory src, ref Memory dst, ref (IMemoryOwner, int) output, ref RecordInfo recordInfo, long address) { // We can write the source (src) data to the existing destination (dst) in-place, // only if there is sufficient space @@ -86,7 +86,7 @@ public override void CopyUpdater(ref Key key, ref Memory input, ref Memory public override bool InPlaceUpdater(ref Key key, ref Memory input, ref Memory value, ref (IMemoryOwner, int) output, ref RecordInfo recordInfo, long address) { // The default implementation of IPU simply writes input to destination, if there is space - return ConcurrentWriter(ref key, ref input, ref input, ref value, ref recordInfo, address); + return ConcurrentWriter(ref key, ref input, ref input, ref value, ref output, ref recordInfo, address); } } } \ No newline at end of file diff --git a/cs/src/core/VarLen/SpanByteFunctions.cs b/cs/src/core/VarLen/SpanByteFunctions.cs index a48101ed3..c4b96b378 100644 --- a/cs/src/core/VarLen/SpanByteFunctions.cs +++ b/cs/src/core/VarLen/SpanByteFunctions.cs @@ -14,16 +14,17 @@ public class SpanByteFunctions : FunctionsBase /// - public SpanByteFunctions(bool locking = false) : base(locking) { } + /// + public SpanByteFunctions(bool locking = false, bool postOps = false) : base(locking, postOps) { } /// - public override void SingleWriter(ref Key key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref Key key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref Output output, ref RecordInfo recordInfo, long address) { src.CopyTo(ref dst); } /// - public override bool ConcurrentWriter(ref Key key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref Key key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref Output output, ref RecordInfo recordInfo, long address) { // We can write the source (src) data to the existing destination (dst) in-place, // only if there is sufficient space @@ -63,7 +64,7 @@ public override void CopyUpdater(ref Key key, ref SpanByte input, ref SpanByte o public override bool InPlaceUpdater(ref Key key, ref SpanByte input, ref SpanByte value, ref Output output, ref RecordInfo recordInfo, long address) { // The default implementation of IPU simply writes input to destination, if there is space - return ConcurrentWriter(ref key, ref input, ref input, ref value, ref recordInfo, address); + return ConcurrentWriter(ref key, ref input, ref input, ref value, ref output, ref recordInfo, address); } } @@ -79,7 +80,8 @@ public class SpanByteFunctions : SpanByteFunctions /// /// - public SpanByteFunctions(MemoryPool memoryPool = default, bool locking = false) : base(locking) + /// + public SpanByteFunctions(MemoryPool memoryPool = default, bool locking = false, bool postOps = false) : base(locking, postOps) { this.memoryPool = memoryPool ?? MemoryPool.Shared; } @@ -102,24 +104,27 @@ public unsafe override bool ConcurrentReader(ref SpanByte key, ref SpanByte inpu public override bool SupportsLocking => locking; /// - public override void Lock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, ref long lockContext) - { - if (lockType == LockType.Exclusive) - recordInfo.LockExclusive(); - else - recordInfo.LockShared(); - } + public override void LockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext) => recordInfo.LockExclusive(); + + /// + public override void UnlockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, long lockContext) => recordInfo.UnlockExclusive(); + + /// + public override bool TryLockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockExclusive(spinCount); /// - public override bool Unlock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, long lockContext) + public override void LockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext) => recordInfo.LockShared(); + + /// + public override bool UnlockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, long lockContext) { - if (lockType == LockType.Exclusive) - recordInfo.UnlockExclusive(); - else - recordInfo.UnlockShared(); + recordInfo.UnlockShared(); return true; } - } + + /// + public override bool TryLockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockShared(spinCount); +} /// /// Callback functions for SpanByte with byte[] output, for SpanByte key, value, input @@ -150,22 +155,25 @@ public override bool ConcurrentReader(ref SpanByte key, ref SpanByte input, ref public override bool SupportsLocking => locking; /// - public override void Lock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, ref long lockContext) - { - if (lockType == LockType.Exclusive) - recordInfo.LockExclusive(); - else - recordInfo.LockShared(); - } + public override void LockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext) => recordInfo.LockExclusive(); /// - public override bool Unlock(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, LockType lockType, long lockContext) + public override void UnlockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, long lockContext) => recordInfo.UnlockExclusive(); + + /// + public override bool TryLockExclusive(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockExclusive(spinCount); + + /// + public override void LockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext) => recordInfo.LockShared(); + + /// + public override bool UnlockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, long lockContext) { - if (lockType == LockType.Exclusive) - recordInfo.UnlockExclusive(); - else - recordInfo.UnlockShared(); + recordInfo.UnlockShared(); return true; } + + /// + public override bool TryLockShared(ref RecordInfo recordInfo, ref SpanByte key, ref SpanByte value, ref long lockContext, int spinCount = 1) => recordInfo.TryLockShared(spinCount); } } diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj index e38f31b03..a50ffebd0 100644 --- a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj +++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj @@ -1,7 +1,19 @@  + + + + net6.0;net5.0;netstandard2.1;netstandard2.0 + + + + + net5.0;netstandard2.1;netstandard2.0 + + + + - net5.0;netstandard2.1;netstandard2.0;net461 AnyCPU;x64 latest diff --git a/cs/test/ExpirationTests.cs b/cs/test/ExpirationTests.cs index 4717cde6f..c0fd6c28e 100644 --- a/cs/test/ExpirationTests.cs +++ b/cs/test/ExpirationTests.cs @@ -413,12 +413,12 @@ public override bool ConcurrentReader(ref int key, ref ExpirationInput input, re } // Upsert functions - public override void SingleWriter(ref int key, ref ExpirationInput input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref int key, ref ExpirationInput input, ref VLValue src, ref VLValue dst, ref ExpirationOutput output, ref RecordInfo recordInfo, long address) { src.CopyTo(ref dst); } - public override bool ConcurrentWriter(ref int key, ref ExpirationInput input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref int key, ref ExpirationInput input, ref VLValue src, ref VLValue dst, ref ExpirationOutput output, ref RecordInfo recordInfo, long address) { src.CopyTo(ref dst); return true; diff --git a/cs/test/FASTER.test.csproj b/cs/test/FASTER.test.csproj index f5d51ae7e..1dc5b7c4d 100644 --- a/cs/test/FASTER.test.csproj +++ b/cs/test/FASTER.test.csproj @@ -1,8 +1,19 @@  + + + + net6.0;net5.0;netcoreapp3.1 + + + + + net5.0;netcoreapp3.1 + + + + - net5.0;netcoreapp3.1;netcoreapp2.1;net461 - net5.0;netcoreapp3.1 AnyCPU;x64 latest true diff --git a/cs/test/InputOutputParameterTests.cs b/cs/test/InputOutputParameterTests.cs new file mode 100644 index 000000000..8398c1cee --- /dev/null +++ b/cs/test/InputOutputParameterTests.cs @@ -0,0 +1,191 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using NUnit.Framework; +using System.IO; +using System.Threading.Tasks; + +namespace FASTER.test.InputOutputParameterTests +{ + [TestFixture] + class InputOutputParameterTests + { + const int AddValue = 10_000; + const int MultValue = 100; + const int NumRecs = 10; + + private FasterKV fht; + private ClientSession session; + private IDevice log; + + internal class UpsertInputFunctions : FunctionsBase + { + internal long lastWriteAddress; + + public override bool ConcurrentReader(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) + { + lastWriteAddress = address; + return SingleReader(ref key, ref input, ref value, ref output, ref recordInfo, address); + } + + /// + public override bool SingleReader(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) + { + Assert.AreEqual(key * input, value); + lastWriteAddress = address; + output = value + AddValue; + return true; + } + + /// + public override bool ConcurrentWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address) + { + SingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); + return true; + } + /// + public override void SingleWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address) + { + lastWriteAddress = address; + dst = output = src * input; + } + + /// + public override void PostSingleWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address) + { + Assert.AreEqual(lastWriteAddress, address); + Assert.AreEqual(key * input, dst); + Assert.AreEqual(dst, output); + } + + public override bool InPlaceUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) + { + InitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + return true; + } + public override void InitialUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) + { + lastWriteAddress = address; + value = output = key * input; + } + /// + public override void PostInitialUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) + { + Assert.AreEqual(lastWriteAddress, address); + Assert.AreEqual(key * input, value); + Assert.AreEqual(value, output); + } + } + + [SetUp] + public void Setup() + { + TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); + + log = TestUtils.CreateTestDevice(TestUtils.DeviceType.LocalMemory, Path.Combine(TestUtils.MethodTestDir, "Device.log")); + fht = new FasterKV + (128, new LogSettings { LogDevice = log, MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }); + session = fht.For(new UpsertInputFunctions()).NewSession(); + } + + [TearDown] + public void TearDown() + { + session?.Dispose(); + session = null; + fht?.Dispose(); + fht = null; + log?.Dispose(); + log = null; + TestUtils.DeleteDirectory(TestUtils.MethodTestDir); + } + + // Simple Upsert test with Input + [Test] + [Category(TestUtils.FasterKVTestCategory)] + [Category(TestUtils.SmokeTestCategory)] + public async Task InputOutputParametersTest([Values]bool useRMW, [Values]bool isAsync) + { + int input = MultValue; + Status status; + int output = -1; + bool loading = true; + + async Task doWrites() + { + for (int key = 0; key < NumRecs; ++key) + { + var tailAddress = this.fht.Log.TailAddress; + RecordMetadata recordMetadata; + if (isAsync) + { + if (useRMW) + { + var r = await session.RMWAsync(ref key, ref input); + if ((key & 0x1) == 0) + { + while (r.Status == Status.PENDING) + r = await r.CompleteAsync(); + status = r.Status; + output = r.Output; + recordMetadata = r.RecordMetadata; + } + else + { + (status, output) = r.Complete(out recordMetadata); + } + } + else + { + var r = await session.UpsertAsync(ref key, ref input, ref key); + if ((key & 0x1) == 0) + { + while (r.Status == Status.PENDING) + r = await r.CompleteAsync(); + status = r.Status; + output = r.Output; + recordMetadata = r.RecordMetadata; + } + else + { + (status, output) = r.Complete(out recordMetadata); + } + } + } + else + { + status = useRMW + ? session.RMW(ref key, ref input, ref output, out recordMetadata) + : session.Upsert(ref key, ref input, ref key, ref output, out recordMetadata); + } + Assert.AreEqual(loading && useRMW ? Status.NOTFOUND : Status.OK, status); + Assert.AreEqual(key * input, output); + if (loading) + Assert.AreEqual(tailAddress, session.functions.lastWriteAddress); + Assert.AreEqual(session.functions.lastWriteAddress, recordMetadata.Address); + } + } + + void doReads() + { + for (int key = 0; key < NumRecs; ++key) + { + session.Read(ref key, ref input, ref output); + Assert.AreEqual(key * input + AddValue, output); + } + } + + // SingleWriter (records do not yet exist) + await doWrites(); + doReads(); + + loading = false; + input *= input; + + // ConcurrentWriter (update existing records) + await doWrites(); + doReads(); + } + } +} diff --git a/cs/test/LockTests.cs b/cs/test/LockTests.cs index 87cff461b..39122cd8d 100644 --- a/cs/test/LockTests.cs +++ b/cs/test/LockTests.cs @@ -25,7 +25,7 @@ static bool Increment(ref int dst) return true; } - public override bool ConcurrentWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) => Increment(ref dst); + public override bool ConcurrentWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address) => Increment(ref dst); public override bool InPlaceUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) => Increment(ref value); } diff --git a/cs/test/ObjectRecoveryTest2.cs b/cs/test/ObjectRecoveryTest2.cs index 9f5732782..beb8a9b21 100644 --- a/cs/test/ObjectRecoveryTest2.cs +++ b/cs/test/ObjectRecoveryTest2.cs @@ -247,7 +247,7 @@ public override bool SingleReader(ref MyKey key, ref MyInput input, ref MyValue return true; } - public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) => dst = src; + public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) => dst = src; public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyValue value, ref MyOutput dst, ref RecordInfo recordInfo, long address) { @@ -255,7 +255,7 @@ public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyVa return true; } - public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { if (src == null) return false; diff --git a/cs/test/ObjectTestTypes.cs b/cs/test/ObjectTestTypes.cs index 3b19f30f3..737994056 100644 --- a/cs/test/ObjectTestTypes.cs +++ b/cs/test/ObjectTestTypes.cs @@ -84,7 +84,7 @@ public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyVa return true; } - public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst.value = src.value; return true; @@ -109,7 +109,7 @@ public override bool SingleReader(ref MyKey key, ref MyInput input, ref MyValue return true; } - public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst = src; } @@ -143,7 +143,7 @@ public override bool ConcurrentReader(ref MyValue key, ref MyInput input, ref My return true; } - public override bool ConcurrentWriter(ref MyValue key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref MyValue key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst.value = src.value; return true; @@ -168,7 +168,7 @@ public override bool SingleReader(ref MyValue key, ref MyInput input, ref MyValu return true; } - public override void SingleWriter(ref MyValue key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref MyValue key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst = src; } @@ -202,7 +202,7 @@ public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyVa return true; } - public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst = src; return true; @@ -237,7 +237,7 @@ public override bool SingleReader(ref MyKey key, ref MyInput input, ref MyValue return true; } - public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst = src; } @@ -269,7 +269,7 @@ public override bool ConcurrentReader(ref int key, ref MyInput input, ref MyValu return true; } - public override bool ConcurrentWriter(ref int key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref int key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst.value = src.value; return true; @@ -281,7 +281,7 @@ public override bool SingleReader(ref int key, ref MyInput input, ref MyValue va return true; } - public override void SingleWriter(ref int key, ref MyInput input, ref MyValue src, ref MyValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref int key, ref MyInput input, ref MyValue src, ref MyValue dst, ref MyOutput output, ref RecordInfo recordInfo, long address) { dst = src; } @@ -349,13 +349,13 @@ public override bool ConcurrentReader(ref MyKey key, ref MyInput input, ref MyLa return true; } - public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyLargeValue src, ref MyLargeValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref MyKey key, ref MyInput input, ref MyLargeValue src, ref MyLargeValue dst, ref MyLargeOutput output, ref RecordInfo recordInfo, long address) { dst = src; return true; } - public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyLargeValue src, ref MyLargeValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref MyKey key, ref MyInput input, ref MyLargeValue src, ref MyLargeValue dst, ref MyLargeOutput output, ref RecordInfo recordInfo, long address) { dst = src; } diff --git a/cs/test/PostOperationsTests.cs b/cs/test/PostOperationsTests.cs index 919094a5e..5cd581cc0 100644 --- a/cs/test/PostOperationsTests.cs +++ b/cs/test/PostOperationsTests.cs @@ -26,7 +26,7 @@ internal void Clear() internal PostFunctions() : base(locking: false, postOps: true) { } - public override void PostSingleWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) { this.pswAddress = address; } + public override void PostSingleWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address) { this.pswAddress = address; } public override void InitialUpdater(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) { value = input; } /// diff --git a/cs/test/Properties/AssemblyInfo.cs b/cs/test/Properties/AssemblyInfo.cs index d586faa81..061c15ce0 100644 --- a/cs/test/Properties/AssemblyInfo.cs +++ b/cs/test/Properties/AssemblyInfo.cs @@ -40,7 +40,6 @@ // Make all fixtures in the test assembly run in parallel #if false // disable parallelism until all the problems are resolved -//#if NETCOREAPP || NET // net461 runs x86 by default so OOMs on current memory usage by tests when running multiple tests simultaneously -[assembly: Parallelizable(ParallelScope.Fixtures)] - //[assembly: LevelOfParallelism(4)] // For reduced parallelization of net461 if we reduce memory usage in tests + [assembly: Parallelizable(ParallelScope.Fixtures)] + //[assembly: LevelOfParallelism(4)] // For reduced parallelization if needed #endif diff --git a/cs/test/ReadAddressTests.cs b/cs/test/ReadAddressTests.cs index 4a77deb92..69370f5bd 100644 --- a/cs/test/ReadAddressTests.cs +++ b/cs/test/ReadAddressTests.cs @@ -73,12 +73,12 @@ public override bool SingleReader(ref Key key, ref Value input, ref Value value, } // Return false to force a chain of values. - public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => false; + public override bool ConcurrentWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) => false; public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => false; // Record addresses - public override void SingleWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref Key key, ref Value input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { dst = src; this.lastWriteAddress = address; diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs index 1bdd1d049..046b8c6b0 100644 --- a/cs/test/SharedDirectoryTests.cs +++ b/cs/test/SharedDirectoryTests.cs @@ -83,9 +83,7 @@ public async ValueTask SharedLogDirectory([Values]bool isAsync) // Dispose original, files should not be deleted on Windows this.original.TearDown(); -#if NETCOREAPP || NET if (RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows)) -#endif { // Clone should still work on Windows Assert.IsFalse(IsDirectoryEmpty(this.sharedLogDirectory)); @@ -107,10 +105,9 @@ private struct FasterTestInstance public void Initialize(string checkpointDirectory, string logDirectory, bool populateLogHandles = false) { -#if NETCOREAPP || NET if (!RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows)) populateLogHandles = false; -#endif + this.CheckpointDirectory = checkpointDirectory; this.LogDirectory = logDirectory; @@ -136,13 +133,11 @@ public void Initialize(string checkpointDirectory, string logDirectory, bool pop } } -#if NETCOREAPP || NET if (!RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows)) { this.LogDevice = new ManagedLocalStorageDevice(deviceFileName, deleteOnClose: true); } else -#endif { this.LogDevice = new LocalStorageDevice(deviceFileName, deleteOnClose: true, disableFileBuffering: false, initialLogFileHandles: initialHandles); } diff --git a/cs/test/TestTypes.cs b/cs/test/TestTypes.cs index acc40a958..5381f2033 100644 --- a/cs/test/TestTypes.cs +++ b/cs/test/TestTypes.cs @@ -202,9 +202,9 @@ public override bool ConcurrentReader(ref KeyStruct key, ref InputStruct input, } // Upsert functions - public override void SingleWriter(ref KeyStruct key, ref InputStruct input, ref ValueStruct src, ref ValueStruct dst, ref RecordInfo recordInfo, long address) => dst = src; + public override void SingleWriter(ref KeyStruct key, ref InputStruct input, ref ValueStruct src, ref ValueStruct dst, ref OutputStruct output, ref RecordInfo recordInfo, long address) => dst = src; - public override bool ConcurrentWriter(ref KeyStruct key, ref InputStruct input, ref ValueStruct src, ref ValueStruct dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref KeyStruct key, ref InputStruct input, ref ValueStruct src, ref ValueStruct dst, ref OutputStruct output, ref RecordInfo recordInfo, long address) { Interlocked.Increment(ref _concurrentWriterCallCount); return false; diff --git a/cs/test/UpsertTests.cs b/cs/test/UpsertTests.cs deleted file mode 100644 index b8a6b799f..000000000 --- a/cs/test/UpsertTests.cs +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT license. - -using FASTER.core; -using NUnit.Framework; -using System.IO; -using System.Threading.Tasks; - -namespace FASTER.test.UpsertTests -{ - [TestFixture] - class UpsertTests - { - const int AddValue = 10_000; - const int MultValue = 100; - const int NumRecs = 10; - - private FasterKV fht; - private ClientSession session; - private IDevice log; - - internal class UpsertInputFunctions : FunctionsBase - { - public override bool ConcurrentReader(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) - => SingleReader(ref key, ref input, ref value, ref output, ref recordInfo, address); - - /// - public override bool SingleReader(ref int key, ref int input, ref int value, ref int output, ref RecordInfo recordInfo, long address) - { - Assert.AreEqual(key * input, value); - output = value + AddValue; - return true; - } - - /// - public override bool ConcurrentWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) - { - SingleWriter(ref key, ref input, ref src, ref dst, ref recordInfo, address); - return true; - } - /// - public override void SingleWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) => dst = src * input; - /// - public override void PostSingleWriter(ref int key, ref int input, ref int src, ref int dst, ref RecordInfo recordInfo, long address) => Assert.AreEqual(key * input, dst); - } - - [SetUp] - public void Setup() - { - TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true); - - log = TestUtils.CreateTestDevice(TestUtils.DeviceType.LocalMemory, Path.Combine(TestUtils.MethodTestDir, "Device.log")); - fht = new FasterKV - (128, new LogSettings { LogDevice = log, MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }); - session = fht.For(new UpsertInputFunctions()).NewSession(); - } - - [TearDown] - public void TearDown() - { - session?.Dispose(); - session = null; - fht?.Dispose(); - fht = null; - log?.Dispose(); - log = null; - TestUtils.DeleteDirectory(TestUtils.MethodTestDir); - } - - // Simple Upsert test with Input - [Test] - [Category(TestUtils.FasterKVTestCategory)] - [Category(TestUtils.SmokeTestCategory)] - public async Task UpsertWithInputsTest([Values]bool isAsync) - { - int input = MultValue; - int output = -1; - - async Task doWrites() - { - for (int key = 0; key < NumRecs; ++key) - { - if (isAsync) - { - var r = await session.UpsertAsync(ref key, ref input, ref key); - while (r.Status == Status.PENDING) - r = await r.CompleteAsync(); - } - else - session.Upsert(ref key, ref input, ref key); - } - } - - void doReads() - { - for (int key = 0; key < NumRecs; ++key) - { - session.Read(ref key, ref input, ref output); - Assert.AreEqual(key * input + AddValue, output); - } - } - - // SingleWriter (records do not yet exist) - await doWrites(); - doReads(); - - // ConcurrentWriter (update existing records) - await doWrites(); - doReads(); - } - } -} diff --git a/cs/test/VLTestTypes.cs b/cs/test/VLTestTypes.cs index a7553bc77..d30583e76 100644 --- a/cs/test/VLTestTypes.cs +++ b/cs/test/VLTestTypes.cs @@ -121,12 +121,12 @@ public override bool ConcurrentReader(ref Key key, ref Input input, ref VLValue } // Upsert functions - public override void SingleWriter(ref Key key, ref Input input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref Key key, ref Input input, ref VLValue src, ref VLValue dst, ref int[] output, ref RecordInfo recordInfo, long address) { src.CopyTo(ref dst); } - public override bool ConcurrentWriter(ref Key key, ref Input input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref Key key, ref Input input, ref VLValue src, ref VLValue dst, ref int[] output, ref RecordInfo recordInfo, long address) { if (src.length != dst.length) return false; @@ -166,12 +166,12 @@ public override bool ConcurrentReader(ref VLValue key, ref Input input, ref VLVa } // Upsert functions - public override void SingleWriter(ref VLValue key, ref Input input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override void SingleWriter(ref VLValue key, ref Input input, ref VLValue src, ref VLValue dst, ref int[] output, ref RecordInfo recordInfo, long address) { src.CopyTo(ref dst); } - public override bool ConcurrentWriter(ref VLValue key, ref Input input, ref VLValue src, ref VLValue dst, ref RecordInfo recordInfo, long address) + public override bool ConcurrentWriter(ref VLValue key, ref Input input, ref VLValue src, ref VLValue dst, ref int[] output, ref RecordInfo recordInfo, long address) { if (src.length != dst.length) return false;