From 5f6fb1ab79d753e7ffea09a77cccf7a66dd6107c Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Tue, 2 Apr 2019 16:10:16 -0700
Subject: [PATCH 01/15] Making LightEpoch instance specific

---
 cs/src/core/Allocator/AllocatorBase.cs       | 12 +++++++----
 cs/src/core/Allocator/BlittableAllocator.cs  |  6 ++----
 cs/src/core/Allocator/GenericAllocator.cs    |  5 ++---
 cs/src/core/Allocator/MallocFixedPageSize.cs |  9 +++++++--
 cs/src/core/Epochs/LightEpoch.cs             | 21 ++++++++------------
 cs/src/core/Index/FASTER/FASTER.cs           |  8 ++++----
 cs/src/core/Index/FASTER/FASTERBase.cs       |  5 +++--
 7 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 8168e6785..8457ebc71 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -60,7 +60,7 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <summary>
         /// Epoch information
         /// </summary>
-        protected LightEpoch epoch;
+        protected readonly LightEpoch epoch;
 
         /// <summary>
         /// Comparer
@@ -415,8 +415,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <param name="settings"></param>
         /// <param name="comparer"></param>
         /// <param name="evictCallback"></param>
-        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
-            : this(settings, comparer)
+        /// <param name="epoch"></param>
+        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
+            : this(settings, comparer, epoch)
         {
             if (evictCallback != null)
             {
@@ -430,9 +431,12 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
         /// </summary>
         /// <param name="settings"></param>
         /// <param name="comparer"></param>
-        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer)
+        /// <param name="epoch"></param>
+        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
         {
             this.comparer = comparer;
+            this.epoch = epoch ?? new LightEpoch();
+
             settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
             settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);
 
diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs
index ee410f039..edb03c1dc 100644
--- a/cs/src/core/Allocator/BlittableAllocator.cs
+++ b/cs/src/core/Allocator/BlittableAllocator.cs
@@ -31,15 +31,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, Value>
-        public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
-            : base(settings, comparer, evictCallback)
+        public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
+            : base(settings, comparer, evictCallback, epoch)
         {
             values = new byte[BufferSize][];
             handles = new GCHandle[BufferSize];
             pointers = new long[BufferSize];
 
-            epoch = LightEpoch.Instance;
-
             ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
             nativePointers = (long*)ptrHandle.AddrOfPinnedObject();
         }
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 142c66902..118146c29 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -41,8 +41,8 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Value>
         private static readonly int recordSize = Utility.GetSize(default(Record<Key, Value>));
         private readonly SerializerSettings<Key, Value> SerializerSettings;
 
-        public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
-            : base(settings, comparer, evictCallback)
+        public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
+            : base(settings, comparer, evictCallback, epoch)
         {
             SerializerSettings = serializerSettings;
 
@@ -67,7 +67,6 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
                 throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance");
             }
 
-            epoch = LightEpoch.Instance;
             ioBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
         }
 
diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs
index d3462e0f7..bbc452bd8 100644
--- a/cs/src/core/Allocator/MallocFixedPageSize.cs
+++ b/cs/src/core/Allocator/MallocFixedPageSize.cs
@@ -47,6 +47,8 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
 
         private CountdownEvent checkpointEvent;
 
+        private readonly LightEpoch epoch;
+
         [ThreadStatic]
         private static Queue<FreeItem> freeList;
 
@@ -54,8 +56,11 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
         /// Create new instance
         /// </summary>
         /// <param name="returnPhysicalAddress"></param>
-        public MallocFixedPageSize(bool returnPhysicalAddress = false)
+        /// <param name="epoch"></param>
+        public MallocFixedPageSize(bool returnPhysicalAddress = false, LightEpoch epoch = null)
         {
+            this.epoch = epoch ?? new LightEpoch();
+
             values[0] = new T[PageSize];
 
 #if !(CALLOC)
@@ -298,7 +303,7 @@ public long Allocate()
             }
             if (freeList.Count > 0)
             {
-                if (freeList.Peek().removal_epoch <= LightEpoch.Instance.SafeToReclaimEpoch)
+                if (freeList.Peek().removal_epoch <= epoch.SafeToReclaimEpoch)
                     return freeList.Dequeue().removed_item;
 
                 //if (freeList.Count % 64 == 0)
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 68b736e00..10037eafd 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -15,11 +15,6 @@ namespace FASTER.core
     /// </summary>
     public unsafe class LightEpoch
     {
-        /// <summary>
-        ///
-        /// </summary>
-        public static LightEpoch Instance = new LightEpoch();
-
         /// <summary>
         /// Default invalid index entry.
         /// </summary>
@@ -57,8 +52,7 @@ public unsafe class LightEpoch
         /// <summary>
         /// A thread's entry in the epoch table.
         /// </summary>
-        [ThreadStatic]
-        public static int threadEntryIndex;
+        private ThreadLocal<int> threadEntryIndex;
 
         /// <summary>
         /// Global current epoch value
@@ -93,6 +87,7 @@ public LightEpoch(int size = kTableSize)
         /// <param name="size"></param>
         unsafe void Initialize(int size)
         {
+            threadEntryIndex = new ThreadLocal<int>();
             numEntries = size;
 
             // Over-allocate to do cache-line alignment
@@ -132,7 +127,7 @@ void Uninitialize()
         /// <returns>Result of the check</returns>
         public bool IsProtected()
         {
-            return (kInvalidIndex != threadEntryIndex);
+            return kInvalidIndex != threadEntryIndex.Value;
         }
 
 
@@ -143,11 +138,11 @@ public bool IsProtected()
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public int ProtectAndDrain()
         {
-            int entry = threadEntryIndex;
+            int entry = threadEntryIndex.Value;
             if (kInvalidIndex == entry)
             {
                 entry = ReserveEntryForThread();
-                threadEntryIndex = entry;
+                threadEntryIndex.Value = entry;
             }
 
             (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch;
@@ -192,13 +187,13 @@ private void Drain(int nextEpoch)
         /// </summary>
         public void Release()
         {
-            int entry = threadEntryIndex;
+            int entry = threadEntryIndex.Value;
             if (kInvalidIndex == entry)
             {
                 return;
             }
 
-            threadEntryIndex = kInvalidIndex;
+            threadEntryIndex.Value = kInvalidIndex;
             (*(tableAligned + entry)).localCurrentEpoch = 0;
             (*(tableAligned + entry)).threadId = 0;
         }
@@ -391,7 +386,7 @@ private struct EpochActionPair
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public bool MarkAndCheckIsComplete(int markerIdx, int version)
         {
-            int entry = threadEntryIndex;
+            int entry = threadEntryIndex.Value;
             if (kInvalidIndex == entry)
             {
                 Debug.WriteLine("New Thread entered during CPR");
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index c0cdd04d3..ad22e22a2 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -117,7 +117,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
 
             if (Utility.IsBlittable<Key>() && Utility.IsBlittable<Value>())
             {
-                hlog = new BlittableAllocator<Key, Value>(logSettings, this.comparer);
+                hlog = new BlittableAllocator<Key, Value>(logSettings, this.comparer, null, epoch);
                 Log = new LogAccessor<Key, Value, Input, Output, Context>(this, hlog);
                 if (UseReadCache)
                 {
@@ -127,14 +127,14 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
                             MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                             SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                             MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
-                        }, this.comparer, ReadCacheEvict);
+                        }, this.comparer, ReadCacheEvict, epoch);
                     readcache.Initialize();
                     ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
                 }
             }
             else
             {
-                hlog = new GenericAllocator<Key, Value>(logSettings, serializerSettings, this.comparer);
+                hlog = new GenericAllocator<Key, Value>(logSettings, serializerSettings, this.comparer, null, epoch);
                 Log = new LogAccessor<Key, Value, Input, Output, Context>(this, hlog);
                 if (UseReadCache)
                 {
@@ -145,7 +145,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
                             MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                             SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                             MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
-                        }, serializerSettings, this.comparer, ReadCacheEvict);
+                        }, serializerSettings, this.comparer, ReadCacheEvict, epoch);
                     readcache.Initialize();
                     ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
                 }
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index c1b612c7f..67cebc4d8 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -251,7 +251,7 @@ public unsafe partial class FasterBase
         internal long minTableSize = 16;
 
         // Allocator for the hash buckets
-        internal MallocFixedPageSize<HashBucket> overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>();
+        internal readonly MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
 
         // An array of size two, that contains the old and new versions of the hash-table
         internal InternalHashTable[] state = new InternalHashTable[2];
@@ -274,7 +274,8 @@ public unsafe partial class FasterBase
         /// </summary>
         public FasterBase()
         {
-            epoch = LightEpoch.Instance;
+            epoch = new LightEpoch();
+            overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false, epoch);
         }
 
         internal Status Free()

From 0a82beed0ceeec9b1373b35d9f7d02e00b3a5101 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Tue, 2 Apr 2019 19:28:11 -0700
Subject: [PATCH 02/15] Created thread + instance based thread local variable
 mechanism

---
 cs/src/core/Allocator/AllocatorBase.cs   | 12 ++++-
 cs/src/core/Epochs/FastThreadLocal.cs    | 69 ++++++++++++++++++++++++
 cs/src/core/Epochs/LightEpoch.cs         | 40 +++++++-------
 cs/src/core/Index/FASTER/FASTERBase.cs   |  1 +
 cs/src/core/Index/FASTER/FASTERThread.cs |  2 +
 cs/src/core/Index/FASTER/LogAccessor.cs  | 10 +++-
 6 files changed, 113 insertions(+), 21 deletions(-)
 create mode 100644 cs/src/core/Epochs/FastThreadLocal.cs

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 8457ebc71..2529c9343 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -61,6 +61,7 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// Epoch information
         /// </summary>
         protected readonly LightEpoch epoch;
+        private readonly bool toDisposeEpoch;
 
         /// <summary>
         /// Comparer
@@ -435,7 +436,13 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
         public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
         {
             this.comparer = comparer;
-            this.epoch = epoch ?? new LightEpoch();
+            if (epoch == null)
+            {
+                epoch = new LightEpoch();
+                toDisposeEpoch = true;
+            }
+            else
+                this.epoch = epoch;
 
             settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
             settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);
@@ -523,6 +530,9 @@ public virtual void Dispose()
             SafeHeadAddress = 0;
             HeadAddress = 0;
             BeginAddress = 1;
+
+            if (toDisposeEpoch)
+                epoch.Dispose();
         }
 
         /// <summary>
diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs
new file mode 100644
index 000000000..975dbb5b6
--- /dev/null
+++ b/cs/src/core/Epochs/FastThreadLocal.cs
@@ -0,0 +1,69 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Threading;
+
+namespace FASTER.core
+{
+    /// <summary>
+    /// Fast implementation of instance-thread-local variables
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    internal class FastThreadLocal<T>
+    {
+        // Max instances supported
+        private const int kMaxInstances = 128;
+
+        [ThreadStatic]
+        private static T[] values;
+
+        private readonly int id;
+        private static int[] instances = new int[kMaxInstances];
+
+        public FastThreadLocal()
+        {
+            for (int i = 0; i < kMaxInstances; i++)
+            {
+                if (0 == Interlocked.CompareExchange(ref instances[i], 1, 0))
+                {
+                    id = i;
+                    return;
+                }
+            }
+            throw new Exception("Unsupported number of simultaneous instances");
+        }
+
+        public void InitializeThread()
+        {
+            if (values == null)
+                values = new T[kMaxInstances];
+        }
+
+        public void DisposeThread()
+        {
+            // Dispose values only if there are no other
+            // instances active for this thread
+            for (int i = 0; i < kMaxInstances; i++)
+            {
+                if ((instances[i] == 1) && (i != id))
+                    return;
+            }
+            values = null;
+        }
+
+        /// <summary>
+        /// Dispose instance for all threads
+        /// </summary>
+        public void Dispose()
+        {
+            instances[id] = 0;
+        }
+
+        public T Value
+        {
+            get { return values[id]; }
+            set { values[id] = value; }
+        }
+    }
+}
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 10037eafd..7e0e60e7e 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -52,7 +52,7 @@ public unsafe class LightEpoch
         /// <summary>
         /// A thread's entry in the epoch table.
         /// </summary>
-        private ThreadLocal<int> threadEntryIndex;
+        private FastThreadLocal<int> threadEntryIndex;
 
         /// <summary>
         /// Global current epoch value
@@ -73,21 +73,13 @@ public LightEpoch(int size = kTableSize)
             Initialize(size);
         }
 
-        /// <summary>
-        ///
-        /// </summary>
-        ~LightEpoch()
-        {
-            Uninitialize();
-        }
-
         /// <summary>
         /// Initialize the epoch table
         /// </summary>
         /// <param name="size"></param>
         unsafe void Initialize(int size)
         {
-            threadEntryIndex = new ThreadLocal<int>();
+            threadEntryIndex = new FastThreadLocal<int>();
             numEntries = size;
 
             // Over-allocate to do cache-line alignment
@@ -110,7 +102,7 @@ unsafe void Initialize(int size)
         /// <summary>
         /// Clean up epoch table
         /// </summary>
-        void Uninitialize()
+        public void Dispose()
        {
             tableHandle.Free();
             tableAligned = null;
@@ -119,6 +111,8 @@ void Uninitialize()
             numEntries = 0;
             CurrentEpoch = 1;
             SafeToReclaimEpoch = 0;
+
+            threadEntryIndex.Dispose();
         }
 
         /// <summary>
@@ -130,7 +124,6 @@ public bool IsProtected()
             return kInvalidIndex != threadEntryIndex.Value;
         }
 
-
         /// <summary>
         /// Enter the thread into the protected code region
         /// </summary>
@@ -139,11 +132,6 @@ public bool IsProtected()
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public int ProtectAndDrain()
         {
             int entry = threadEntryIndex.Value;
-            if (kInvalidIndex == entry)
-            {
-                entry = ReserveEntryForThread();
-                threadEntryIndex.Value = entry;
-            }
 
             (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch;
@@ -182,6 +170,16 @@ private void Drain(int nextEpoch)
             }
         }
 
+        /// <summary>
+        /// Thread acquires its epoch entry
+        /// </summary>
+        public void Acquire()
+        {
+            threadEntryIndex.InitializeThread();
+            threadEntryIndex.Value = ReserveEntryForThread();
+        }
+
+
         /// <summary>
         /// Thread releases its epoch entry
         /// </summary>
@@ -194,6 +192,7 @@ public void Release()
             }
 
             threadEntryIndex.Value = kInvalidIndex;
+            threadEntryIndex.DisposeThread();
             (*(tableAligned + entry)).localCurrentEpoch = 0;
             (*(tableAligned + entry)).threadId = 0;
         }
@@ -280,9 +279,12 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch)
             for (int index = 1; index <= numEntries; ++index)
             {
                 int entry_epoch = (*(tableAligned + index)).localCurrentEpoch;
-                if (0 != entry_epoch && entry_epoch < oldestOngoingCall)
+                if (0 != entry_epoch)
                 {
-                    oldestOngoingCall = entry_epoch;
+                    if (entry_epoch < oldestOngoingCall)
+                    {
+                        oldestOngoingCall = entry_epoch;
+                    }
                 }
             }
 
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index 67cebc4d8..4fd6cb275 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -283,6 +283,7 @@ internal Status Free()
             Free(0);
             Free(1);
             overflowBucketsAllocator.Dispose();
+            epoch.Dispose();
             return Status.OK;
         }
 
diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs
index 17e314403..352670adf 100644
--- a/cs/src/core/Index/FASTER/FASTERThread.cs
+++ b/cs/src/core/Index/FASTER/FASTERThread.cs
@@ -18,6 +18,7 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functi

From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 09:51:04 -0700
Subject: [PATCH 03/15] Updates

---
 cs/src/core/Allocator/AllocatorBase.cs       |   2 +-
 cs/src/core/Allocator/MallocFixedPageSize.cs |  23 ++--
 cs/src/core/Epochs/FastThreadLocal.cs        |   2 +
 cs/src/core/Index/FASTER/Checkpoint.cs       |  62 +++++-----
 cs/src/core/Index/FASTER/FASTER.cs           |  24 ++--
 cs/src/core/Index/FASTER/FASTERImpl.cs       | 124 +++++++++----------
 cs/src/core/Index/FASTER/FASTERThread.cs     |  71 ++++++-----
 7 files changed, 160 insertions(+), 148 deletions(-)

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 2529c9343..002f707b7 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -202,7 +202,7 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <summary>
         /// Number of pending reads
         /// </summary>
-        private static int numPendingReads = 0;
+        private int numPendingReads = 0;
 #endregion
 
         /// <summary>
diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs
index bbc452bd8..56dd3897f 100644
--- a/cs/src/core/Allocator/MallocFixedPageSize.cs
+++ b/cs/src/core/Allocator/MallocFixedPageSize.cs
@@ -49,8 +49,7 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
 
         private readonly LightEpoch epoch;
 
-        [ThreadStatic]
-        private static Queue<FreeItem> freeList;
+        private FastThreadLocal<Queue<FreeItem>> freeList;
 
         /// <summary>
         /// Create new instance
         /// </summary>
         /// <param name="returnPhysicalAddress"></param>
         /// <param name="epoch"></param>
         public MallocFixedPageSize(bool returnPhysicalAddress = false, LightEpoch epoch = null)
         {
+            freeList = new FastThreadLocal<Queue<FreeItem>>();
             this.epoch = epoch ?? new LightEpoch();
 
             values[0] = new T[PageSize];
@@ -178,8 +178,12 @@ public void FreeAtEpoch(long pointer, int removed_epoch = -1)
             {
                 values[pointer >> PageSizeBits][pointer & PageSizeMask] = default(T);
             }
-            if (freeList == null) freeList = new Queue<FreeItem>();
-            freeList.Enqueue(new FreeItem { removed_item = pointer, removal_epoch = removed_epoch });
+
+            freeList.InitializeThread();
+
+            if (freeList.Value == null)
+                freeList.Value = new Queue<FreeItem>();
+            freeList.Value.Enqueue(new FreeItem { removed_item = pointer, removal_epoch = removed_epoch });
         }
 
         private const int kAllocateChunkSize = 16;
@@ -297,14 +301,15 @@ public long BulkAllocate()
         /// </summary>
         public long Allocate()
         {
-            if (freeList == null)
+            freeList.InitializeThread();
+            if (freeList.Value == null)
             {
-                freeList = new Queue<FreeItem>();
+                freeList.Value = new Queue<FreeItem>();
             }
-            if (freeList.Count > 0)
+            if (freeList.Value.Count > 0)
             {
-                if (freeList.Peek().removal_epoch <= epoch.SafeToReclaimEpoch)
-                    return freeList.Dequeue().removed_item;
+                if (freeList.Value.Peek().removal_epoch <= epoch.SafeToReclaimEpoch)
+                    return freeList.Value.Dequeue().removed_item;
 
                 //if (freeList.Count % 64 == 0)
                 //    LightEpoch.Instance.BumpCurrentEpoch();
diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs
index 975dbb5b6..65c0e18b2 100644
--- a/cs/src/core/Epochs/FastThreadLocal.cs
+++ b/cs/src/core/Epochs/FastThreadLocal.cs
@@ -42,6 +42,8 @@ public void InitializeThread()
 
         public void DisposeThread()
         {
+            Value = default(T);
+
             // Dispose values only if there are no other
             // instances active for this thread
             for (int i = 0; i < kMaxInstances; i++)
diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/FASTER/Checkpoint.cs
index 56c8f2ddf..8d92c147b 100644
--- a/cs/src/core/Index/FASTER/Checkpoint.cs
+++ b/cs/src/core/Index/FASTER/Checkpoint.cs
@@ -386,7 +386,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         private void HandleCheckpointingPhases()
         {
-            var previousState = SystemState.Make(threadCtx.phase, threadCtx.version);
+            var previousState = SystemState.Make(threadCtx.Value.phase, threadCtx.Value.version);
             var finalState = SystemState.Copy(ref _systemState);
 
             // Don't play around when system state is being changed
@@ -412,13 +412,13 @@ private void HandleCheckpointingPhases()
             {
                 case Phase.PREP_INDEX_CHECKPOINT:
                     {
-                        if (!threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt])
+                        if (!threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt])
                         {
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.Value.version))
                            {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true;
+                            threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true;
                         }
                         break;
                     }
@@ -427,7 +427,7 @@ private void HandleCheckpointingPhases()
                         if (_checkpointType == CheckpointType.INDEX_ONLY)
                         {
                             // Reseting the marker for a potential FULL or INDEX_ONLY checkpoint in the future
-                            threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false;
+                            threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false;
                         }
 
                         if (IsIndexFuzzyCheckpointCompleted())
@@ -438,7 +438,7 @@ private void HandleCheckpointingPhases()
                     }
                 case Phase.PREPARE:
                     {
-                        if (!threadCtx.markers[EpochPhaseIdx.Prepare])
+                        if (!threadCtx.Value.markers[EpochPhaseIdx.Prepare])
                         {
                             // Thread local action
                             AcquireSharedLatchesForAllPendingRequests();
@@ -446,14 +446,14 @@ private void HandleCheckpointingPhases()
                             var idx = Interlocked.Increment(ref _hybridLogCheckpoint.info.numThreads);
                             idx -= 1;
-                            _hybridLogCheckpoint.info.guids[idx] = threadCtx.guid;
+                            _hybridLogCheckpoint.info.guids[idx] = threadCtx.Value.guid;
 
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.Value.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            threadCtx.markers[EpochPhaseIdx.Prepare] = true;
+                            threadCtx.Value.markers[EpochPhaseIdx.Prepare] = true;
                         }
                         break;
                     }
@@ -463,41 +463,41 @@ private void HandleCheckpointingPhases()
                         FasterExecutionContext ctx;
                         if (previousState.phase == Phase.PREPARE)
                         {
-                            ctx = threadCtx;
+                            ctx = threadCtx.Value;
                         }
                         else
                         {
-                            ctx = prevThreadCtx;
+                            ctx = prevThreadCtx.Value;
                         }
 
                         if (!ctx.markers[EpochPhaseIdx.InProgress])
                         {
                             prevThreadCtx = threadCtx;
 
-                            InitLocalContext(ref threadCtx, prevThreadCtx.guid);
+                            InitLocalContext(prevThreadCtx.Value.guid);
 
                             if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            prevThreadCtx.markers[EpochPhaseIdx.InProgress] = true;
+                            prevThreadCtx.Value.markers[EpochPhaseIdx.InProgress] = true;
                         }
                         break;
                     }
                 case Phase.WAIT_PENDING:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.WaitPending])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.WaitPending])
                         {
-                            var notify = (prevThreadCtx.ioPendingRequests.Count == 0);
-                            notify = notify && (prevThreadCtx.retryRequests.Count == 0);
+                            var notify = (prevThreadCtx.Value.ioPendingRequests.Count == 0);
+                            notify = notify && (prevThreadCtx.Value.retryRequests.Count == 0);
 
                             if (notify)
                             {
-                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitPending, threadCtx.version))
+                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitPending, threadCtx.Value.version))
                                 {
                                     GlobalMoveToNextCheckpointState(currentState);
                                 }
-                                prevThreadCtx.markers[EpochPhaseIdx.WaitPending] = true;
+                                prevThreadCtx.Value.markers[EpochPhaseIdx.WaitPending] = true;
                             }
                         }
                         break;
                     }
@@ -505,7 +505,7 @@ private void HandleCheckpointingPhases()
                 case Phase.WAIT_FLUSH:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.WaitFlush])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.WaitFlush])
                         {
                             var notify = false;
                             if (FoldOverSnapshot)
@@ -526,12 +526,12 @@ private void HandleCheckpointingPhases()
                             {
                                 WriteHybridLogContextInfo();
 
-                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.version))
+                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
                                 {
                                     GlobalMoveToNextCheckpointState(currentState);
                                 }
 
-                                prevThreadCtx.markers[EpochPhaseIdx.WaitFlush] = true;
+                                prevThreadCtx.Value.markers[EpochPhaseIdx.WaitFlush] = true;
                             }
                         }
                         break;
                     }
 
                 case Phase.PERSISTENCE_CALLBACK:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback])
                         {
                             // Thread local action
-                            functions.CheckpointCompletionCallback(threadCtx.guid, prevThreadCtx.serialNum);
+                            functions.CheckpointCompletionCallback(threadCtx.Value.guid, prevThreadCtx.Value.serialNum);
 
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
 
-                            prevThreadCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
+                            prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
                         }
                         break;
                     }
@@ -563,8 +563,8 @@ private void HandleCheckpointingPhases()
             }
 
             // update thread local variables
-            threadCtx.phase = currentState.phase;
-            threadCtx.version = currentState.version;
+            threadCtx.Value.phase = currentState.phase;
+            threadCtx.Value.version = currentState.version;
             previousState.word = currentState.word;
         } while (previousState.word != finalState.word);
@@ -596,11 +596,11 @@ private bool MakeTransition(SystemState currentState, SystemState nextState)
 
         private void AcquireSharedLatchesForAllPendingRequests()
         {
-            foreach (var ctx in threadCtx.retryRequests)
+            foreach (var ctx in threadCtx.Value.retryRequests)
             {
                 AcquireSharedLatch(ctx.key);
             }
-            foreach (var ctx in threadCtx.ioPendingRequests.Values)
+            foreach (var ctx in threadCtx.Value.ioPendingRequests.Values)
             {
                 AcquireSharedLatch(ctx.key);
             }
@@ -720,10 +720,10 @@ private void WriteHybridLogCheckpointCompleteFile()
 
         private void WriteHybridLogContextInfo()
         {
-            string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.guid);
+            string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.Value.guid);
             using (var file = new StreamWriter(filename, false))
             {
-                prevThreadCtx.Write(file);
+                prevThreadCtx.Value.Write(file);
                 file.Flush();
             }
 
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index ad22e22a2..9fbf8d7f0 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -67,11 +67,8 @@ private enum CheckpointType
 
         private SafeConcurrentDictionary<Guid, long> _recoveredSessions;
 
-        [ThreadStatic]
-        private static FasterExecutionContext prevThreadCtx = default(FasterExecutionContext);
-
-        [ThreadStatic]
-        private static FasterExecutionContext threadCtx = default(FasterExecutionContext);
+        private FastThreadLocal<FasterExecutionContext> prevThreadCtx;
+        private FastThreadLocal<FasterExecutionContext> threadCtx;
 
 
         /// <summary>
@@ -85,6 +82,9 @@ private enum CheckpointType
         /// <param name="serializerSettings">Serializer settings</param>
         public FasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings<Key, Value> serializerSettings = null, IFasterEqualityComparer<Key> comparer = null)
         {
+            threadCtx = new FastThreadLocal<FasterExecutionContext>();
+            prevThreadCtx = new FastThreadLocal<FasterExecutionContext>();
+
             if (comparer != null)
                 this.comparer = comparer;
             else
@@ -353,9 +353,9 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
 
@@ -380,9 +380,9 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
 
@@ -406,9 +406,9 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
 
@@ -432,7 +432,7 @@ public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
             {
                 status = (Status)internalStatus;
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
 
diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs
index 715e96968..7525c114a 100644
--- a/cs/src/core/Index/FASTER/FASTERImpl.cs
+++ b/cs/src/core/Index/FASTER/FASTERImpl.cs
@@ -76,7 +76,7 @@ internal OperationStatus InternalRead(
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
 #region Trace back for record in in-memory HybridLog
@@ -88,7 +88,7 @@ internal OperationStatus InternalRead(
 
             if (UseReadCache && ReadFromCache(ref key, ref logicalAddress, ref physicalAddress, ref latestRecordVersion))
             {
-                if (threadCtx.phase == Phase.PREPARE && latestRecordVersion != -1 && latestRecordVersion > threadCtx.version)
+                if (threadCtx.Value.phase == Phase.PREPARE && latestRecordVersion != -1 && latestRecordVersion > threadCtx.Value.version)
                 {
                     status = OperationStatus.CPR_SHIFT_DETECTED;
                     goto CreatePendingContext; // Pivot thread
@@ -121,13 +121,13 @@ internal OperationStatus InternalRead(
             }
 #endregion
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
             {
-                switch (threadCtx.phase)
+                switch (threadCtx.Value.phase)
                 {
                     case Phase.PREPARE:
                         {
-                            if (latestRecordVersion != -1 && latestRecordVersion > threadCtx.version)
+                            if (latestRecordVersion != -1 && latestRecordVersion > threadCtx.Value.version)
                             {
                                 status = OperationStatus.CPR_SHIFT_DETECTED;
                                 goto CreatePendingContext; // Pivot thread
@@ -173,7 +173,7 @@ internal OperationStatus InternalRead(
             {
                 status = OperationStatus.RECORD_ON_DISK;
 
-                if (threadCtx.phase == Phase.PREPARE)
+                if (threadCtx.Value.phase == Phase.PREPARE)
                 {
                     if (!HashBucket.TryAcquireSharedLatch(bucket))
                     {
@@ -203,8 +203,8 @@ internal OperationStatus InternalRead(
                 pendingContext.userContext = userContext;
                 pendingContext.entry.word = entry.word;
                 pendingContext.logicalAddress = logicalAddress;
-                pendingContext.version = threadCtx.version;
-                pendingContext.serialNum = threadCtx.serialNum + 1;
+                pendingContext.version = threadCtx.Value.version;
+                pendingContext.serialNum = threadCtx.Value.serialNum + 1;
             }
 #endregion
 
@@ -414,7 +414,7 @@ internal OperationStatus InternalUpsert(
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
 #region Trace back for record in in-memory HybridLog
@@ -444,20 +444,20 @@ internal OperationStatus InternalUpsert(
 #endregion
 
             // Optimization for most common case
-            if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
+            if (threadCtx.Value.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
             {
                 functions.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress));
                 return OperationStatus.SUCCESS;
             }
 
#region Entry latch operation
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
             {
-                switch (threadCtx.phase)
+                switch (threadCtx.Value.phase)
                 {
                     case Phase.PREPARE:
                         {
-                            version = threadCtx.version;
+                            version = threadCtx.Value.version;
                             if (HashBucket.TryAcquireSharedLatch(bucket))
                             {
                                 // Set to release shared latch (default)
@@ -477,7 +477,7 @@ internal OperationStatus InternalUpsert(
                         }
                     case Phase.IN_PROGRESS:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 if (HashBucket.TryAcquireExclusiveLatch(bucket))
@@ -496,7 +496,7 @@ internal OperationStatus InternalUpsert(
                         }
                     case Phase.WAIT_PENDING:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 if (HashBucket.NoSharedLatches(bucket))
@@ -513,7 +513,7 @@ internal OperationStatus InternalUpsert(
                         }
                     case Phase.WAIT_FLUSH:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 goto CreateNewRecord; // Create a (v+1) record
@@ -526,7 +526,7 @@ internal OperationStatus InternalUpsert(
             }
 #endregion
 
-            Debug.Assert(latestRecordVersion <= threadCtx.version);
+            Debug.Assert(latestRecordVersion <= threadCtx.Value.version);
 
 #region Normal processing
@@ -549,7 +549,7 @@ internal OperationStatus InternalUpsert(
                 BlockAllocate(recordSize, out long newLogicalAddress);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress),
-                               threadCtx.version,
+                               threadCtx.Value.version,
                                true, false, false,
                                latestLogicalAddress);
                 hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress));
@@ -590,8 +590,8 @@ internal OperationStatus InternalUpsert(
                 pendingContext.userContext = userContext;
                 pendingContext.entry.word = entry.word;
                 pendingContext.logicalAddress = logicalAddress;
-                pendingContext.version = threadCtx.version;
-                pendingContext.serialNum = threadCtx.serialNum + 1;
+                pendingContext.version = threadCtx.Value.version;
+                pendingContext.serialNum = threadCtx.Value.serialNum + 1;
             }
 #endregion
 
@@ -678,7 +678,7 @@ internal OperationStatus InternalRMW(
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
 #region Trace back for record in in-memory HybridLog
@@ -708,20 +708,20 @@ internal OperationStatus InternalRMW(
 #endregion
 
             // Optimization for the most common case
-            if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
+            if (threadCtx.Value.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
             {
                 functions.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress));
                 return OperationStatus.SUCCESS;
             }
 
#region Entry latch operation
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
             {
-                switch (threadCtx.phase)
+                switch (threadCtx.Value.phase)
                 {
                     case Phase.PREPARE:
                         {
-                            version = threadCtx.version;
+                            version = threadCtx.Value.version;
                             if (HashBucket.TryAcquireSharedLatch(bucket))
                             {
                                 // Set to release shared latch (default)
@@ -741,7 +741,7 @@ internal OperationStatus InternalRMW(
                         }
                     case Phase.IN_PROGRESS:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion <= version)
                             {
                                 if (HashBucket.TryAcquireExclusiveLatch(bucket))
@@ -760,7 +760,7 @@ internal OperationStatus InternalRMW(
                         }
                     case Phase.WAIT_PENDING:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 if (HashBucket.NoSharedLatches(bucket))
@@ -777,7 +777,7 @@ internal OperationStatus InternalRMW(
                         }
                     case Phase.WAIT_FLUSH:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 goto CreateNewRecord; // Create a (v+1) record
@@ -790,7 +790,7 @@ internal OperationStatus InternalRMW(
             }
 #endregion
 
-            Debug.Assert(latestRecordVersion <= threadCtx.version);
+            Debug.Assert(latestRecordVersion <= threadCtx.Value.version);
 
 #region Normal processing
@@ -799,7 +799,7 @@ internal OperationStatus InternalRMW(
             {
                 if (FoldOverSnapshot)
                 {
-                    Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.version);
+                    Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.Value.version);
                 }
                 functions.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress));
                 status = OperationStatus.SUCCESS;
@@ -853,7 +853,7 @@ internal OperationStatus InternalRMW(
                 BlockAllocate(recordSize, out long newLogicalAddress);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress);
-                RecordInfo.WriteInfo(ref recordInfo, threadCtx.version,
+                RecordInfo.WriteInfo(ref recordInfo, threadCtx.Value.version,
                                true, false, false,
                                latestLogicalAddress);
                 hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress));
@@ -919,8 +919,8 @@ ref hlog.GetValue(physicalAddress),
                 pendingContext.userContext = userContext;
                 pendingContext.entry.word = entry.word;
                 pendingContext.logicalAddress = logicalAddress;
-                pendingContext.version = threadCtx.version;
-                pendingContext.serialNum = threadCtx.serialNum + 1;
+                pendingContext.version = threadCtx.Value.version;
+                pendingContext.serialNum = threadCtx.Value.serialNum + 1;
             }
 #endregion
 
@@ -994,7 +994,7 @@ internal OperationStatus InternalRetryPendingRMW(
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
 #region Trace back for record in in-memory HybridLog
@@ -1024,15 +1024,15 @@ internal OperationStatus InternalRetryPendingRMW(
 #endregion
 
#region Entry latch operation
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
             {
-                if (!((ctx.version < threadCtx.version)
+                if (!((ctx.version < threadCtx.Value.version)
                       ||
-                      (threadCtx.phase == Phase.PREPARE)))
+                      (threadCtx.Value.phase == Phase.PREPARE)))
                 {
                     // Processing a pending (v+1) request
-                    version = (threadCtx.version - 1);
-                    switch (threadCtx.phase)
+                    version = (threadCtx.Value.version - 1);
+                    switch (threadCtx.Value.phase)
                     {
                         case Phase.IN_PROGRESS:
                             {
@@ -1090,7 +1090,7 @@ internal OperationStatus InternalRetryPendingRMW(
             {
                 if (FoldOverSnapshot)
                 {
-                    Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.version);
+                    Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.Value.version);
                 }
                 functions.InPlaceUpdater(ref pendingContext.key, ref pendingContext.input, ref hlog.GetValue(physicalAddress));
                 status = OperationStatus.SUCCESS;
@@ -1401,7 +1401,7 @@ internal OperationStatus InternalDelete(
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
 #region Trace back for record in in-memory HybridLog
@@ -1434,20 +1434,20 @@ internal OperationStatus InternalDelete(
 #endregion
 
             // NO optimization for most common case
-            //if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
+            //if (threadCtx.Value.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
             //{
             //    hlog.GetInfo(physicalAddress).Tombstone = true;
             //    return OperationStatus.SUCCESS;
             //}
 
#region Entry latch operation
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
             {
-                switch (threadCtx.phase)
+                switch (threadCtx.Value.phase)
                 {
                     case Phase.PREPARE:
                         {
-                            version = threadCtx.version;
+                            version = threadCtx.Value.version;
                             if (HashBucket.TryAcquireSharedLatch(bucket))
                             {
                                 // Set to release shared latch (default)
@@ -1467,7 +1467,7 @@ internal OperationStatus InternalDelete(
                         }
                     case Phase.IN_PROGRESS:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 if (HashBucket.TryAcquireExclusiveLatch(bucket))
@@ -1486,7 +1486,7 @@ internal OperationStatus InternalDelete(
                         }
                     case Phase.WAIT_PENDING:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 if (HashBucket.NoSharedLatches(bucket))
@@ -1503,7 +1503,7 @@ internal OperationStatus InternalDelete(
                         }
                     case Phase.WAIT_FLUSH:
                         {
-                            version = (threadCtx.version - 1);
+                            version = (threadCtx.Value.version - 1);
                             if (latestRecordVersion != -1 && latestRecordVersion <= version)
                             {
                                 goto CreateNewRecord; // Create a (v+1) record
@@ -1516,7 +1516,7 @@ internal OperationStatus InternalDelete(
             }
 #endregion
 
-            Debug.Assert(latestRecordVersion <= threadCtx.version);
+            Debug.Assert(latestRecordVersion <= threadCtx.Value.version);
 
 #region Normal processing
@@ -1573,7 +1573,7 @@ internal OperationStatus InternalDelete(
                 BlockAllocate(recordSize, out long newLogicalAddress);
                 var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                 RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress),
-                               threadCtx.version,
+                               threadCtx.Value.version,
                                true, true, false,
                                latestLogicalAddress);
                 hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress));
@@ -1612,8 +1612,8 @@ internal OperationStatus InternalDelete(
                 pendingContext.userContext = userContext;
                 pendingContext.entry.word = entry.word;
                 pendingContext.logicalAddress = logicalAddress;
-                pendingContext.version = threadCtx.version;
-                pendingContext.serialNum = threadCtx.serialNum + 1;
+                pendingContext.version = threadCtx.Value.version;
+                pendingContext.serialNum = threadCtx.Value.serialNum + 1;
             }
 #endregion
 
@@ -1673,7 +1673,7 @@ public Status ContainsKeyInMemory(ref Key key, long fromAddress = -1)
             var hash = comparer.GetHashCode64(ref key);
             var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
 
-            if (threadCtx.phase != Phase.REST)
+            if (threadCtx.Value.phase != Phase.REST)
                 HeavyEnter(hash);
 
             HashBucketEntry entry = default(HashBucketEntry);
@@ -1751,13 +1751,13 @@ internal Status HandleOperationStatus(
             {
#region Epoch Synchronization
                 var version = ctx.version;
-                Debug.Assert(threadCtx.version == version);
-                Debug.Assert(threadCtx.phase == Phase.PREPARE);
+                Debug.Assert(threadCtx.Value.version == version);
+                Debug.Assert(threadCtx.Value.phase == Phase.PREPARE);
                 Refresh();
-                Debug.Assert(threadCtx.version == version + 1);
-                Debug.Assert(threadCtx.phase == Phase.IN_PROGRESS);
+                Debug.Assert(threadCtx.Value.version == version + 1);
+                Debug.Assert(threadCtx.Value.phase == Phase.IN_PROGRESS);
 
-                pendingContext.version = threadCtx.version;
+                pendingContext.version = threadCtx.Value.version;
 #endregion
 
#region Retry as (v+1) Operation
@@ -1783,7 +1783,7 @@ internal Status HandleOperationStatus(
                             ref pendingContext);
                         break;
                     case OperationType.RMW:
-                        internalStatus = InternalRetryPendingRMW(threadCtx, ref pendingContext);
+                        internalStatus = InternalRetryPendingRMW(threadCtx.Value, ref pendingContext);
                         break;
                 }
 
@@ -1850,9 +1850,9 @@ private void ReleaseSharedLatch(Key key)
 
         private void HeavyEnter(long hash)
         {
-            if (threadCtx.phase == Phase.GC)
+            if (threadCtx.Value.phase == Phase.GC)
                 GarbageCollectBuckets(hash);
-            if (threadCtx.phase == Phase.PREPARE_GROW)
+            if (threadCtx.Value.phase == Phase.PREPARE_GROW)
             {
                 // We spin-wait as a simplification
                 // Could instead do a "heavy operation" here
@@ -1860,7 +1860,7 @@ private void HeavyEnter(long hash)
                 Thread.SpinWait(100);
                 Refresh();
             }
-            if (threadCtx.phase == Phase.IN_PROGRESS_GROW)
+            if (threadCtx.Value.phase == Phase.IN_PROGRESS_GROW)
             {
                 SplitBuckets(hash);
             }
diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs
index 352670adf..1cb6d4056 100644
--- a/cs/src/core/Index/FASTER/FASTERThread.cs
+++ b/cs/src/core/Index/FASTER/FASTERThread.cs
@@ -19,20 +19,24 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functi
                                                  request)
         {
             var handleLatches = false;
-            if ((ctx.version < threadCtx.version) // Thread has already shifted to (v+1)
+            if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
                 ||
-                (threadCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
+                (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
             {
                 handleLatches = true;
             }

From a3a9322efb4f7575195bad57fe00eb6d3f6b539b Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 10:20:26 -0700
Subject: [PATCH 04/15] Updates

---
 cs/src/core/Index/FASTER/Checkpoint.cs   |  2 +-
 cs/src/core/Index/FASTER/FASTERThread.cs | 30 +++++++++++++-----------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/FASTER/Checkpoint.cs
index 8d92c147b..33a27cfc4 100644
--- a/cs/src/core/Index/FASTER/Checkpoint.cs
+++ b/cs/src/core/Index/FASTER/Checkpoint.cs
@@ -472,7 +472,7 @@ private void HandleCheckpointingPhases()
 
                         if (!ctx.markers[EpochPhaseIdx.InProgress])
                         {
-                            prevThreadCtx = threadCtx;
+                            prevThreadCtx.Value = threadCtx.Value;
 
                             InitLocalContext(prevThreadCtx.Value.guid);
 
diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs
index 1cb6d4056..28bdd9cb1 100644
--- a/cs/src/core/Index/FASTER/FASTERThread.cs
+++ b/cs/src/core/Index/FASTER/FASTERThread.cs
@@ -119,24 +119,26 @@ internal void InternalRelease()
 
         internal void InitLocalContext(Guid token)
         {
-            threadCtx.Value =
-                new FasterExecutionContext
-                {
-                    phase = Phase.REST,
-                    version = _systemState.version,
-                    markers = new bool[8],
-                    serialNum = 0,
-                    totalPending = 0,
-                    guid = token,
-                    retryRequests = new Queue<PendingContext>(),
-                    readyResponses = new BlockingCollection<AsyncIOContext<Key, Value>>(),
-                    ioPendingRequests = new Dictionary<long, PendingContext>()
-                };
+            var ctx =
+                new FasterExecutionContext
+                {
+                    phase = Phase.REST,
+                    version = _systemState.version,
+                    markers = new bool[8],
+                    serialNum = 0,
+                    totalPending = 0,
+                    guid = token,
+                    retryRequests = new Queue<PendingContext>(),
+                    readyResponses = new BlockingCollection<AsyncIOContext<Key, Value>>(),
+                    ioPendingRequests = new Dictionary<long, PendingContext>()
+                };
 
             for(int i = 0; i < 8; i++)
             {
-                threadCtx.Value.markers[i] = false;
+                ctx.markers[i] = false;
             }
+
+            threadCtx.Value = ctx;
         }
 
         internal bool InternalCompletePending(bool wait = false)
From a13a8d0131f0ec144f7d89b9371cc6855cffdde8 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 10:24:39 -0700
Subject: [PATCH 05/15] Upgraded variable to readonly

---
 cs/src/core/Epochs/FastThreadLocal.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs
index 65c0e18b2..ce5827365 100644
--- a/cs/src/core/Epochs/FastThreadLocal.cs
+++ b/cs/src/core/Epochs/FastThreadLocal.cs
@@ -19,7 +19,7 @@ internal class FastThreadLocal<T>
         private static T[] values;
 
         private readonly int id;
-        private static int[] instances = new int[kMaxInstances];
+        private static readonly int[] instances = new int[kMaxInstances];
 
         public FastThreadLocal()
         {

From d1e859fa502377cc29499bd0ca79d565570fa7ff Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 11:18:53 -0700
Subject: [PATCH 06/15] Correct clean up in testcases

---
 cs/src/core/Allocator/AllocatorBase.cs       | 26 +++++++++++++---
 cs/src/core/Allocator/MallocFixedPageSize.cs | 32 +++++++++++++++++++-
 cs/src/core/Index/FASTER/FASTER.cs           |  2 ++
 cs/src/core/Index/FASTER/FASTERBase.cs       |  2 +-
 cs/src/core/Index/FASTER/FASTERThread.cs     |  3 ++
 cs/test/ComponentRecoveryTests.cs            | 11 +++++++
 cs/test/FullRecoveryTests.cs                 |  2 ++
 cs/test/ObjectRecoveryTest.cs                |  2 ++
 8 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 002f707b7..20743207d 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -61,7 +61,7 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// Epoch information
         /// </summary>
         protected readonly LightEpoch epoch;
-        private readonly bool toDisposeEpoch;
+        private readonly bool ownedEpoch;
 
         /// <summary>
         /// Comparer
@@ -438,8 +438,8 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
             this.comparer = comparer;
             if (epoch == null)
             {
-                epoch = new LightEpoch();
-                toDisposeEpoch = true;
+                this.epoch = new LightEpoch();
+                ownedEpoch = true;
             }
             else
                 this.epoch = epoch;
@@ -515,6 +515,24 @@ protected void Initialize(long firstValidAddress)
             TailPageIndex = 0;
         }
 
+        /// <summary>
+        /// Acquire thread
+        /// </summary>
+        public void Acquire()
+        {
+            if (ownedEpoch)
+                epoch.Acquire();
+        }
+
+        /// <summary>
+        /// Release thread
+        /// </summary>
+        public void Release()
+        {
+            if (ownedEpoch)
+                epoch.Release();
+        }
+
         /// <summary>
         /// Dispose allocator
         /// </summary>
@@ -531,7 +549,7 @@ public virtual void Dispose()
             HeadAddress = 0;
             BeginAddress = 1;
 
-            if (toDisposeEpoch)
+            if (ownedEpoch)
                 epoch.Dispose();
         }
 
diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs
index 56dd3897f..d08beb2e5 100644
--- a/cs/src/core/Allocator/MallocFixedPageSize.cs
+++ b/cs/src/core/Allocator/MallocFixedPageSize.cs
@@ -48,6 +48,7 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
         private CountdownEvent checkpointEvent;
 
         private readonly LightEpoch epoch;
+        private readonly bool ownedEpoch;
 
         private FastThreadLocal<Queue<FreeItem>> freeList;
 
@@ -59,7 +60,13 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
         public MallocFixedPageSize(bool returnPhysicalAddress = false, LightEpoch epoch = null)
         {
             freeList = new FastThreadLocal<Queue<FreeItem>>();
-            this.epoch = epoch ?? new LightEpoch();
+            if (epoch == null)
+            {
+                this.epoch = new LightEpoch();
+                ownedEpoch = true;
+            }
+            else
+                this.epoch = epoch;
 
             values[0] = new T[PageSize];
 
@@ -416,6 +423,26 @@ public long Allocate()
             return index;
         }
 
+        /// <summary>
+        /// Acquire thread
+        /// </summary>
+        public void Acquire()
+        {
+            if (ownedEpoch)
+                epoch.Acquire();
+            freeList.InitializeThread();
+        }
+
+        /// <summary>
+        /// Release thread
+        /// </summary>
+        public void Release()
+        {
+            if (ownedEpoch)
+                epoch.Release();
+            freeList.DisposeThread();
+        }
+
         /// <summary>
         /// Dispose
         /// </summary>
@@ -431,6 +458,9 @@ public void Dispose()
             values = null;
             values0 = null;
             count = 0;
+            if (ownedEpoch)
+                epoch.Dispose();
+            freeList.Dispose();
         }
 
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index 9fbf8d7f0..3220fce70 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -451,6 +451,8 @@ public bool GrowIndex()
         public void Dispose()
         {
             base.Free();
+            threadCtx.Dispose();
+            prevThreadCtx.Dispose();
             hlog.Dispose();
         }
     }
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index 4fd6cb275..9b2109cc5 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -282,8 +282,8 @@ internal Status Free()
         {
             Free(0);
             Free(1);
-            overflowBucketsAllocator.Dispose();
             epoch.Dispose();
+            overflowBucketsAllocator.Dispose();
             return Status.OK;
         }
 
diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs
index 28bdd9cb1..e5dcc2d62 100644
--- a/cs/src/core/Index/FASTER/FASTERThread.cs
+++ b/cs/src/core/Index/FASTER/FASTERThread.cs
@@ -19,6 +19,7 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functi

diff --git a/cs/test/ComponentRecoveryTests.cs b/cs/test/ComponentRecoveryTests.cs
@@ public unsafe void MallocFixedPageSizeRecoveryTest()
             var allocator = new MallocFixedPageSize<HashBucket>();
+            allocator.Acquire();
 
             //do something
             int numBucketsToAdd = 16 * allocator.GetPageSize();
@@ -42,8 +43,12 @@ public unsafe void MallocFixedPageSizeRecoveryTest()
             //wait until complete
             allocator.IsCheckpointCompleted(true);
+            allocator.Release();
+            allocator.Dispose();
 
             var recoveredAllocator = new MallocFixedPageSize<HashBucket>();
+            recoveredAllocator.Acquire();
+
             //issue call to recover
             recoveredAllocator.BeginRecovery(device, 0, numBucketsToAdd, numBytesWritten, out ulong numBytesRead);
             //wait until complete
@@ -61,6 +66,9 @@ public unsafe void MallocFixedPageSizeRecoveryTest()
                     Assert.IsTrue(bucket->bucket_entries[j] == rand2.Next());
                 }
             }
+
+            recoveredAllocator.Release();
+            recoveredAllocator.Dispose();
         }
 
         [Test]
@@ -135,6 +143,9 @@ public unsafe void TestFuzzyIndexRecovery()
                     Assert.IsTrue(entry1.word == entry2.word);
                 }
             }
+
+            hash_table1.Free();
+            hash_table2.Free();
         }
     }
 }
diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/FullRecoveryTests.cs
index 872030035..12d91aded 100644
--- a/cs/test/FullRecoveryTests.cs
+++ b/cs/test/FullRecoveryTests.cs
@@ -78,6 +78,8 @@ public static void DeleteDirectory(string path)
         public void RecoveryTest1()
         {
             Populate();
+            fht.Dispose();
+            fht = null;
             log.Close();
             Setup();
             RecoverAndTest(token, token);
diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs
index 12b2d1ce9..670afbce7 100644
--- a/cs/test/ObjectRecoveryTest.cs
+++ b/cs/test/ObjectRecoveryTest.cs
@@ -87,6 +87,8 @@ public static void DeleteDirectory(string path)
         public void ObjectRecoveryTest1()
         {
             Populate();
+            fht.Dispose();
+            fht = null;
             log.Close();
             objlog.Close();
             Setup();

From 934faa08aadced181ac86af9cbc0c0abb1b8e57d Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 12:01:11 -0700
Subject: [PATCH 07/15] Close memory stream after calling EndSerialize

---
 cs/src/core/Allocator/GenericAllocator.cs | 81 ++++-------------------
 1 file changed, 13 insertions(+), 68 deletions(-)

diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 118146c29..d0437b08d 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -372,9 +372,21 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
 
                         if (ms.Position > ObjectBlockSize || i == (end / recordSize) - 1)
                         {
+                            if (KeyHasObjects())
+                                keySerializer.EndSerialize();
+                            if (ValueHasObjects())
+                                valueSerializer.EndSerialize();
+
                             var _s = ms.ToArray();
                             ms.Close();
-                            ms = new MemoryStream();
+
+                            if (i < (end / recordSize) - 1)
+                            {
+                                ms = new MemoryStream();
+                                if (KeyHasObjects())
+                                    keySerializer.BeginSerialize(ms);
+                                if (ValueHasObjects())
+                                    valueSerializer.BeginSerialize(ms);
+                            }
 
                             var _objBuffer = ioBufferPool.Get(_s.Length);
 
                             asyncResult.done = new AutoResetEvent(false);
@@ -757,73 +769,6 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
             }
         }
 
-        /// <summary>
-        /// Serialize part of page to stream
-        /// </summary>
-        /// <param name="ptr">From pointer</param>
-        /// <param name="untilptr">Until pointer</param>
-        /// <param name="stream">Stream</param>
-        /// <param name="objectBlockSize">Size of blocks to serialize in chunks of</param>
-        /// <param name="addr">List of addresses that need to be updated with offsets</param>
-        public void Serialize(ref long ptr, long untilptr, Stream stream, int objectBlockSize, out List<long> addr)
-        {
-            IObjectSerializer<Key> keySerializer = null;
-            IObjectSerializer<Value> valueSerializer = null;
-
-            if (KeyHasObjects())
-            {
-                keySerializer = SerializerSettings.keySerializer();
-                keySerializer.BeginSerialize(stream);
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer = SerializerSettings.valueSerializer();
-                valueSerializer.BeginSerialize(stream);
-            }
-
-            addr = new List<long>();
-            while (ptr < untilptr)
-            {
-                if (!GetInfo(ptr).Invalid)
-                {
-                    long pos = stream.Position;
-
-                    if (KeyHasObjects())
-                    {
-                        keySerializer.Serialize(ref GetKey(ptr));
-                        var key_address = GetKeyAddressInfo(ptr);
-                        key_address->Address = pos;
-                        key_address->Size = (int)(stream.Position - pos);
-                        addr.Add((long)key_address);
-                    }
-
-                    if (ValueHasObjects() && !GetInfo(ptr).Tombstone)
-                    {
-                        pos = stream.Position;
-                        var value_address = GetValueAddressInfo(ptr);
-                        valueSerializer.Serialize(ref GetValue(ptr));
-                        value_address->Address = pos;
-                        value_address->Size = (int)(stream.Position - pos);
-                        addr.Add((long)value_address);
-                    }
-
-                }
-                ptr += GetRecordSize(ptr);
-
-                if (stream.Position > objectBlockSize)
-                    break;
-            }
-
-            if (KeyHasObjects())
-            {
-                keySerializer.EndSerialize();
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer.EndSerialize();
-            }
-        }
-
         /// <summary>
         /// Get location and range of object log addresses for specified log page
         /// </summary>

From 7c6f1767424cc69692d17e3edb311446793899c9 Mon Sep 17 00:00:00 2001
From: Peter Freiling
Date: Wed, 3 Apr 2019 12:06:01 -0700
Subject: [PATCH 08/15] Making DumpDistribution return a string instead of
 writing to console

---
 cs/benchmark/FasterYcsbBenchmark.cs       |  2 +-
 cs/src/core/Index/FASTER/FASTERBase.cs    | 20 +++++++++++---------
 cs/src/core/Index/Interfaces/IFasterKV.cs |  4 ++--
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs
index 9069b2541..8d98a69fa 100644
--- a/cs/benchmark/FasterYcsbBenchmark.cs
+++ b/cs/benchmark/FasterYcsbBenchmark.cs
@@ -248,7 +248,7 @@ public unsafe void Run()
 
             idx_ = 0;
 
-            store.DumpDistribution();
+            Console.WriteLine(store.DumpDistribution());
 
             Console.WriteLine("Executing experiment.");
 
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index 9b2109cc5..61f30f42a 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -733,7 +733,7 @@ protected virtual long GetEntryCount()
         /// <summary>
        /// </summary>
         /// <param name="version"></param>
-        protected virtual void _DumpDistribution(int version)
+        protected virtual string _DumpDistribution(int version)
         {
             var table_size_ = state[version].size;
             var ptable_ = state[version].tableAligned;
@@ -768,23 +768,25 @@ protected virtual void _DumpDistribution(int version)
                 histogram[cnt]++;
             }
 
-            Console.WriteLine("Number of hash buckets: {0}", table_size_);
-            Console.WriteLine("Total distinct hash-table entry count: {0}", total_record_count);
-            Console.WriteLine("Average #entries per hash bucket: {0:0.00}", total_record_count / (double)table_size_);
-            Console.WriteLine("Histogram of #entries per bucket: ");
-
+            var distribution =
+                $"Number of hash buckets: {{{table_size_}}}\n" +
+                $"Total distinct hash-table entry count: {{{total_record_count}}}\n" +
+                $"Average #entries per hash bucket: {{{total_record_count / (double)table_size_:0.00}}}\n" +
+                $"Histogram of #entries per bucket:\n";
             foreach (var kvp in histogram.OrderBy(e => e.Key))
             {
-                Console.WriteLine(kvp.Key.ToString() + ": " + kvp.Value.ToString(CultureInfo.InvariantCulture));
+                distribution += $"  {kvp.Key} : {kvp.Value}\n";
             }
+
+            return distribution;
         }
 
         /// <summary>
         /// Dumps the distribution of each non-empty bucket in the hash table.
         /// </summary>
-        public void DumpDistribution()
+        public string DumpDistribution()
         {
-            _DumpDistribution(resizeInfo.version);
+            return _DumpDistribution(resizeInfo.version);
         }
     }
 
diff --git a/cs/src/core/Index/Interfaces/IFasterKV.cs b/cs/src/core/Index/Interfaces/IFasterKV.cs
index d1eb1fd90..8d995a3c0 100644
--- a/cs/src/core/Index/Interfaces/IFasterKV.cs
+++ b/cs/src/core/Index/Interfaces/IFasterKV.cs
@@ -162,9 +162,9 @@ public interface IFasterKV<Key, Value, Input, Output, Context> : IDisposable
         IFasterEqualityComparer<Key> Comparer { get; }
 
         /// <summary>
-        /// Dump distribution of #entries in hash table, to console
+        /// Dump distribution of #entries in hash table
         /// </summary>
-        void DumpDistribution();
+        string DumpDistribution();
 
         /// <summary>
         /// Experimental feature

From 8679b065ceb7d266561e32014f81ef8acf06427d Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 12:10:34 -0700
Subject: [PATCH 09/15] Removed redundant EndSerialize

---
 cs/src/core/Allocator/GenericAllocator.cs | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index d0437b08d..89c4fe298 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -425,14 +425,6 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedDestinationAddres
                     }
                 }
             }
-            if (KeyHasObjects())
-            {
-                keySerializer.EndSerialize();
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer.EndSerialize();
-            }
 
             if (asyncResult.partial)
             {

From f41f1e8ad00fd9a3f5828da62c1f1ffc2fc92613 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Wed, 3 Apr 2019 15:18:18 -0700
Subject: [PATCH 10/15] Buffer pools are not shared across instances.
--- cs/playground/SumStore/RecoveryTest.cs | 2 - cs/src/core/Allocator/AllocatorBase.cs | 9 ++-- cs/src/core/Allocator/GenericAllocator.cs | 14 ++--- .../core/Device/ManagedLocalStorageDevice.cs | 3 +- cs/src/core/Utilities/BufferPool.cs | 52 ++++--------------- 5 files changed, 21 insertions(+), 59 deletions(-) diff --git a/cs/playground/SumStore/RecoveryTest.cs b/cs/playground/SumStore/RecoveryTest.cs index 6d75de8f5..0cedeaa14 100644 --- a/cs/playground/SumStore/RecoveryTest.cs +++ b/cs/playground/SumStore/RecoveryTest.cs @@ -19,7 +19,6 @@ class RecoveryTest const long completePendingInterval = 1 << 12; const int checkpointInterval = 10 * 1000; readonly int threadCount; - readonly int numActiveThreads; FasterKV fht; BlockingCollection inputArrays; @@ -27,7 +26,6 @@ class RecoveryTest public RecoveryTest(int threadCount) { this.threadCount = threadCount; - numActiveThreads = 0; // Create FASTER index var log = Devices.CreateLogDevice("logs\\hlog"); diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 20743207d..665c7a1a4 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -206,9 +206,9 @@ public unsafe abstract class AllocatorBase : IDisposable #endregion /// - /// Read buffer pool + /// Buffer pool /// - protected SectorAlignedBufferPool readBufferPool; + protected SectorAlignedBufferPool bufferPool; /// /// Read cache @@ -492,7 +492,7 @@ protected void Initialize(long firstValidAddress) Debug.Assert(firstValidAddress <= PageSize); Debug.Assert(PageSize >= GetRecordSize(0)); - readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize); + bufferPool = new SectorAlignedBufferPool(1, sectorSize); long tailPage = firstValidAddress >> LogPageSizeBits; int tailPageIndex = (int)(tailPage % BufferSize); @@ -551,6 +551,7 @@ public virtual void Dispose() if (ownedEpoch) epoch.Dispose(); + bufferPool.Free(); } /// @@ -1152,7 +1153,7 @@ public void RecoveryReset(long tailAddress, long headAddress) uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset); alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); - var record = readBufferPool.Get((int)alignedReadLength); + var record = bufferPool.Get((int)alignedReadLength); record.valid_offset = (int)(fileOffset - alignedFileOffset); record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); record.required_bytes = numBytes; diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 89c4fe298..5f9dce576 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -35,8 +35,6 @@ public unsafe sealed class GenericAllocator : AllocatorBase)); private readonly SerializerSettings SerializerSettings; @@ -66,8 +64,6 @@ public GenericAllocator(LogSettings settings, SerializerSettings ser if (objectLogDevice == null) throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance"); } - - ioBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize); } public override void Initialize() @@ -294,7 +290,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres if (localSegmentOffsets == null) localSegmentOffsets = segmentOffsets; var src = values[flushPage % BufferSize]; - var buffer = ioBufferPool.Get((int)numBytesToWrite); + var buffer = bufferPool.Get((int)numBytesToWrite); if (aligned_start < start && (KeyHasObjects() || 
ValueHasObjects())) { @@ -388,7 +384,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres valueSerializer.BeginSerialize(ms); } - var _objBuffer = ioBufferPool.Get(_s.Length); + var _objBuffer = bufferPool.Get(_s.Length); asyncResult.done = new AutoResetEvent(false); @@ -459,7 +455,7 @@ protected override void ReadAsync( ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length, IOCompletionCallback callback, PageAsyncReadResult asyncResult, IDevice device, IDevice objlogDevice) { - asyncResult.freeBuffer1 = readBufferPool.Get((int)aligned_read_length); + asyncResult.freeBuffer1 = bufferPool.Get((int)aligned_read_length); asyncResult.freeBuffer1.required_bytes = (int)aligned_read_length; if (!(KeyHasObjects() || ValueHasObjects())) @@ -584,7 +580,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num if (size > int.MaxValue) throw new Exception("Unable to read object page, total size greater than 2GB: " + size); - var objBuffer = ioBufferPool.Get((int)size); + var objBuffer = bufferPool.Get((int)size); result.freeBuffer2 = objBuffer; var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1); @@ -612,7 +608,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset); alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); - var record = readBufferPool.Get((int)alignedReadLength); + var record = bufferPool.Get((int)alignedReadLength); record.valid_offset = (int)(fileOffset - alignedFileOffset); record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); record.required_bytes = numBytes; diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 595bc5bc2..df1b81a6f 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -30,7 +30,7 @@ public class ManagedLocalStorageDevice : StorageDeviceBase public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false) : base(filename, GetSectorSize(filename)) { - pool = SectorAlignedBufferPool.GetPool(1, 1); + pool = new SectorAlignedBufferPool(1, 1); this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; @@ -162,6 +162,7 @@ public override void Close() { foreach (var logHandle in logHandles.Values) logHandle.Dispose(); + pool.Free(); } diff --git a/cs/src/core/Utilities/BufferPool.cs b/cs/src/core/Utilities/BufferPool.cs index 332d6ff36..341daef18 100644 --- a/cs/src/core/Utilities/BufferPool.cs +++ b/cs/src/core/Utilities/BufferPool.cs @@ -97,51 +97,16 @@ public override string ToString() /// public class SectorAlignedBufferPool { + /// + /// Disable buffer pool + /// + public static bool Disabled = false; + private const int levels = 32; private readonly int recordSize; private readonly int sectorSize; private readonly ConcurrentQueue[] queue; - private static SafeConcurrentDictionary, SectorAlignedBufferPool> _instances - = new SafeConcurrentDictionary, SectorAlignedBufferPool>(); - - /// - /// Clear buffer pool - /// - public static void Clear() - { - foreach (var pool in _instances.Values) - { - pool.Free(); - } - _instances.Clear(); - } - - /// - /// Print contents of buffer pool - /// - public static void PrintAll() - { - foreach (var kvp in _instances) - { - Console.WriteLine("Pool Key: {0}", kvp.Key); - kvp.Value.Print(); - } - } - - 
/// - /// Get cached instance of buffer pool for specified params - /// - /// Record size - /// Sector size - /// - public static SectorAlignedBufferPool GetPool(int recordSize, int sectorSize) - { - return - _instances.GetOrAdd(new Tuple(recordSize, sectorSize), - t => new SectorAlignedBufferPool(t.Item1, t.Item2)); - } - /// /// Constructor /// @@ -159,14 +124,15 @@ public SectorAlignedBufferPool(int recordSize, int sectorSize) /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public unsafe void Return(SectorAlignedMemory page) + public void Return(SectorAlignedMemory page) { Debug.Assert(queue[page.level] != null); page.available_bytes = 0; page.required_bytes = 0; page.valid_offset = 0; Array.Clear(page.buffer, 0, page.buffer.Length); - queue[page.level].Enqueue(page); + if (!Disabled) + queue[page.level].Enqueue(page); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -201,7 +167,7 @@ public unsafe SectorAlignedMemory Get(int numRecords) Interlocked.CompareExchange(ref queue[index], localPool, null); } - if (queue[index].TryDequeue(out SectorAlignedMemory page)) + if (!Disabled && queue[index].TryDequeue(out SectorAlignedMemory page)) { return page; } From b78aee363f1540e05cfb5007cb909a21fba13854 Mon Sep 17 00:00:00 2001 From: Peter Freiling Date: Wed, 3 Apr 2019 20:43:28 -0700 Subject: [PATCH 11/15] Removing extra buffer copy --- cs/src/core/Allocator/GenericAllocator.cs | 33 +++++++++++------------ 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 5f9dce576..934d91765 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -368,11 +368,26 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres if (ms.Position > ObjectBlockSize || i == (end / recordSize) - 1) { + var memoryStreamLength = (int)ms.Position; + + var _objBuffer = bufferPool.Get(memoryStreamLength); + + asyncResult.done = new AutoResetEvent(false); + + var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1); + + var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; + fixed (void* src_ = ms.GetBuffer()) + Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamLength, memoryStreamLength); + + foreach (var address in addr) + ((AddressInfo*)address)->Address += _objAddr; + if (KeyHasObjects()) keySerializer.EndSerialize(); if (ValueHasObjects()) valueSerializer.EndSerialize(); - var _s = ms.ToArray(); + ms.Close(); if (i < (end / recordSize) - 1) @@ -382,23 +397,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres keySerializer.BeginSerialize(ms); if (ValueHasObjects()) valueSerializer.BeginSerialize(ms); - } - var _objBuffer = bufferPool.Get(_s.Length); - - asyncResult.done = new AutoResetEvent(false); - - var _alignedLength = (_s.Length + (sectorSize - 1)) & ~(sectorSize - 1); - - var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; - fixed (void* src_ = _s) - Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, _s.Length, _s.Length); - - foreach (var address in addr) - ((AddressInfo*)address)->Address += _objAddr; - - if (i < (end / recordSize) - 1) - { objlogDevice.WriteAsync( (IntPtr)_objBuffer.aligned_pointer, (int)(alignedDestinationAddress >> 
LogSegmentSizeBits), From 765c083804ba3d7b7a43f3cb9ac3738401e557ed Mon Sep 17 00:00:00 2001 From: Peter Freiling Date: Thu, 4 Apr 2019 10:01:26 -0700 Subject: [PATCH 12/15] Fixing issue where LightEpoch.IsProtected dereferences FastThreadLocal.values for threads that haven't initialized values yet --- cs/src/core/Epochs/FastThreadLocal.cs | 6 ++++-- cs/src/core/Epochs/LightEpoch.cs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs index ce5827365..e9f53656f 100644 --- a/cs/src/core/Epochs/FastThreadLocal.cs +++ b/cs/src/core/Epochs/FastThreadLocal.cs @@ -64,8 +64,10 @@ public void Dispose() public T Value { - get { return values[id]; } - set { values[id] = value; } + get => values[id]; + set => values[id] = value; } + + public bool IsInitializedForThread => values != null; } } diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 7e0e60e7e..2cd7f6232 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -121,7 +121,7 @@ public void Dispose() /// Result of the check public bool IsProtected() { - return kInvalidIndex != threadEntryIndex.Value; + return threadEntryIndex.IsInitializedForThread && kInvalidIndex != threadEntryIndex.Value; } /// From 0bd079086e78ab85e1738a1040ef86206113c830 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 4 Apr 2019 10:57:00 -0700 Subject: [PATCH 13/15] Squashed commit of the following: commit 2ba964526c96c5c6852d1b702348a5bc9fa94dcb Merge: f2b2c1f 765c083 Author: Badrish Chandramouli Date: Thu Apr 4 10:54:51 2019 -0700 Merge branch 'nostatics' of https://github.com/Microsoft/FASTER into readobjfix commit f2b2c1f84b0bac97da222de4f3c929743742ecf2 Author: Badrish Chandramouli Date: Thu Apr 4 10:54:09 2019 -0700 Fixes to addresses, cleanup object read logic commit 765c083804ba3d7b7a43f3cb9ac3738401e557ed Author: Peter Freiling Date: Thu Apr 4 10:01:26 2019 -0700 Fixing issue where LightEpoch.IsProtected dereferences FastThreadLocal.values for threads that haven't initialized values yet commit b78aee363f1540e05cfb5007cb909a21fba13854 Author: Peter Freiling Date: Wed Apr 3 20:43:28 2019 -0700 Removing extra buffer copy commit cd0da5ec37469486e0b6cd978c2088a5ee6bb598 Author: Badrish Chandramouli Date: Wed Apr 3 19:59:53 2019 -0700 Updates commit 8d2ceeb6afcdaf2dacb98c0fa905a66bba7f3ae9 Author: Badrish Chandramouli Date: Wed Apr 3 19:22:39 2019 -0700 Fix for reading objects --- cs/src/core/Allocator/AllocatorBase.cs | 19 +++- cs/src/core/Allocator/BlittableAllocator.cs | 14 ++- .../core/Allocator/BlittableScanIterator.cs | 6 +- cs/src/core/Allocator/GenericAllocator.cs | 87 +++++++++---------- cs/src/core/Allocator/GenericScanIterator.cs | 6 +- cs/src/core/Epochs/FastThreadLocal.cs | 6 +- cs/src/core/Epochs/LightEpoch.cs | 2 +- cs/src/core/Index/FASTER/Recovery.cs | 20 +++-- cs/src/core/Utilities/BufferPool.cs | 8 +- cs/src/core/Utilities/PageAsyncResultTypes.cs | 22 ++--- 10 files changed, 103 insertions(+), 87 deletions(-) diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 665c7a1a4..d7ba60eb0 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -1174,6 +1174,7 @@ public void RecoveryReset(long tailAddress, long headAddress) /// /// /// + /// /// /// /// @@ -1182,12 +1183,13 @@ public void RecoveryReset(long tailAddress, long headAddress) public void AsyncReadPagesFromDevice( long readPageStart, int 
numPages, + long untilAddress, IOCompletionCallback callback, TContext context, long devicePageOffset = 0, IDevice logDevice = null, IDevice objectLogDevice = null) { - AsyncReadPagesFromDevice(readPageStart, numPages, callback, context, + AsyncReadPagesFromDevice(readPageStart, numPages, untilAddress, callback, context, out CountdownEvent completed, devicePageOffset, logDevice, objectLogDevice); } @@ -1197,6 +1199,7 @@ public void AsyncReadPagesFromDevice( /// /// /// + /// /// /// /// @@ -1206,6 +1209,7 @@ public void AsyncReadPagesFromDevice( private void AsyncReadPagesFromDevice( long readPageStart, int numPages, + long untilAddress, IOCompletionCallback callback, TContext context, out CountdownEvent completed, @@ -1238,15 +1242,24 @@ private void AsyncReadPagesFromDevice( page = readPage, context = context, handle = completed, - count = 1 + maxPtr = PageSize }; ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage); + uint readLength = (uint)AlignedPageSizeBytes; + long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask)); + if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize)) + { + readLength = (uint)(adjustedUntilAddress - (long)offsetInFile); + asyncResult.maxPtr = readLength; + readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1)); + } + if (device != null) offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset)); - ReadAsync(offsetInFile, pageIndex, (uint)PageSize, callback, asyncResult, usedDevice, usedObjlogDevice); + ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice); } } diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index edb03c1dc..eb9126640 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -331,6 +331,7 @@ public override IFasterScanIterator Scan(long beginAddress, long end /// /// /// + /// /// /// /// @@ -341,6 +342,7 @@ public override IFasterScanIterator Scan(long beginAddress, long end internal void AsyncReadPagesFromDeviceToFrame( long readPageStart, int numPages, + long untilAddress, IOCompletionCallback callback, TContext context, BlittableFrame frame, @@ -373,16 +375,24 @@ internal void AsyncReadPagesFromDeviceToFrame( page = readPage, context = context, handle = completed, - count = 1, frame = frame }; ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage); + uint readLength = (uint)AlignedPageSizeBytes; + long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask)); + + if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize)) + { + readLength = (uint)(adjustedUntilAddress - (long)offsetInFile); + readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1)); + } + if (device != null) offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset)); - usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], (uint)AlignedPageSizeBytes, callback, asyncResult); + usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, callback, asyncResult); } } } diff --git a/cs/src/core/Allocator/BlittableScanIterator.cs b/cs/src/core/Allocator/BlittableScanIterator.cs index 87b3453b0..2540ab8c9 100644 --- a/cs/src/core/Allocator/BlittableScanIterator.cs +++ b/cs/src/core/Allocator/BlittableScanIterator.cs @@ -61,7 +61,7 
@@ public unsafe BlittableScanIterator(BlittableAllocator hlog, long be var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize; hlog.AsyncReadPagesFromDeviceToFrame (nextAddress >> hlog.LogPageSizeBits, - 1, AsyncReadPagesCallback, Empty.Default, + 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[frameNumber]); } } @@ -160,7 +160,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu { if (!first) { - hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]); + hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]); } } else @@ -169,7 +169,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu if ((endPage > currentPage) && ((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0))) { - hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]); + hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]); } } first = false; diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 5f9dce576..ad712dd8b 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -368,11 +368,26 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres if (ms.Position > ObjectBlockSize || i == (end / recordSize) - 1) { + var memoryStreamLength = (int)ms.Position; + + var _objBuffer = bufferPool.Get(memoryStreamLength); + + asyncResult.done = new AutoResetEvent(false); + + var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1); + + var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; + fixed (void* src_ = ms.GetBuffer()) + Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamLength, memoryStreamLength); + + foreach (var address in addr) + ((AddressInfo*)address)->Address += _objAddr; + if (KeyHasObjects()) keySerializer.EndSerialize(); if (ValueHasObjects()) valueSerializer.EndSerialize(); - var _s = ms.ToArray(); + ms.Close(); if (i < (end / recordSize) - 1) @@ -382,23 +397,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres keySerializer.BeginSerialize(ms); if (ValueHasObjects()) valueSerializer.BeginSerialize(ms); - } - - var _objBuffer = bufferPool.Get(_s.Length); - - asyncResult.done = new AutoResetEvent(false); - var _alignedLength = (_s.Length + (sectorSize - 1)) & ~(sectorSize - 1); - - var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; - fixed (void* src_ = _s) - Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, _s.Length, _s.Length); - - foreach (var address in addr) - ((AddressInfo*)address)->Address += _objAddr; - - if (i < (end / recordSize) - 1) - { objlogDevice.WriteAsync( (IntPtr)_objBuffer.aligned_pointer, (int)(alignedDestinationAddress >> LogSegmentSizeBits), @@ -466,7 +465,6 @@ protected override void ReadAsync( } asyncResult.callback = callback; 
- asyncResult.count++; if (objlogDevice == null) { @@ -517,47 +515,34 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num var frame = (GenericFrame)result.frame; src = frame.GetPage(result.page % frame.frameSize); - if (result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0) + if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0) { - PopulatePageFrame(result.freeBuffer1.GetValidPointer(), PageSize, src); - result.freeBuffer1.required_bytes = 0; + PopulatePageFrame(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, src); } } else { - if (result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0) + if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0) { - PopulatePage(result.freeBuffer1.GetValidPointer(), PageSize, result.page); - result.freeBuffer1.required_bytes = 0; + PopulatePage(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, result.page); } } - long ptr = 0; - - // Correct for page 0 of HLOG - //if (result.page == 0) - // ptr += Constants.kFirstValidAddress; - - // Check if we are resuming - if (result.resumeptr > ptr) - ptr = result.resumeptr; - // Deserialize all objects until untilptr - if (ptr < result.untilptr) + if (result.resumePtr < result.untilPtr) { MemoryStream ms = new MemoryStream(result.freeBuffer2.buffer); ms.Seek(result.freeBuffer2.offset, SeekOrigin.Begin); - Deserialize(result.freeBuffer1.GetValidPointer(), ptr, result.untilptr, src, ms); + Deserialize(result.freeBuffer1.GetValidPointer(), result.resumePtr, result.untilPtr, src, ms); ms.Dispose(); - ptr = result.untilptr; result.freeBuffer2.Return(); result.freeBuffer2 = null; - result.resumeptr = ptr; + result.resumePtr = result.untilPtr; } // If we have processed entire page, return - if (ptr >= PageSize) + if (result.untilPtr >= result.maxPtr) { result.Free(); @@ -569,14 +554,13 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num // We will be re-issuing I/O, so free current overlap Overlapped.Free(overlap); - GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref ptr, PageSize, ObjectBlockSize, src, out long startptr, out long size); + // Compute new untilPtr + // We will now be able to process all records until (but not including) untilPtr + GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, src, out long startptr, out long size); // Object log fragment should be aligned by construction Debug.Assert(startptr % sectorSize == 0); - // We will be able to process all records until (but not including) ptr - result.untilptr = ptr; - if (size > int.MaxValue) throw new Exception("Unable to read object page, total size greater than 2GB: " + size); @@ -632,6 +616,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num /// /// /// + /// /// /// /// @@ -642,6 +627,7 @@ private void AsyncReadPageWithObjectsCallback(uint errorCode, uint num internal void AsyncReadPagesFromDeviceToFrame( long readPageStart, int numPages, + long untilAddress, IOCompletionCallback callback, TContext context, GenericFrame frame, @@ -674,16 +660,25 @@ internal void AsyncReadPagesFromDeviceToFrame( page = readPage, context = context, handle = completed, - count = 1, - frame = frame + maxPtr = PageSize, + frame = frame, }; ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage); + uint readLength = (uint)AlignedPageSizeBytes; + long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress 
>> LogPageSizeBits) + (untilAddress & PageSizeMask)); + + if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize)) + { + readLength = (uint)(adjustedUntilAddress - (long)offsetInFile); + asyncResult.maxPtr = readLength; + readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1)); + } if (device != null) offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset)); - ReadAsync(offsetInFile, pageIndex, (uint)AlignedPageSizeBytes, callback, asyncResult, usedDevice, usedObjlogDevice); + ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice); } } diff --git a/cs/src/core/Allocator/GenericScanIterator.cs b/cs/src/core/Allocator/GenericScanIterator.cs index 77af23dd6..b1fbd5939 100644 --- a/cs/src/core/Allocator/GenericScanIterator.cs +++ b/cs/src/core/Allocator/GenericScanIterator.cs @@ -64,7 +64,7 @@ public unsafe GenericScanIterator(GenericAllocator hlog, long beginA var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize; hlog.AsyncReadPagesFromDeviceToFrame (nextAddress >> hlog.LogPageSizeBits, - 1, AsyncReadPagesCallback, Empty.Default, + 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[frameNumber]); } } @@ -148,7 +148,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu { if (!first) { - hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]); + hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]); } } else @@ -157,7 +157,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu if ((endPage > currentPage) && ((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0))) { - hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]); + hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]); } } first = false; diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs index ce5827365..e9f53656f 100644 --- a/cs/src/core/Epochs/FastThreadLocal.cs +++ b/cs/src/core/Epochs/FastThreadLocal.cs @@ -64,8 +64,10 @@ public void Dispose() public T Value { - get { return values[id]; } - set { values[id] = value; } + get => values[id]; + set => values[id] = value; } + + public bool IsInitializedForThread => values != null; } } diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 7e0e60e7e..2cd7f6232 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -121,7 +121,7 @@ public void Dispose() /// Result of the check public bool IsProtected() { - return kInvalidIndex != threadEntryIndex.Value; + return threadEntryIndex.IsInitializedForThread && kInvalidIndex != threadEntryIndex.Value; } /// diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/FASTER/Recovery.cs index 5b2d43d63..5ac34edc3 100644 --- a/cs/src/core/Index/FASTER/Recovery.cs +++ b/cs/src/core/Index/FASTER/Recovery.cs @@ -22,6 +22,7 @@ internal class RecoveryStatus { public long startPage; public long endPage; + public long untilAddress; public int capacity; public 
IDevice recoveryDevice; @@ -33,11 +34,12 @@ internal class RecoveryStatus public RecoveryStatus(int capacity, long startPage, - long endPage) + long endPage, long untilAddress) { this.capacity = capacity; this.startPage = startPage; this.endPage = endPage; + this.untilAddress = untilAddress; readStatus = new ReadStatus[capacity]; flushStatus = new FlushStatus[capacity]; for (int i = 0; i < capacity; i++) @@ -208,7 +210,7 @@ private void RestoreHybridLog(long untilAddress) } headPage = headPage > 0 ? headPage : 0; - var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage); + var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress); for (int i = 0; i < recoveryStatus.capacity; i++) { recoveryStatus.readStatus[i] = ReadStatus.Done; @@ -222,7 +224,7 @@ private void RestoreHybridLog(long untilAddress) numPages++; } - hlog.AsyncReadPagesFromDevice(headPage, numPages, AsyncReadPagesCallbackForRecovery, recoveryStatus); + hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); var done = false; while (!done) @@ -259,13 +261,13 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, // By default first page has one extra record var capacity = hlog.GetCapacityNumPages(); - var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage); + var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress); int totalPagesToRead = (int)(endPage - startPage); int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); // Issue request to read pages as much as possible - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, AsyncReadPagesCallbackForRecovery, recoveryStatus); + hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); for (long page = startPage; page < endPage; page++) { @@ -343,7 +345,7 @@ private void RecoverHybridLogFromSnapshotFile( var objectLogRecoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogObjectCheckpointFileName(recoveryInfo.guid), false); recoveryDevice.Initialize(hlog.GetSegmentSize()); objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize()); - var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage) + var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress) { recoveryDevice = recoveryDevice, objectLogRecoveryDevice = objectLogRecoveryDevice, @@ -354,7 +356,7 @@ private void RecoverHybridLogFromSnapshotFile( int totalPagesToRead = (int)(endPage - startPage); int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, + hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus, recoveryStatus.recoveryDevicePageOffset, @@ -533,11 +535,11 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na long readPage = result.page + result.context.capacity; if (FoldOverSnapshot) { - hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery, result.context); + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, result.context); } else { - hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery, + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, result.context, 
result.context.recoveryDevicePageOffset,
                     result.context.recoveryDevice, result.context.objectLogRecoveryDevice);

diff --git a/cs/src/core/Utilities/BufferPool.cs b/cs/src/core/Utilities/BufferPool.cs
index 341daef18..872eb8d4e 100644
--- a/cs/src/core/Utilities/BufferPool.cs
+++ b/cs/src/core/Utilities/BufferPool.cs
@@ -172,9 +172,11 @@ public unsafe SectorAlignedMemory Get(int numRecords)
             return page;
         }

-        page = new SectorAlignedMemory();
-        page.level = index;
-        page.buffer = new byte[sectorSize * (1 << index)];
+        page = new SectorAlignedMemory
+        {
+            level = index,
+            buffer = new byte[sectorSize * (1 << index)]
+        };
         page.handle = GCHandle.Alloc(page.buffer, GCHandleType.Pinned);
         page.aligned_pointer = (byte*)(((long)page.handle.AddrOfPinnedObject() + (sectorSize - 1)) & ~(sectorSize - 1));
         page.offset = (int) ((long)page.aligned_pointer - (long)page.handle.AddrOfPinnedObject());

diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs b/cs/src/core/Utilities/PageAsyncResultTypes.cs
index 22545626b..5a8792ce7 100644
--- a/cs/src/core/Utilities/PageAsyncResultTypes.cs
+++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs
@@ -14,28 +14,20 @@ namespace FASTER.core
     /// </summary>
     public class PageAsyncReadResult<TContext> : IAsyncResult
     {
-        /// <summary>
-        /// Page
-        /// </summary>
-        public long page;
-        /// <summary>
-        /// Context
-        /// </summary>
-        public TContext context;
-        /// <summary>
-        /// Count
-        /// </summary>
-        public int count;
-
+        internal long page;
+        internal TContext context;
         internal CountdownEvent handle;
         internal SectorAlignedMemory freeBuffer1;
         internal SectorAlignedMemory freeBuffer2;
         internal IOCompletionCallback callback;
         internal IDevice objlogDevice;
-        internal long resumeptr;
-        internal long untilptr;
         internal object frame;

+        /* Used for iteration */
+        internal long resumePtr;
+        internal long untilPtr;
+        internal long maxPtr;
+
         /// <summary>
         ///
         /// </summary>

From f2c9f180952f1eaa77ed730a50fd1253c9133ac3 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Thu, 4 Apr 2019 12:48:22 -0700
Subject: [PATCH 14/15] Do not spin on read when queue is full if we are
 reading from an unprotected IO thread

---
 cs/src/core/Allocator/AllocatorBase.cs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index d7ba60eb0..7075f6553 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -1407,14 +1407,13 @@ public void AsyncGetFromDisk(long fromLogical,
                                      AsyncIOContext context,
                                      SectorAlignedMemory result = default(SectorAlignedMemory))
         {
-            while (numPendingReads > 120)
+            if (epoch.IsProtected()) // Do not spin for unprotected IO threads
             {
-                Thread.SpinWait(100);
-
-                // Do not protect if we are not already protected
-                // E.g., we are in an IO thread
-                if (epoch.IsProtected())
+                while (numPendingReads > 120)
+                {
+                    Thread.SpinWait(100);
                     epoch.ProtectAndDrain();
+                }
             }

             Interlocked.Increment(ref numPendingReads);

From d80dcebf1477d8853fa8ce322ddc617aadc9c8f4 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Thu, 4 Apr 2019 18:49:40 -0700
Subject: [PATCH 15/15] Changed SpinWait to Yield for async reads

---
 cs/src/core/Allocator/AllocatorBase.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 7075f6553..a92cf3c62 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -1411,7 +1411,7 @@ public void AsyncGetFromDisk(long fromLogical,
             {
                 while (numPendingReads > 120)
                 {
-                    Thread.SpinWait(100);
+                    Thread.Yield();
                     epoch.ProtectAndDrain();
                 }
             }
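Taken together, the last two patches leave the read throttle in
AsyncGetFromDisk as sketched below (an editorial consolidation of the hunks
above, not a new change). Only epoch-protected threads wait for the
pending-read count to drop; unprotected threads, such as IO completion
threads, proceed immediately instead of acquiring epoch protection through
ProtectAndDrain:

    if (epoch.IsProtected()) // Do not spin for unprotected IO threads
    {
        while (numPendingReads > 120)
        {
            Thread.Yield();          // yield the time slice rather than busy-spin
            epoch.ProtectAndDrain(); // refresh the epoch and run queued drain actions
        }
    }

    Interlocked.Increment(ref numPendingReads);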