diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs
index 9069b2541..8d98a69fa 100644
--- a/cs/benchmark/FasterYcsbBenchmark.cs
+++ b/cs/benchmark/FasterYcsbBenchmark.cs
@@ -248,7 +248,7 @@ public unsafe void Run()
             idx_ = 0;
 
-            store.DumpDistribution();
+            Console.WriteLine(store.DumpDistribution());
 
             Console.WriteLine("Executing experiment.");
 
diff --git a/cs/playground/SumStore/RecoveryTest.cs b/cs/playground/SumStore/RecoveryTest.cs
index 6d75de8f5..0cedeaa14 100644
--- a/cs/playground/SumStore/RecoveryTest.cs
+++ b/cs/playground/SumStore/RecoveryTest.cs
@@ -19,7 +19,6 @@ class RecoveryTest
         const long completePendingInterval = 1 << 12;
         const int checkpointInterval = 10 * 1000;
         readonly int threadCount;
-        readonly int numActiveThreads;
         FasterKV<AdId, NumClicks, Input, Output, Empty, Functions> fht;
         BlockingCollection<Input[]> inputArrays;
@@ -27,7 +26,6 @@ class RecoveryTest
         public RecoveryTest(int threadCount)
         {
             this.threadCount = threadCount;
-            numActiveThreads = 0;
 
             // Create FASTER index
             var log = Devices.CreateLogDevice("logs\\hlog");
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 8168e6785..a92cf3c62 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -60,7 +60,8 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <summary>
         /// Epoch information
         /// </summary>
-        protected LightEpoch epoch;
+        protected readonly LightEpoch epoch;
+        private readonly bool ownedEpoch;
 
         /// <summary>
         /// Comparer
         /// </summary>
@@ -201,13 +202,13 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <summary>
         /// Number of pending reads
         /// </summary>
-        private static int numPendingReads = 0;
+        private int numPendingReads = 0;
         #endregion
 
         /// <summary>
-        /// Read buffer pool
+        /// Buffer pool
         /// </summary>
-        protected SectorAlignedBufferPool readBufferPool;
+        protected SectorAlignedBufferPool bufferPool;
 
         /// <summary>
         /// Read cache
         /// </summary>
@@ -415,8 +416,9 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
         /// <param name="settings"></param>
         /// <param name="comparer"></param>
         /// <param name="evictCallback"></param>
-        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback)
-            : this(settings, comparer)
+        /// <param name="epoch"></param>
+        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
+            : this(settings, comparer, epoch)
         {
             if (evictCallback != null)
             {
@@ -430,9 +432,18 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer
         /// <param name="settings"></param>
         /// <param name="comparer"></param>
-        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer)
+        /// <param name="epoch"></param>
+        public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, LightEpoch epoch)
         {
             this.comparer = comparer;
+            if (epoch == null)
+            {
+                this.epoch = new LightEpoch();
+                ownedEpoch = true;
+            }
+            else
+                this.epoch = epoch;
+
             settings.LogDevice.Initialize(1L << settings.SegmentSizeBits);
             settings.ObjectLogDevice?.Initialize(1L << settings.SegmentSizeBits);
@@ -481,7 +492,7 @@ protected void Initialize(long firstValidAddress)
             Debug.Assert(firstValidAddress <= PageSize);
             Debug.Assert(PageSize >= GetRecordSize(0));
 
-            readBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
+            bufferPool = new SectorAlignedBufferPool(1, sectorSize);
 
             long tailPage = firstValidAddress >> LogPageSizeBits;
             int tailPageIndex = (int)(tailPage % BufferSize);
@@ -504,6 +515,24 @@ protected void Initialize(long firstValidAddress)
             TailPageIndex = 0;
         }
 
+        /// <summary>
+        /// Acquire thread
+        /// </summary>
+        public void Acquire()
+        {
+            if (ownedEpoch)
+                epoch.Acquire();
+        }
+
+        /// <summary>
+        /// Release thread
+        /// </summary>
+        public void Release()
+        {
+            if (ownedEpoch)
+                epoch.Release();
+        }
+
         /// <summary>
         /// Dispose allocator
         /// </summary>
@@ -519,6 +548,10 @@ public virtual void Dispose()
             SafeHeadAddress = 0;
             HeadAddress = 0;
             BeginAddress = 1;
+
+            if (ownedEpoch)
+                epoch.Dispose();
+            bufferPool.Free();
         }
 
         /// <summary>
@@ -1120,7 +1153,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
             uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
             alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));
 
-            var record = readBufferPool.Get((int)alignedReadLength);
+            var record = bufferPool.Get((int)alignedReadLength);
             record.valid_offset = (int)(fileOffset - alignedFileOffset);
             record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
             record.required_bytes = numBytes;
@@ -1141,6 +1174,7 @@ public void RecoveryReset(long tailAddress, long headAddress)
         /// </summary>
         /// <param name="readPageStart"></param>
         /// <param name="numPages"></param>
+        /// <param name="untilAddress"></param>
         /// <param name="callback"></param>
         /// <param name="context"></param>
         /// <param name="devicePageOffset"></param>
@@ -1149,12 +1183,13 @@ public void RecoveryReset(long tailAddress, long headAddress)
         public void AsyncReadPagesFromDevice<TContext>(
             long readPageStart,
             int numPages,
+            long untilAddress,
             IOCompletionCallback callback,
             TContext context,
             long devicePageOffset = 0,
             IDevice logDevice = null, IDevice objectLogDevice = null)
         {
-            AsyncReadPagesFromDevice(readPageStart, numPages, callback, context,
+            AsyncReadPagesFromDevice(readPageStart, numPages, untilAddress, callback, context,
                 out CountdownEvent completed, devicePageOffset, logDevice, objectLogDevice);
         }
 
@@ -1164,6 +1199,7 @@ public void AsyncReadPagesFromDevice(
         /// </summary>
         /// <param name="readPageStart"></param>
         /// <param name="numPages"></param>
+        /// <param name="untilAddress"></param>
         /// <param name="callback"></param>
         /// <param name="context"></param>
         /// <param name="completed"></param>
@@ -1173,6 +1209,7 @@ public void AsyncReadPagesFromDevice(
         private void AsyncReadPagesFromDevice<TContext>(
             long readPageStart,
             int numPages,
+            long untilAddress,
             IOCompletionCallback callback,
             TContext context,
             out CountdownEvent completed,
@@ -1205,15 +1242,24 @@ private void AsyncReadPagesFromDevice(
                     page = readPage,
                     context = context,
                     handle = completed,
-                    count = 1
+                    maxPtr = PageSize
                 };
 
                 ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
+                uint readLength = (uint)AlignedPageSizeBytes;
+                long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));
+
+                if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
+                {
+                    readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
+                    asyncResult.maxPtr = readLength;
+                    readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
+                }
+
                 if (device != null)
                     offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));
 
-                ReadAsync(offsetInFile, pageIndex, (uint)PageSize, callback, asyncResult, usedDevice, usedObjlogDevice);
+                ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice);
             }
         }
 
@@ -1361,14 +1407,13 @@ public void AsyncGetFromDisk(long fromLogical,
                                      AsyncIOContext<Key, Value> context,
                                      SectorAlignedMemory result = default(SectorAlignedMemory))
         {
-            while (numPendingReads > 120)
+            if (epoch.IsProtected()) // Do not spin for unprotected IO threads
             {
-                Thread.SpinWait(100);
-
-                // Do not protect if we are not already protected
-                // E.g., we are in an IO thread
-                if (epoch.IsProtected())
+                while (numPendingReads > 120)
+                {
+                    Thread.Yield();
                     epoch.ProtectAndDrain();
+                }
             }
             Interlocked.Increment(ref numPendingReads);
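[Reviewer note] With the untilAddress plumbing above, a recovery or scan read no longer always fetches a full page: for the final page only the used prefix is read, rounded up to the device sector size, while asyncResult.maxPtr keeps the unrounded length so parsing stops at real data. A self-contained sketch of that arithmetic, with the page/sector parameters passed in explicitly (the helper method and its name are illustrative, not part of this diff):

    // Compute how many bytes of a page to read when the log only extends to
    // untilAddress; offsetInFile is the aligned file offset of the page.
    static uint ComputeReadLength(long untilAddress, long offsetInFile,
                                  int logPageSizeBits, long pageSizeMask,
                                  long alignedPageSizeBytes, long pageSize,
                                  int sectorSize, out long maxPtr)
    {
        uint readLength = (uint)alignedPageSizeBytes;                 // default: whole page
        maxPtr = pageSize;
        long adjustedUntilAddress =
            alignedPageSizeBytes * (untilAddress >> logPageSizeBits)  // whole pages before the tail
            + (untilAddress & pageSizeMask);                          // bytes used in the tail page
        if (adjustedUntilAddress > 0 && (adjustedUntilAddress - offsetInFile) < pageSize)
        {
            readLength = (uint)(adjustedUntilAddress - offsetInFile); // used prefix only
            maxPtr = readLength;                                      // parse limit (unaligned)
            readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1)); // sector-align up
        }
        return readLength;
    }

The same computation appears three times in this PR (here and in both AsyncReadPagesFromDeviceToFrame overloads below).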
diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs
index ee410f039..eb9126640 100644
--- a/cs/src/core/Allocator/BlittableAllocator.cs
+++ b/cs/src/core/Allocator/BlittableAllocator.cs
@@ -31,15 +31,13 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, Value>
-        public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
-            : base(settings, comparer, evictCallback)
+        public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
+            : base(settings, comparer, evictCallback, epoch)
         {
             values = new byte[BufferSize][];
             handles = new GCHandle[BufferSize];
             pointers = new long[BufferSize];
 
-            epoch = LightEpoch.Instance;
-
             ptrHandle = GCHandle.Alloc(pointers, GCHandleType.Pinned);
             nativePointers = (long*)ptrHandle.AddrOfPinnedObject();
         }
@@ -333,6 +331,7 @@ public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long end
         /// </summary>
         /// <param name="readPageStart"></param>
         /// <param name="numPages"></param>
+        /// <param name="untilAddress"></param>
         /// <param name="callback"></param>
         /// <param name="context"></param>
         /// <param name="frame"></param>
@@ -343,6 +342,7 @@ public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long end
         internal void AsyncReadPagesFromDeviceToFrame<TContext>(
             long readPageStart,
             int numPages,
+            long untilAddress,
             IOCompletionCallback callback,
             TContext context,
             BlittableFrame frame,
@@ -375,16 +375,24 @@ internal void AsyncReadPagesFromDeviceToFrame(
                     page = readPage,
                     context = context,
                     handle = completed,
-                    count = 1,
                     frame = frame
                 };
 
                 ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
+                uint readLength = (uint)AlignedPageSizeBytes;
+                long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));
+
+                if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
+                {
+                    readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
+                    readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
+                }
+
                 if (device != null)
                     offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));
 
-                usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], (uint)AlignedPageSizeBytes, callback, asyncResult);
+                usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, callback, asyncResult);
             }
         }
     }
diff --git a/cs/src/core/Allocator/BlittableScanIterator.cs b/cs/src/core/Allocator/BlittableScanIterator.cs
index 87b3453b0..2540ab8c9 100644
--- a/cs/src/core/Allocator/BlittableScanIterator.cs
+++ b/cs/src/core/Allocator/BlittableScanIterator.cs
@@ -61,7 +61,7 @@ public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long be
                 var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
                 hlog.AsyncReadPagesFromDeviceToFrame
                     (nextAddress >> hlog.LogPageSizeBits,
-                    1, AsyncReadPagesCallback, Empty.Default,
+                    1, endAddress, AsyncReadPagesCallback, Empty.Default,
                     frame, out loaded[frameNumber]);
             }
         }
@@ -160,7 +160,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
             {
                 if (!first)
                 {
-                    hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
+                    hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
                 }
             }
             else
@@ -169,7 +169,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
                 if ((endPage > currentPage) &&
                     ((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
                 {
-                    hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
+                    hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
                 }
             }
             first = false;
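[Reviewer note] The scan iterators now forward their endAddress so the tail page read stops at the scan boundary instead of issuing a full-page I/O. A hedged usage sketch of the scan path these changes serve (FASTER C# v1-style API assumed; fht, key/value processing are placeholders):

    // Iterate all records currently in the log, from begin to tail.
    using (var iter = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress))
    {
        while (iter.GetNext(out RecordInfo recordInfo))
        {
            ref var key = ref iter.GetKey();      // in-frame reference, valid until next GetNext
            ref var value = ref iter.GetValue();
            // process record ...
        }
    }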
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 142c66902..ad712dd8b 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -35,14 +35,12 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Value>
         private static readonly int recordSize = Utility.GetSize(default(Record<Key, Value>));
         private readonly SerializerSettings<Key, Value> SerializerSettings;
 
-        public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null)
-            : base(settings, comparer, evictCallback)
+        public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
+            : base(settings, comparer, evictCallback, epoch)
         {
             SerializerSettings = serializerSettings;
@@ -66,9 +64,6 @@ public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> ser
                 if (objectLogDevice == null)
                     throw new Exception("Objects in key/value, but object log not provided during creation of FASTER instance");
             }
-
-            epoch = LightEpoch.Instance;
-            ioBufferPool = SectorAlignedBufferPool.GetPool(1, sectorSize);
         }
 
         public override void Initialize()
@@ -295,7 +290,7 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
             if (localSegmentOffsets == null) localSegmentOffsets = segmentOffsets;
 
             var src = values[flushPage % BufferSize];
-            var buffer = ioBufferPool.Get((int)numBytesToWrite);
+            var buffer = bufferPool.Get((int)numBytesToWrite);
 
             if (aligned_start < start && (KeyHasObjects() || ValueHasObjects()))
             {
@@ -373,25 +368,36 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
                     if (ms.Position > ObjectBlockSize || i == (end / recordSize) - 1)
                     {
-                        var _s = ms.ToArray();
-                        ms.Close();
-                        ms = new MemoryStream();
+                        var memoryStreamLength = (int)ms.Position;
 
-                        var _objBuffer = ioBufferPool.Get(_s.Length);
+                        var _objBuffer = bufferPool.Get(memoryStreamLength);
 
                         asyncResult.done = new AutoResetEvent(false);
 
-                        var _alignedLength = (_s.Length + (sectorSize - 1)) & ~(sectorSize - 1);
+                        var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1);
 
                         var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength;
 
-                        fixed (void* src_ = _s)
-                            Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, _s.Length, _s.Length);
+                        fixed (void* src_ = ms.GetBuffer())
+                            Buffer.MemoryCopy(src_, _objBuffer.aligned_pointer, memoryStreamLength, memoryStreamLength);
 
                         foreach (var address in addr)
                             ((AddressInfo*)address)->Address += _objAddr;
 
+                        if (KeyHasObjects())
+                            keySerializer.EndSerialize();
+                        if (ValueHasObjects())
+                            valueSerializer.EndSerialize();
+
+                        ms.Close();
+
                         if (i < (end / recordSize) - 1)
                         {
+                            ms = new MemoryStream();
+                            if (KeyHasObjects())
+                                keySerializer.BeginSerialize(ms);
+                            if (ValueHasObjects())
+                                valueSerializer.BeginSerialize(ms);
+
                             objlogDevice.WriteAsync(
                                 (IntPtr)_objBuffer.aligned_pointer,
                                 (int)(alignedDestinationAddress >> LogSegmentSizeBits),
@@ -414,14 +420,6 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres
                     }
                 }
             }
-            if (KeyHasObjects())
-            {
-                keySerializer.EndSerialize();
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer.EndSerialize();
-            }
 
             if (asyncResult.partial)
             {
@@ -456,7 +454,7 @@ protected override void ReadAsync<TContext>(
             ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
             IOCompletionCallback callback, PageAsyncReadResult<TContext> asyncResult, IDevice device, IDevice objlogDevice)
         {
-            asyncResult.freeBuffer1 = readBufferPool.Get((int)aligned_read_length);
+            asyncResult.freeBuffer1 = bufferPool.Get((int)aligned_read_length);
             asyncResult.freeBuffer1.required_bytes = (int)aligned_read_length;
 
             if (!(KeyHasObjects() || ValueHasObjects()))
@@ -467,7 +465,6 @@ protected override void ReadAsync(
             }
 
             asyncResult.callback = callback;
-            asyncResult.count++;
 
             if (objlogDevice == null)
             {
@@ -518,47 +515,34 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
                 var frame = (GenericFrame<Key, Value>)result.frame;
                 src = frame.GetPage(result.page % frame.frameSize);
 
-                if (result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
+                if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
                 {
-                    PopulatePageFrame(result.freeBuffer1.GetValidPointer(), PageSize, src);
-                    result.freeBuffer1.required_bytes = 0;
+                    PopulatePageFrame(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, src);
                 }
             }
             else
             {
-                if (result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
+                if (result.freeBuffer2 == null && result.freeBuffer1 != null && result.freeBuffer1.required_bytes > 0)
                 {
-                    PopulatePage(result.freeBuffer1.GetValidPointer(), PageSize, result.page);
-                    result.freeBuffer1.required_bytes = 0;
+                    PopulatePage(result.freeBuffer1.GetValidPointer(), (int)result.maxPtr, result.page);
                 }
             }
 
-            long ptr = 0;
-
-            // Correct for page 0 of HLOG
-            //if (result.page == 0)
-            //    ptr += Constants.kFirstValidAddress;
-
-            // Check if we are resuming
-            if (result.resumeptr > ptr)
-                ptr = result.resumeptr;
-
             // Deserialize all objects until untilptr
-            if (ptr < result.untilptr)
+            if (result.resumePtr < result.untilPtr)
             {
                 MemoryStream ms = new MemoryStream(result.freeBuffer2.buffer);
                 ms.Seek(result.freeBuffer2.offset, SeekOrigin.Begin);
-                Deserialize(result.freeBuffer1.GetValidPointer(), ptr, result.untilptr, src, ms);
+                Deserialize(result.freeBuffer1.GetValidPointer(), result.resumePtr, result.untilPtr, src, ms);
                 ms.Dispose();
 
-                ptr = result.untilptr;
                 result.freeBuffer2.Return();
                 result.freeBuffer2 = null;
-                result.resumeptr = ptr;
+                result.resumePtr = result.untilPtr;
             }
 
             // If we have processed entire page, return
-            if (ptr >= PageSize)
+            if (result.untilPtr >= result.maxPtr)
             {
                 result.Free();
@@ -570,18 +554,17 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
             // We will be re-issuing I/O, so free current overlap
             Overlapped.Free(overlap);
 
-            GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref ptr, PageSize, ObjectBlockSize, src, out long startptr, out long size);
+            // Compute new untilPtr
+            // We will now be able to process all records until (but not including) untilPtr
+            GetObjectInfo(result.freeBuffer1.GetValidPointer(), ref result.untilPtr, result.maxPtr, ObjectBlockSize, src, out long startptr, out long size);
 
             // Object log fragment should be aligned by construction
             Debug.Assert(startptr % sectorSize == 0);
 
-            // We will be able to process all records until (but not including) ptr
-            result.untilptr = ptr;
-
             if (size > int.MaxValue)
                 throw new Exception("Unable to read object page, total size greater than 2GB: " + size);
 
-            var objBuffer = ioBufferPool.Get((int)size);
+            var objBuffer = bufferPool.Get((int)size);
             result.freeBuffer2 = objBuffer;
             var alignedLength = (size + (sectorSize - 1)) & ~(sectorSize - 1);
@@ -609,7 +592,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
             uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset);
             alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1));
 
-            var record = readBufferPool.Get((int)alignedReadLength);
+            var record = bufferPool.Get((int)alignedReadLength);
             record.valid_offset = (int)(fileOffset - alignedFileOffset);
             record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset));
             record.required_bytes = numBytes;
@@ -633,6 +616,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
         /// </summary>
         /// <param name="readPageStart"></param>
         /// <param name="numPages"></param>
+        /// <param name="untilAddress"></param>
         /// <param name="callback"></param>
         /// <param name="context"></param>
         /// <param name="frame"></param>
@@ -643,6 +627,7 @@ private void AsyncReadPageWithObjectsCallback<TContext>(uint errorCode, uint num
         internal void AsyncReadPagesFromDeviceToFrame<TContext>(
             long readPageStart,
             int numPages,
+            long untilAddress,
             IOCompletionCallback callback,
             TContext context,
             GenericFrame<Key, Value> frame,
@@ -675,16 +660,25 @@ internal void AsyncReadPagesFromDeviceToFrame(
                     page = readPage,
                     context = context,
                     handle = completed,
-                    count = 1,
-                    frame = frame
+                    maxPtr = PageSize,
+                    frame = frame,
                 };
 
                 ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
+                uint readLength = (uint)AlignedPageSizeBytes;
+                long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));
+
+                if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
+                {
+                    readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
+                    asyncResult.maxPtr = readLength;
+                    readLength = (uint)((readLength + (sectorSize - 1)) & ~(sectorSize - 1));
+                }
 
                 if (device != null)
                     offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));
 
-                ReadAsync(offsetInFile, pageIndex, (uint)AlignedPageSizeBytes, callback, asyncResult, usedDevice, usedObjlogDevice);
+                ReadAsync(offsetInFile, pageIndex, readLength, callback, asyncResult, usedDevice, usedObjlogDevice);
             }
         }
 
@@ -758,73 +752,6 @@ public void Deserialize(byte *raw, long ptr, long untilptr, Record<Key, Value>[]
             }
         }
 
-        /// <summary>
-        /// Serialize part of page to stream
-        /// </summary>
-        /// <param name="ptr">From pointer</param>
-        /// <param name="untilptr">Until pointer</param>
-        /// <param name="stream">Stream</param>
-        /// <param name="objectBlockSize">Size of blocks to serialize in chunks of</param>
-        /// <param name="addr">List of addresses that need to be updated with offsets</param>
-        public void Serialize(ref long ptr, long untilptr, Stream stream, int objectBlockSize, out List<long> addr)
-        {
-            IObjectSerializer<Key> keySerializer = null;
-            IObjectSerializer<Value> valueSerializer = null;
-
-            if (KeyHasObjects())
-            {
-                keySerializer = SerializerSettings.keySerializer();
-                keySerializer.BeginSerialize(stream);
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer = SerializerSettings.valueSerializer();
-                valueSerializer.BeginSerialize(stream);
-            }
-
-            addr = new List<long>();
-            while (ptr < untilptr)
-            {
-                if (!GetInfo(ptr).Invalid)
-                {
-                    long pos = stream.Position;
-
-                    if (KeyHasObjects())
-                    {
-                        keySerializer.Serialize(ref GetKey(ptr));
-                        var key_address = GetKeyAddressInfo(ptr);
-                        key_address->Address = pos;
-                        key_address->Size = (int)(stream.Position - pos);
-                        addr.Add((long)key_address);
-                    }
-
-                    if (ValueHasObjects() && !GetInfo(ptr).Tombstone)
-                    {
-                        pos = stream.Position;
-                        var value_address = GetValueAddressInfo(ptr);
-                        valueSerializer.Serialize(ref GetValue(ptr));
-                        value_address->Address = pos;
-                        value_address->Size = (int)(stream.Position - pos);
-                        addr.Add((long)value_address);
-                    }
-
-                }
-                ptr += GetRecordSize(ptr);
-
-                if (stream.Position > objectBlockSize)
-                    break;
-            }
-
-            if (KeyHasObjects())
-            {
-                keySerializer.EndSerialize();
-            }
-            if (ValueHasObjects())
-            {
-                valueSerializer.EndSerialize();
-            }
-        }
-
         /// <summary>
         /// Get location and range of object log addresses for specified log page
         /// </summary>
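[Reviewer note] The rewritten flush loop avoids the extra copy of ms.ToArray() by reading straight out of ms.GetBuffer(), and now brackets each object block with EndSerialize() before the buffer is written, re-opening BeginSerialize() on a fresh stream only if more records remain. A hedged sketch of a serializer that fits this Begin/Serialize/End protocol (BinaryObjectSerializer is assumed to be FASTER's helper base exposing protected reader/writer; the string payload is illustrative):

    // Illustrative value serializer: FASTER calls BeginSerialize(stream) once per
    // object block, Serialize(ref obj) per record, then EndSerialize() to flush.
    public class StringValueSerializer : BinaryObjectSerializer<string>
    {
        public override void Serialize(ref string obj) => writer.Write(obj);
        public override void Deserialize(ref string obj) => obj = reader.ReadString();
    }

It would be wired in through SerializerSettings, e.g. new SerializerSettings<Key, string> { valueSerializer = () => new StringValueSerializer() } (assumed field names).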
diff --git a/cs/src/core/Allocator/GenericScanIterator.cs b/cs/src/core/Allocator/GenericScanIterator.cs
index 77af23dd6..b1fbd5939 100644
--- a/cs/src/core/Allocator/GenericScanIterator.cs
+++ b/cs/src/core/Allocator/GenericScanIterator.cs
@@ -64,7 +64,7 @@ public unsafe GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginA
                 var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
                 hlog.AsyncReadPagesFromDeviceToFrame
                     (nextAddress >> hlog.LogPageSizeBits,
-                    1, AsyncReadPagesCallback, Empty.Default,
+                    1, endAddress, AsyncReadPagesCallback, Empty.Default,
                     frame, out loaded[frameNumber]);
             }
         }
@@ -148,7 +148,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
             {
                 if (!first)
                 {
-                    hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
+                    hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
                 }
             }
             else
@@ -157,7 +157,7 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
                 if ((endPage > currentPage) &&
                     ((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
                 {
-                    hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
+                    hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
                 }
             }
             first = false;
diff --git a/cs/src/core/Allocator/MallocFixedPageSize.cs b/cs/src/core/Allocator/MallocFixedPageSize.cs
index d3462e0f7..d08beb2e5 100644
--- a/cs/src/core/Allocator/MallocFixedPageSize.cs
+++ b/cs/src/core/Allocator/MallocFixedPageSize.cs
@@ -47,15 +47,27 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
         private CountdownEvent checkpointEvent;
 
-        [ThreadStatic]
-        private static Queue<FreeItem> freeList;
+        private readonly LightEpoch epoch;
+        private readonly bool ownedEpoch;
+
+        private FastThreadLocal<Queue<FreeItem>> freeList;
 
         /// <summary>
         /// Create new instance
         /// </summary>
         /// <param name="returnPhysicalAddress"></param>
-        public MallocFixedPageSize(bool returnPhysicalAddress = false)
+        /// <param name="epoch"></param>
+        public MallocFixedPageSize(bool returnPhysicalAddress = false, LightEpoch epoch = null)
         {
+            freeList = new FastThreadLocal<Queue<FreeItem>>();
+            if (epoch == null)
+            {
+                this.epoch = new LightEpoch();
+                ownedEpoch = true;
+            }
+            else
+                this.epoch = epoch;
+
             values[0] = new T[PageSize];
 
 #if !(CALLOC)
@@ -173,8 +185,12 @@ public void FreeAtEpoch(long pointer, int removed_epoch = -1)
             {
                 values[pointer >> PageSizeBits][pointer & PageSizeMask] = default(T);
             }
-            if (freeList == null) freeList = new Queue<FreeItem>();
-            freeList.Enqueue(new FreeItem { removed_item = pointer, removal_epoch = removed_epoch });
+
+            freeList.InitializeThread();
+
+            if (freeList.Value == null)
+                freeList.Value = new Queue<FreeItem>();
+            freeList.Value.Enqueue(new FreeItem { removed_item = pointer, removal_epoch = removed_epoch });
         }
 
         private const int kAllocateChunkSize = 16;
@@ -292,14 +308,15 @@ public long BulkAllocate()
         /// </summary>
         public long Allocate()
         {
-            if (freeList == null)
+            freeList.InitializeThread();
+            if (freeList.Value == null)
             {
-                freeList = new Queue<FreeItem>();
+                freeList.Value = new Queue<FreeItem>();
             }
-            if (freeList.Count > 0)
+            if (freeList.Value.Count > 0)
             {
-                if (freeList.Peek().removal_epoch <= LightEpoch.Instance.SafeToReclaimEpoch)
-                    return freeList.Dequeue().removed_item;
+                if (freeList.Value.Peek().removal_epoch <= epoch.SafeToReclaimEpoch)
+                    return freeList.Value.Dequeue().removed_item;
 
                 //if (freeList.Count % 64 == 0)
                 //    LightEpoch.Instance.BumpCurrentEpoch();
@@ -406,6 +423,26 @@ public long Allocate()
             return index;
         }
 
+        /// <summary>
+        /// Acquire thread
+        /// </summary>
+        public void Acquire()
+        {
+            if (ownedEpoch)
+                epoch.Acquire();
+            freeList.InitializeThread();
+        }
+
+        /// <summary>
+        /// Release thread
+        /// </summary>
+        public void Release()
+        {
+            if (ownedEpoch)
+                epoch.Release();
+            freeList.DisposeThread();
+        }
+
         /// <summary>
         /// Dispose
         /// </summary>
@@ -421,6 +458,9 @@ public void Dispose()
             values = null;
             values0 = null;
             count = 0;
+            if (ownedEpoch)
+                epoch.Dispose();
+            freeList.Dispose();
         }
diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs
index 595bc5bc2..df1b81a6f 100644
--- a/cs/src/core/Device/ManagedLocalStorageDevice.cs
+++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs
@@ -30,7 +30,7 @@ public class ManagedLocalStorageDevice : StorageDeviceBase
         public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false)
             : base(filename, GetSectorSize(filename))
         {
-            pool = SectorAlignedBufferPool.GetPool(1, 1);
+            pool = new SectorAlignedBufferPool(1, 1);
             this.preallocateFile = preallocateFile;
             this.deleteOnClose = deleteOnClose;
 
@@ -162,6 +162,7 @@ public override void Close()
         {
             foreach (var logHandle in logHandles.Values)
                 logHandle.Dispose();
+            pool.Free();
         }
diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs
new file mode 100644
index 000000000..e9f53656f
--- /dev/null
+++ b/cs/src/core/Epochs/FastThreadLocal.cs
@@ -0,0 +1,73 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Threading;
+
+namespace FASTER.core
+{
+    /// <summary>
+    /// Fast implementation of instance-thread-local variables
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    internal class FastThreadLocal<T>
+    {
+        // Max instances supported
+        private const int kMaxInstances = 128;
+
+        [ThreadStatic]
+        private static T[] values;
+
+        private readonly int id;
+        private static readonly int[] instances = new int[kMaxInstances];
+
+        public FastThreadLocal()
+        {
+            for (int i = 0; i < kMaxInstances; i++)
+            {
+                if (0 == Interlocked.CompareExchange(ref instances[i], 1, 0))
+                {
+                    id = i;
+                    return;
+                }
+            }
+            throw new Exception("Unsupported number of simultaneous instances");
+        }
+
+        public void InitializeThread()
+        {
+            if (values == null)
+                values = new T[kMaxInstances];
+        }
+
+        public void DisposeThread()
+        {
+            Value = default(T);
+
+            // Dispose values only if there are no other
+            // instances active for this thread
+            for (int i = 0; i < kMaxInstances; i++)
+            {
+                if ((instances[i] == 1) && (i != id))
+                    return;
+            }
+            values = null;
+        }
+
+        /// <summary>
+        /// Dispose instance for all threads
+        /// </summary>
+        public void Dispose()
+        {
+            instances[id] = 0;
+        }
+
+        public T Value
+        {
+            get => values[id];
+            set => values[id] = value;
+        }
+
+        public bool IsInitializedForThread => values != null;
+    }
+}
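[Reviewer note] FastThreadLocal<T> above gives each (thread, instance) pair its own slot: a [ThreadStatic] T[] array indexed by a per-instance id. That trades a fixed kMaxInstances cap for much cheaper access than ThreadLocal<T>. A minimal usage sketch, following the contract the class implies:

    // Each thread calls InitializeThread() before first use, reads/writes Value,
    // and calls DisposeThread() on exit; the owner calls Dispose() exactly once
    // to return the instance slot.
    var counter = new FastThreadLocal<int>();

    void Worker()
    {
        counter.InitializeThread();   // allocate this thread's value array if needed
        counter.Value++;              // per-thread, per-instance storage
        counter.DisposeThread();      // clear slot; array freed when no instances remain
    }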
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index 68b736e00..2cd7f6232 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -15,11 +15,6 @@ namespace FASTER.core
     /// </summary>
     public unsafe class LightEpoch
     {
-        /// <summary>
-        /// 
-        /// </summary>
-        public static LightEpoch Instance = new LightEpoch();
-
         /// <summary>
         /// Default invalid index entry.
         /// </summary>
@@ -57,8 +52,7 @@ public unsafe class LightEpoch
         /// <summary>
        /// A thread's entry in the epoch table.
         /// </summary>
-        [ThreadStatic]
-        public static int threadEntryIndex;
+        private FastThreadLocal<int> threadEntryIndex;
 
         /// <summary>
         /// Global current epoch value
         /// </summary>
@@ -79,20 +73,13 @@ public LightEpoch(int size = kTableSize)
             Initialize(size);
         }
 
-        /// <summary>
-        /// 
-        /// </summary>
-        ~LightEpoch()
-        {
-            Uninitialize();
-        }
-
         /// <summary>
         /// Initialize the epoch table
         /// </summary>
         /// <param name="size"></param>
         unsafe void Initialize(int size)
         {
+            threadEntryIndex = new FastThreadLocal<int>();
             numEntries = size;
 
             // Over-allocate to do cache-line alignment
@@ -115,7 +102,7 @@ unsafe void Initialize(int size)
         /// <summary>
         /// Clean up epoch table
         /// </summary>
-        void Uninitialize()
+        public void Dispose()
         {
             tableHandle.Free();
             tableAligned = null;
@@ -124,6 +111,8 @@ void Uninitialize()
             numEntries = 0;
             CurrentEpoch = 1;
             SafeToReclaimEpoch = 0;
+
+            threadEntryIndex.Dispose();
         }
 
         /// <summary>
@@ -132,10 +121,9 @@ void Uninitialize()
         /// <returns>Result of the check</returns>
         public bool IsProtected()
         {
-            return (kInvalidIndex != threadEntryIndex);
+            return threadEntryIndex.IsInitializedForThread && kInvalidIndex != threadEntryIndex.Value;
         }
 
-
         /// <summary>
         /// Enter the thread into the protected code region
         /// </summary>
@@ -143,12 +131,7 @@ public bool IsProtected()
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public int ProtectAndDrain()
         {
-            int entry = threadEntryIndex;
-            if (kInvalidIndex == entry)
-            {
-                entry = ReserveEntryForThread();
-                threadEntryIndex = entry;
-            }
+            int entry = threadEntryIndex.Value;
 
             (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch;
 
@@ -187,18 +170,29 @@ private void Drain(int nextEpoch)
             }
         }
 
+        /// <summary>
+        /// Thread acquires its epoch entry
+        /// </summary>
+        public void Acquire()
+        {
+            threadEntryIndex.InitializeThread();
+            threadEntryIndex.Value = ReserveEntryForThread();
+        }
+
+
         /// <summary>
         /// Thread releases its epoch entry
         /// </summary>
         public void Release()
         {
-            int entry = threadEntryIndex;
+            int entry = threadEntryIndex.Value;
             if (kInvalidIndex == entry)
             {
                 return;
             }
 
-            threadEntryIndex = kInvalidIndex;
+            threadEntryIndex.Value = kInvalidIndex;
+            threadEntryIndex.DisposeThread();
             (*(tableAligned + entry)).localCurrentEpoch = 0;
             (*(tableAligned + entry)).threadId = 0;
         }
@@ -285,9 +279,12 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch)
             for (int index = 1; index <= numEntries; ++index)
             {
                 int entry_epoch = (*(tableAligned + index)).localCurrentEpoch;
-                if (0 != entry_epoch && entry_epoch < oldestOngoingCall)
+                if (0 != entry_epoch)
                 {
-                    oldestOngoingCall = entry_epoch;
+                    if (entry_epoch < oldestOngoingCall)
+                    {
+                        oldestOngoingCall = entry_epoch;
+                    }
                 }
             }
 
@@ -391,7 +388,7 @@ private struct EpochActionPair
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public bool MarkAndCheckIsComplete(int markerIdx, int version)
         {
-            int entry = threadEntryIndex;
+            int entry = threadEntryIndex.Value;
             if (kInvalidIndex == entry)
             {
                 Debug.WriteLine("New Thread entered during CPR");
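[Reviewer note] With the LightEpoch.Instance singleton gone, epoch membership is now explicit: ProtectAndDrain no longer lazily reserves a table entry, so a thread that calls it without a prior Acquire() would index an invalid slot. That is why the allocators and FasterKV grew Acquire/Release wrappers. A hedged sketch of the intended per-thread lifecycle:

    var epoch = new LightEpoch();

    void ThreadBody()
    {
        epoch.Acquire();              // reserve this thread's entry in the epoch table
        try
        {
            // ... epoch-protected work ...
            epoch.ProtectAndDrain();  // publish current epoch; run any ready drain actions
        }
        finally
        {
            epoch.Release();          // free the entry so SafeToReclaimEpoch can advance
        }
    }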
diff --git a/cs/src/core/Index/FASTER/Checkpoint.cs b/cs/src/core/Index/FASTER/Checkpoint.cs
index 56c8f2ddf..33a27cfc4 100644
--- a/cs/src/core/Index/FASTER/Checkpoint.cs
+++ b/cs/src/core/Index/FASTER/Checkpoint.cs
@@ -386,7 +386,7 @@ private bool GlobalMoveToNextState(SystemState currentState, SystemState nextSta
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         private void HandleCheckpointingPhases()
         {
-            var previousState = SystemState.Make(threadCtx.phase, threadCtx.version);
+            var previousState = SystemState.Make(threadCtx.Value.phase, threadCtx.Value.version);
             var finalState = SystemState.Copy(ref _systemState);
 
             // Don't play around when system state is being changed
@@ -412,13 +412,13 @@ private void HandleCheckpointingPhases()
                 case Phase.PREP_INDEX_CHECKPOINT:
                     {
-                        if (!threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt])
+                        if (!threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt])
                         {
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.PrepareForIndexCheckpt, threadCtx.Value.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true;
+                            threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = true;
                         }
                         break;
                     }
@@ -427,7 +427,7 @@ private void HandleCheckpointingPhases()
                     if (_checkpointType == CheckpointType.INDEX_ONLY)
                     {
                         // Reseting the marker for a potential FULL or INDEX_ONLY checkpoint in the future
-                        threadCtx.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false;
+                        threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false;
                     }
 
                     if (IsIndexFuzzyCheckpointCompleted())
@@ -438,7 +438,7 @@ private void HandleCheckpointingPhases()
                 }
                 case Phase.PREPARE:
                     {
-                        if (!threadCtx.markers[EpochPhaseIdx.Prepare])
+                        if (!threadCtx.Value.markers[EpochPhaseIdx.Prepare])
                         {
                             // Thread local action
                             AcquireSharedLatchesForAllPendingRequests();
@@ -446,14 +446,14 @@ private void HandleCheckpointingPhases()
                             var idx = Interlocked.Increment(ref _hybridLogCheckpoint.info.numThreads);
                             idx -= 1;
-                            _hybridLogCheckpoint.info.guids[idx] = threadCtx.guid;
+                            _hybridLogCheckpoint.info.guids[idx] = threadCtx.Value.guid;
 
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.Prepare, threadCtx.Value.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
 
-                            threadCtx.markers[EpochPhaseIdx.Prepare] = true;
+                            threadCtx.Value.markers[EpochPhaseIdx.Prepare] = true;
                         }
                         break;
                     }
@@ -463,41 +463,41 @@ private void HandleCheckpointingPhases()
                         FasterExecutionContext ctx;
                         if (previousState.phase == Phase.PREPARE)
                         {
-                            ctx = threadCtx;
+                            ctx = threadCtx.Value;
                         }
                         else
                         {
-                            ctx = prevThreadCtx;
+                            ctx = prevThreadCtx.Value;
                         }
 
                         if (!ctx.markers[EpochPhaseIdx.InProgress])
                         {
-                            prevThreadCtx = threadCtx;
+                            prevThreadCtx.Value = threadCtx.Value;
 
-                            InitLocalContext(ref threadCtx, prevThreadCtx.guid);
+                            InitLocalContext(prevThreadCtx.Value.guid);
 
                             if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.InProgress, ctx.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            prevThreadCtx.markers[EpochPhaseIdx.InProgress] = true;
+                            prevThreadCtx.Value.markers[EpochPhaseIdx.InProgress] = true;
                         }
                         break;
                     }
                 case Phase.WAIT_PENDING:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.WaitPending])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.WaitPending])
                         {
-                            var notify = (prevThreadCtx.ioPendingRequests.Count == 0);
-                            notify = notify && (prevThreadCtx.retryRequests.Count == 0);
+                            var notify = (prevThreadCtx.Value.ioPendingRequests.Count == 0);
+                            notify = notify && (prevThreadCtx.Value.retryRequests.Count == 0);
 
                             if (notify)
                             {
-                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitPending, threadCtx.version))
+                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitPending, threadCtx.Value.version))
                                 {
                                     GlobalMoveToNextCheckpointState(currentState);
                                 }
-                                prevThreadCtx.markers[EpochPhaseIdx.WaitPending] = true;
+                                prevThreadCtx.Value.markers[EpochPhaseIdx.WaitPending] = true;
                             }
                         }
@@ -505,7 +505,7 @@ private void HandleCheckpointingPhases()
                     }
                 case Phase.WAIT_FLUSH:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.WaitFlush])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.WaitFlush])
                         {
                             var notify = false;
                             if (FoldOverSnapshot)
@@ -526,12 +526,12 @@ private void HandleCheckpointingPhases()
                             {
                                 WriteHybridLogContextInfo();
 
-                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.version))
+                                if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.WaitFlush, prevThreadCtx.Value.version))
                                 {
                                     GlobalMoveToNextCheckpointState(currentState);
                                 }
 
-                                prevThreadCtx.markers[EpochPhaseIdx.WaitFlush] = true;
+                                prevThreadCtx.Value.markers[EpochPhaseIdx.WaitFlush] = true;
                             }
                         }
                         break;
@@ -539,17 +539,17 @@ private void HandleCheckpointingPhases()
 
                 case Phase.PERSISTENCE_CALLBACK:
                     {
-                        if (!prevThreadCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback])
+                        if (!prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback])
                         {
                             // Thread local action
-                            functions.CheckpointCompletionCallback(threadCtx.guid, prevThreadCtx.serialNum);
+                            functions.CheckpointCompletionCallback(threadCtx.Value.guid, prevThreadCtx.Value.serialNum);
 
-                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.version))
+                            if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
                             {
                                 GlobalMoveToNextCheckpointState(currentState);
                             }
-                            prevThreadCtx.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
+                            prevThreadCtx.Value.markers[EpochPhaseIdx.CheckpointCompletionCallback] = true;
                        }
                        break;
                    }
@@ -563,8 +563,8 @@ private void HandleCheckpointingPhases()
                 }
 
                 // update thread local variables
-                threadCtx.phase = currentState.phase;
-                threadCtx.version = currentState.version;
+                threadCtx.Value.phase = currentState.phase;
+                threadCtx.Value.version = currentState.version;
                 previousState.word = currentState.word;
             } while (previousState.word != finalState.word);
@@ -596,11 +596,11 @@ private bool MakeTransition(SystemState currentState, SystemState nextState)
 
         private void AcquireSharedLatchesForAllPendingRequests()
         {
-            foreach (var ctx in threadCtx.retryRequests)
+            foreach (var ctx in threadCtx.Value.retryRequests)
             {
                 AcquireSharedLatch(ctx.key);
             }
-            foreach (var ctx in threadCtx.ioPendingRequests.Values)
+            foreach (var ctx in threadCtx.Value.ioPendingRequests.Values)
             {
                 AcquireSharedLatch(ctx.key);
             }
@@ -720,10 +720,10 @@ private void WriteHybridLogCheckpointCompleteFile()
 
         private void WriteHybridLogContextInfo()
         {
-            string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.guid);
+            string filename = directoryConfiguration.GetHybridLogCheckpointContextFileName(_hybridLogCheckpointToken, prevThreadCtx.Value.guid);
             using (var file = new StreamWriter(filename, false))
             {
-                prevThreadCtx.Write(file);
+                prevThreadCtx.Value.Write(file);
                 file.Flush();
             }
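[Reviewer note] Every threadCtx/prevThreadCtx access above gained a .Value because the execution contexts move from static [ThreadStatic] fields to per-instance FastThreadLocal slots in the next file. The shape of the change, side by side:

    // Before: one static slot per thread, shared by ALL FasterKV instances.
    [ThreadStatic] static FasterExecutionContext threadCtx;

    // After: one slot per (thread, instance) pair, so two FasterKV instances on
    // the same thread (e.g., Compact's tempKv below) no longer clobber each
    // other's phase/version/pending-request state.
    FastThreadLocal<FasterExecutionContext> threadCtx;
    // ... accessed as threadCtx.Value.phase, threadCtx.Value.version, etc.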
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index c0cdd04d3..3220fce70 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -67,11 +67,8 @@ private enum CheckpointType
 
         private SafeConcurrentDictionary<Guid, long> _recoveredSessions;
 
-        [ThreadStatic]
-        private static FasterExecutionContext prevThreadCtx = default(FasterExecutionContext);
-
-        [ThreadStatic]
-        private static FasterExecutionContext threadCtx = default(FasterExecutionContext);
+        private FastThreadLocal<FasterExecutionContext> prevThreadCtx;
+        private FastThreadLocal<FasterExecutionContext> threadCtx;
 
 
         /// <summary>
@@ -85,6 +82,9 @@ private enum CheckpointType
         /// <param name="serializerSettings">Serializer settings</param>
         public FasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings<Key, Value> serializerSettings = null, IFasterEqualityComparer<Key> comparer = null)
         {
+            threadCtx = new FastThreadLocal<FasterExecutionContext>();
+            prevThreadCtx = new FastThreadLocal<FasterExecutionContext>();
+
             if (comparer != null)
                 this.comparer = comparer;
             else
@@ -117,7 +117,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
 
             if (Utility.IsBlittable<Key>() && Utility.IsBlittable<Value>())
             {
-                hlog = new BlittableAllocator<Key, Value>(logSettings, this.comparer);
+                hlog = new BlittableAllocator<Key, Value>(logSettings, this.comparer, null, epoch);
                 Log = new LogAccessor<Key, Value, Input, Output, Context>(this, hlog);
                 if (UseReadCache)
                 {
@@ -127,14 +127,14 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
                         MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                         SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                         MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
-                    }, this.comparer, ReadCacheEvict);
+                    }, this.comparer, ReadCacheEvict, epoch);
                     readcache.Initialize();
                     ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
                 }
             }
             else
             {
-                hlog = new GenericAllocator<Key, Value>(logSettings, serializerSettings, this.comparer);
+                hlog = new GenericAllocator<Key, Value>(logSettings, serializerSettings, this.comparer, null, epoch);
                 Log = new LogAccessor<Key, Value, Input, Output, Context>(this, hlog);
                 if (UseReadCache)
                 {
@@ -145,7 +145,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
                         MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                         SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
                         MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
-                    }, serializerSettings, this.comparer, ReadCacheEvict);
+                    }, serializerSettings, this.comparer, ReadCacheEvict, epoch);
                     readcache.Initialize();
                     ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
                 }
@@ -353,9 +353,9 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
@@ -380,9 +380,9 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
@@ -406,9 +406,9 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
             }
             else
             {
-                status = HandleOperationStatus(threadCtx, context, internalStatus);
+                status = HandleOperationStatus(threadCtx.Value, context, internalStatus);
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
@@ -432,7 +432,7 @@ public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
             {
                 status = (Status)internalStatus;
             }
-            threadCtx.serialNum = monotonicSerialNum;
+            threadCtx.Value.serialNum = monotonicSerialNum;
             return status;
         }
@@ -451,6 +451,8 @@ public bool GrowIndex()
         public void Dispose()
         {
             base.Free();
+            threadCtx.Dispose();
+            prevThreadCtx.Dispose();
             hlog.Dispose();
         }
     }
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index c1b612c7f..61f30f42a 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -251,7 +251,7 @@ public unsafe partial class FasterBase
         internal long minTableSize = 16;
 
         // Allocator for the hash buckets
-        internal MallocFixedPageSize<HashBucket> overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>();
+        internal readonly MallocFixedPageSize<HashBucket> overflowBucketsAllocator;
 
         // An array of size two, that contains the old and new versions of the hash-table
         internal InternalHashTable[] state = new InternalHashTable[2];
@@ -274,13 +274,15 @@ public unsafe partial class FasterBase
         /// </summary>
         public FasterBase()
         {
-            epoch = LightEpoch.Instance;
+            epoch = new LightEpoch();
+            overflowBucketsAllocator = new MallocFixedPageSize<HashBucket>(false, epoch);
         }
 
         internal Status Free()
         {
             Free(0);
             Free(1);
+            epoch.Dispose();
             overflowBucketsAllocator.Dispose();
             return Status.OK;
         }
@@ -731,7 +733,7 @@ protected virtual long GetEntryCount()
         /// </summary>
         /// <param name="version"></param>
         /// <returns></returns>
-        protected virtual void _DumpDistribution(int version)
+        protected virtual string _DumpDistribution(int version)
         {
             var table_size_ = state[version].size;
             var ptable_ = state[version].tableAligned;
@@ -766,23 +768,25 @@ protected virtual void _DumpDistribution(int version)
                 histogram[cnt]++;
             }
 
-            Console.WriteLine("Number of hash buckets: {0}", table_size_);
-            Console.WriteLine("Total distinct hash-table entry count: {0}", total_record_count);
-            Console.WriteLine("Average #entries per hash bucket: {0:0.00}", total_record_count / (double)table_size_);
-            Console.WriteLine("Histogram of #entries per bucket: ");
-
+            var distribution =
+                $"Number of hash buckets: {{{table_size_}}}\n" +
+                $"Total distinct hash-table entry count: {{{total_record_count}}}\n" +
+                $"Average #entries per hash bucket: {{{total_record_count / (double)table_size_:0.00}}}\n" +
+                $"Histogram of #entries per bucket:\n";
             foreach (var kvp in histogram.OrderBy(e => e.Key))
             {
-                Console.WriteLine(kvp.Key.ToString() + ": " + kvp.Value.ToString(CultureInfo.InvariantCulture));
+                distribution += $"  {kvp.Key} : {kvp.Value}\n";
             }
+
+            return distribution;
         }
 
         /// <summary>
         /// Dumps the distribution of each non-empty bucket in the hash table.
         /// </summary>
-        public void DumpDistribution()
+        public string DumpDistribution()
         {
-            _DumpDistribution(resizeInfo.version);
+            return _DumpDistribution(resizeInfo.version);
         }
     }
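[Reviewer note] DumpDistribution now returns the histogram as a string instead of writing to the console, so callers decide where the output goes, as the benchmark change at the top of this diff shows:

    Console.WriteLine(store.DumpDistribution());
    // or route it to any sink, e.g. a logger of your choice:
    // logger.Info(store.DumpDistribution());   // hypothetical logger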
/// - public void DumpDistribution() + public string DumpDistribution() { - _DumpDistribution(resizeInfo.version); + return _DumpDistribution(resizeInfo.version); } } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 715e96968..7525c114a 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -76,7 +76,7 @@ internal OperationStatus InternalRead( var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); #region Trace back for record in in-memory HybridLog @@ -88,7 +88,7 @@ internal OperationStatus InternalRead( if (UseReadCache && ReadFromCache(ref key, ref logicalAddress, ref physicalAddress, ref latestRecordVersion)) { - if (threadCtx.phase == Phase.PREPARE && latestRecordVersion != -1 && latestRecordVersion > threadCtx.version) + if (threadCtx.Value.phase == Phase.PREPARE && latestRecordVersion != -1 && latestRecordVersion > threadCtx.Value.version) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreatePendingContext; // Pivot thread @@ -121,13 +121,13 @@ internal OperationStatus InternalRead( } #endregion - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) { - switch (threadCtx.phase) + switch (threadCtx.Value.phase) { case Phase.PREPARE: { - if (latestRecordVersion != -1 && latestRecordVersion > threadCtx.version) + if (latestRecordVersion != -1 && latestRecordVersion > threadCtx.Value.version) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreatePendingContext; // Pivot thread @@ -173,7 +173,7 @@ internal OperationStatus InternalRead( { status = OperationStatus.RECORD_ON_DISK; - if (threadCtx.phase == Phase.PREPARE) + if (threadCtx.Value.phase == Phase.PREPARE) { if (!HashBucket.TryAcquireSharedLatch(bucket)) { @@ -203,8 +203,8 @@ internal OperationStatus InternalRead( pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; - pendingContext.version = threadCtx.version; - pendingContext.serialNum = threadCtx.serialNum + 1; + pendingContext.version = threadCtx.Value.version; + pendingContext.serialNum = threadCtx.Value.serialNum + 1; } #endregion @@ -414,7 +414,7 @@ internal OperationStatus InternalUpsert( var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); #region Trace back for record in in-memory HybridLog @@ -444,20 +444,20 @@ internal OperationStatus InternalUpsert( #endregion // Optimization for most common case - if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone) + if (threadCtx.Value.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone) { functions.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress)); return OperationStatus.SUCCESS; } #region Entry latch operation - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) { - switch (threadCtx.phase) + switch (threadCtx.Value.phase) { case Phase.PREPARE: { - version = threadCtx.version; + version = threadCtx.Value.version; if (HashBucket.TryAcquireSharedLatch(bucket)) { // Set to release shared latch (default) @@ -477,7 +477,7 @@ internal OperationStatus InternalUpsert( } case 
Phase.IN_PROGRESS: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { if (HashBucket.TryAcquireExclusiveLatch(bucket)) @@ -496,7 +496,7 @@ internal OperationStatus InternalUpsert( } case Phase.WAIT_PENDING: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { if (HashBucket.NoSharedLatches(bucket)) @@ -513,7 +513,7 @@ internal OperationStatus InternalUpsert( } case Phase.WAIT_FLUSH: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { goto CreateNewRecord; // Create a (v+1) record @@ -526,7 +526,7 @@ internal OperationStatus InternalUpsert( } #endregion - Debug.Assert(latestRecordVersion <= threadCtx.version); + Debug.Assert(latestRecordVersion <= threadCtx.Value.version); #region Normal processing @@ -549,7 +549,7 @@ internal OperationStatus InternalUpsert( BlockAllocate(recordSize, out long newLogicalAddress); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), - threadCtx.version, + threadCtx.Value.version, true, false, false, latestLogicalAddress); hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); @@ -590,8 +590,8 @@ internal OperationStatus InternalUpsert( pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; - pendingContext.version = threadCtx.version; - pendingContext.serialNum = threadCtx.serialNum + 1; + pendingContext.version = threadCtx.Value.version; + pendingContext.serialNum = threadCtx.Value.serialNum + 1; } #endregion @@ -678,7 +678,7 @@ internal OperationStatus InternalRMW( var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); #region Trace back for record in in-memory HybridLog @@ -708,20 +708,20 @@ internal OperationStatus InternalRMW( #endregion // Optimization for the most common case - if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone) + if (threadCtx.Value.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone) { functions.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress)); return OperationStatus.SUCCESS; } #region Entry latch operation - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) { - switch (threadCtx.phase) + switch (threadCtx.Value.phase) { case Phase.PREPARE: { - version = threadCtx.version; + version = threadCtx.Value.version; if (HashBucket.TryAcquireSharedLatch(bucket)) { // Set to release shared latch (default) @@ -741,7 +741,7 @@ internal OperationStatus InternalRMW( } case Phase.IN_PROGRESS: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion <= version) { if (HashBucket.TryAcquireExclusiveLatch(bucket)) @@ -760,7 +760,7 @@ internal OperationStatus InternalRMW( } case Phase.WAIT_PENDING: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { if (HashBucket.NoSharedLatches(bucket)) @@ -777,7 +777,7 @@ internal OperationStatus InternalRMW( } case 
Phase.WAIT_FLUSH: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { goto CreateNewRecord; // Create a (v+1) record @@ -790,7 +790,7 @@ internal OperationStatus InternalRMW( } #endregion - Debug.Assert(latestRecordVersion <= threadCtx.version); + Debug.Assert(latestRecordVersion <= threadCtx.Value.version); #region Normal processing @@ -799,7 +799,7 @@ internal OperationStatus InternalRMW( { if (FoldOverSnapshot) { - Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.version); + Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.Value.version); } functions.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress)); status = OperationStatus.SUCCESS; @@ -853,7 +853,7 @@ internal OperationStatus InternalRMW( BlockAllocate(recordSize, out long newLogicalAddress); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress); - RecordInfo.WriteInfo(ref recordInfo, threadCtx.version, + RecordInfo.WriteInfo(ref recordInfo, threadCtx.Value.version, true, false, false, latestLogicalAddress); hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); @@ -919,8 +919,8 @@ ref hlog.GetValue(physicalAddress), pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; - pendingContext.version = threadCtx.version; - pendingContext.serialNum = threadCtx.serialNum + 1; + pendingContext.version = threadCtx.Value.version; + pendingContext.serialNum = threadCtx.Value.serialNum + 1; } #endregion @@ -994,7 +994,7 @@ internal OperationStatus InternalRetryPendingRMW( var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); #region Trace back for record in in-memory HybridLog @@ -1024,15 +1024,15 @@ internal OperationStatus InternalRetryPendingRMW( #endregion #region Entry latch operation - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) { - if (!((ctx.version < threadCtx.version) + if (!((ctx.version < threadCtx.Value.version) || - (threadCtx.phase == Phase.PREPARE))) + (threadCtx.Value.phase == Phase.PREPARE))) { // Processing a pending (v+1) request - version = (threadCtx.version - 1); - switch (threadCtx.phase) + version = (threadCtx.Value.version - 1); + switch (threadCtx.Value.phase) { case Phase.IN_PROGRESS: { @@ -1090,7 +1090,7 @@ internal OperationStatus InternalRetryPendingRMW( { if (FoldOverSnapshot) { - Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.version); + Debug.Assert(hlog.GetInfo(physicalAddress).Version == threadCtx.Value.version); } functions.InPlaceUpdater(ref pendingContext.key, ref pendingContext.input, ref hlog.GetValue(physicalAddress)); status = OperationStatus.SUCCESS; @@ -1401,7 +1401,7 @@ internal OperationStatus InternalDelete( var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); #region Trace back for record in in-memory HybridLog @@ -1434,20 +1434,20 @@ internal OperationStatus InternalDelete( #endregion // NO optimization for most common case - //if (threadCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress) + //if (threadCtx.Value.phase == Phase.REST && 
logicalAddress >= hlog.ReadOnlyAddress) //{ // hlog.GetInfo(physicalAddress).Tombstone = true; // return OperationStatus.SUCCESS; //} #region Entry latch operation - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) { - switch (threadCtx.phase) + switch (threadCtx.Value.phase) { case Phase.PREPARE: { - version = threadCtx.version; + version = threadCtx.Value.version; if (HashBucket.TryAcquireSharedLatch(bucket)) { // Set to release shared latch (default) @@ -1467,7 +1467,7 @@ internal OperationStatus InternalDelete( } case Phase.IN_PROGRESS: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { if (HashBucket.TryAcquireExclusiveLatch(bucket)) @@ -1486,7 +1486,7 @@ internal OperationStatus InternalDelete( } case Phase.WAIT_PENDING: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { if (HashBucket.NoSharedLatches(bucket)) @@ -1503,7 +1503,7 @@ internal OperationStatus InternalDelete( } case Phase.WAIT_FLUSH: { - version = (threadCtx.version - 1); + version = (threadCtx.Value.version - 1); if (latestRecordVersion != -1 && latestRecordVersion <= version) { goto CreateNewRecord; // Create a (v+1) record @@ -1516,7 +1516,7 @@ internal OperationStatus InternalDelete( } #endregion - Debug.Assert(latestRecordVersion <= threadCtx.version); + Debug.Assert(latestRecordVersion <= threadCtx.Value.version); #region Normal processing @@ -1573,7 +1573,7 @@ internal OperationStatus InternalDelete( BlockAllocate(recordSize, out long newLogicalAddress); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), - threadCtx.version, + threadCtx.Value.version, true, true, false, latestLogicalAddress); hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); @@ -1612,8 +1612,8 @@ internal OperationStatus InternalDelete( pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; - pendingContext.version = threadCtx.version; - pendingContext.serialNum = threadCtx.serialNum + 1; + pendingContext.version = threadCtx.Value.version; + pendingContext.serialNum = threadCtx.Value.serialNum + 1; } #endregion @@ -1673,7 +1673,7 @@ public Status ContainsKeyInMemory(ref Key key, long fromAddress = -1) var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (threadCtx.phase != Phase.REST) + if (threadCtx.Value.phase != Phase.REST) HeavyEnter(hash); HashBucketEntry entry = default(HashBucketEntry); @@ -1751,13 +1751,13 @@ internal Status HandleOperationStatus( { #region Epoch Synchronization var version = ctx.version; - Debug.Assert(threadCtx.version == version); - Debug.Assert(threadCtx.phase == Phase.PREPARE); + Debug.Assert(threadCtx.Value.version == version); + Debug.Assert(threadCtx.Value.phase == Phase.PREPARE); Refresh(); - Debug.Assert(threadCtx.version == version + 1); - Debug.Assert(threadCtx.phase == Phase.IN_PROGRESS); + Debug.Assert(threadCtx.Value.version == version + 1); + Debug.Assert(threadCtx.Value.phase == Phase.IN_PROGRESS); - pendingContext.version = threadCtx.version; + pendingContext.version = threadCtx.Value.version; #endregion #region Retry as (v+1) Operation @@ -1783,7 +1783,7 @@ internal Status HandleOperationStatus( ref pendingContext); break; case OperationType.RMW: - internalStatus = 
                         break;
                 }
@@ -1850,9 +1850,9 @@ private void ReleaseSharedLatch(Key key)

         private void HeavyEnter(long hash)
         {
-            if (threadCtx.phase == Phase.GC)
+            if (threadCtx.Value.phase == Phase.GC)
                 GarbageCollectBuckets(hash);
-            if (threadCtx.phase == Phase.PREPARE_GROW)
+            if (threadCtx.Value.phase == Phase.PREPARE_GROW)
             {
                 // We spin-wait as a simplification
                 // Could instead do a "heavy operation" here
@@ -1860,7 +1860,7 @@ private void HeavyEnter(long hash)
                 Thread.SpinWait(100);
                 Refresh();
             }
-            if (threadCtx.phase == Phase.IN_PROGRESS_GROW)
+            if (threadCtx.Value.phase == Phase.IN_PROGRESS_GROW)
             {
                 SplitBuckets(hash);
             }
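The FASTERImpl.cs hunks above mechanically rewrite every threadCtx.phase / threadCtx.version access to go through threadCtx.Value: the execution context now sits behind a per-thread holder rather than a plain field. The holder itself is not part of this patch, so the following is only a minimal sketch of such a wrapper, assuming a System.Threading.ThreadLocal<T> backing store; the name ThreadLocalContextHolder<T> is hypothetical, not FASTER's actual type.

// Minimal sketch, assuming a ThreadLocal<T>-backed holder; the type name is
// hypothetical, NOT the wrapper FASTER actually uses.
using System;
using System.Threading;

public sealed class ThreadLocalContextHolder<T> : IDisposable
{
    // One slot per thread: each thread sees only its own execution context.
    private readonly ThreadLocal<T> slot = new ThreadLocal<T>();

    // All call sites read and write through Value, which is why the patch
    // rewrites threadCtx.phase into threadCtx.Value.phase throughout.
    public T Value
    {
        get => slot.Value;
        set => slot.Value = value;
    }

    public void Dispose() => slot.Dispose();
}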
diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs
index 17e314403..e5dcc2d62 100644
--- a/cs/src/core/Index/FASTER/FASTERThread.cs
+++ b/cs/src/core/Index/FASTER/FASTERThread.cs
@@ -18,19 +18,27 @@ public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions>
-            var context = new FasterExecutionContext
-            {
-                phase = Phase.REST,
-                version = _systemState.version,
-                markers = new bool[8],
-                serialNum = 0,
-                totalPending = 0,
-                guid = token,
-                retryRequests = new Queue<PendingContext>(),
-                readyResponses = new BlockingCollection<AsyncIOContext<Key, Value>>(),
-                ioPendingRequests = new Dictionary<long, PendingContext>()
-            };
+            var ctx =
+                new FasterExecutionContext
+                {
+                    phase = Phase.REST,
+                    version = _systemState.version,
+                    markers = new bool[8],
+                    serialNum = 0,
+                    totalPending = 0,
+                    guid = token,
+                    retryRequests = new Queue<PendingContext>(),
+                    readyResponses = new BlockingCollection<AsyncIOContext<Key, Value>>(),
+                    ioPendingRequests = new Dictionary<long, PendingContext>()
+                };

             for(int i = 0; i < 8; i++)
             {
-                context.markers[i] = false;
+                ctx.markers[i] = false;
             }
+
+            threadCtx.Value = ctx;
         }

         internal bool InternalCompletePending(bool wait = false)
@@ -139,30 +151,30 @@ internal bool InternalCompletePending(bool wait = false)
             bool done = true;

             #region Previous pending requests
-            if (threadCtx.phase == Phase.IN_PROGRESS
+            if (threadCtx.Value.phase == Phase.IN_PROGRESS
                 ||
-                threadCtx.phase == Phase.WAIT_PENDING)
+                threadCtx.Value.phase == Phase.WAIT_PENDING)
             {
-                CompleteIOPendingRequests(prevThreadCtx);
+                CompleteIOPendingRequests(prevThreadCtx.Value);
                 Refresh();
-                CompleteRetryRequests(prevThreadCtx);
+                CompleteRetryRequests(prevThreadCtx.Value);

-                done &= (prevThreadCtx.ioPendingRequests.Count == 0);
-                done &= (prevThreadCtx.retryRequests.Count == 0);
+                done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
+                done &= (prevThreadCtx.Value.retryRequests.Count == 0);
             }
             #endregion

-            if (!(threadCtx.phase == Phase.IN_PROGRESS
+            if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
                   ||
-                  threadCtx.phase == Phase.WAIT_PENDING))
+                  threadCtx.Value.phase == Phase.WAIT_PENDING))
             {
-                CompleteIOPendingRequests(threadCtx);
+                CompleteIOPendingRequests(threadCtx.Value);
             }
             InternalRefresh();
-            CompleteRetryRequests(threadCtx);
+            CompleteRetryRequests(threadCtx.Value);

-            done &= (threadCtx.ioPendingRequests.Count == 0);
-            done &= (threadCtx.retryRequests.Count == 0);
+            done &= (threadCtx.Value.ioPendingRequests.Count == 0);
+            done &= (threadCtx.Value.retryRequests.Count == 0);

             if (done)
             {
@@ -202,9 +214,9 @@ internal void InternalRetryRequestAndCallback(
             #region Entry latch operation
             var handleLatches = false;
-            if ((ctx.version < threadCtx.version) // Thread has already shifted to (v+1)
+            if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
                 ||
-                (threadCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
+                (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
             {
                 handleLatches = true;
             }
@@ -276,9 +288,9 @@ internal void InternalContinuePendingRequestAndCallback(
             AsyncIOContext<Key, Value> request)
         {
             var handleLatches = false;
-            if ((ctx.version < threadCtx.version) // Thread has already shifted to (v+1)
+            if ((ctx.version < threadCtx.Value.version) // Thread has already shifted to (v+1)
                 ||
-                (threadCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
+                (threadCtx.Value.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch
             {
                 handleLatches = true;
             }
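Both retry entry points above gate latch handling on the same two conditions. Distilled as a pure function for illustration only (FASTER keeps this check inline against its real context types; the helper below is hypothetical):

// Hypothetical distillation of the shared guard; not part of the patch.
static class LatchGuardSketch
{
    // Pending work needs latch handling when its saved version lags the
    // thread's current version (the thread already shifted to v+1), or the
    // thread is still in PREPARE and may therefore hold a shared latch.
    public static bool NeedsLatchHandling(long ctxVersion, long threadVersion, bool threadInPrepare)
        => ctxVersion < threadVersion || threadInPrepare;
}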
diff --git a/cs/src/core/Index/FASTER/LogAccessor.cs b/cs/src/core/Index/FASTER/LogAccessor.cs
index 29138d16f..7991ded51 100644
--- a/cs/src/core/Index/FASTER/LogAccessor.cs
+++ b/cs/src/core/Index/FASTER/LogAccessor.cs
@@ -169,7 +169,10 @@ public void Compact(long untilAddress)
                     tempKv.Upsert(ref key, ref value, default(Context), 0);

                     if (++cnt % 1000 == 0)
+                    {
                         fht.Refresh();
+                        tempKv.Refresh();
+                    }
                 }
             }

@@ -190,8 +193,10 @@ public void Compact(long untilAddress)
                         fht.Upsert(ref key, ref value, default(Context), 0);
                     }
                     if (++cnt % 1000 == 0)
+                    {
                         fht.Refresh();
-
+                        tempKv.Refresh();
+                    }
                     if (scanUntil < fht.Log.SafeReadOnlyAddress)
                     {
                         LogScanForValidity(ref untilAddress, ref scanUntil, ref tempKv);
@@ -218,7 +223,10 @@ private void LogScanForValidity(ref long untilAddress, ref long scanUntil, ref F
                     tempKv.Delete(ref key, default(Context), 0);

                     if (++cnt % 1000 == 0)
+                    {
                         fht.Refresh();
+                        tempKv.Refresh();
+                    }
                 }
             }
             fht.Refresh();
diff --git a/cs/src/core/Index/FASTER/Recovery.cs b/cs/src/core/Index/FASTER/Recovery.cs
index 5b2d43d63..5ac34edc3 100644
--- a/cs/src/core/Index/FASTER/Recovery.cs
+++ b/cs/src/core/Index/FASTER/Recovery.cs
@@ -22,6 +22,7 @@ internal class RecoveryStatus
     {
         public long startPage;
         public long endPage;
+        public long untilAddress;
         public int capacity;

         public IDevice recoveryDevice;
@@ -33,11 +34,12 @@ internal class RecoveryStatus

         public RecoveryStatus(int capacity,
                               long startPage,
-                              long endPage)
+                              long endPage, long untilAddress)
         {
             this.capacity = capacity;
             this.startPage = startPage;
             this.endPage = endPage;
+            this.untilAddress = untilAddress;
             readStatus = new ReadStatus[capacity];
             flushStatus = new FlushStatus[capacity];
             for (int i = 0; i < capacity; i++)
@@ -208,7 +210,7 @@ private void RestoreHybridLog(long untilAddress)
             }

             headPage = headPage > 0 ? headPage : 0;
-            var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage);
+            var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress);
             for (int i = 0; i < recoveryStatus.capacity; i++)
             {
                 recoveryStatus.readStatus[i] = ReadStatus.Done;
@@ -222,7 +224,7 @@ private void RestoreHybridLog(long untilAddress)
                 numPages++;
             }

-            hlog.AsyncReadPagesFromDevice(headPage, numPages, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+            hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);

             var done = false;
             while (!done)
@@ -259,13 +261,13 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo,
             // By default first page has one extra record
             var capacity = hlog.GetCapacityNumPages();
-            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage);
+            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress);

             int totalPagesToRead = (int)(endPage - startPage);
             int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead);

             // Issue request to read pages as much as possible
-            hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+            hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);

             for (long page = startPage; page < endPage; page++)
             {
@@ -343,7 +345,7 @@ private void RecoverHybridLogFromSnapshotFile(
             var objectLogRecoveryDevice = Devices.CreateLogDevice(directoryConfiguration.GetHybridLogObjectCheckpointFileName(recoveryInfo.guid), false);
             recoveryDevice.Initialize(hlog.GetSegmentSize());
             objectLogRecoveryDevice.Initialize(hlog.GetSegmentSize());
-            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage)
+            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress)
             {
                 recoveryDevice = recoveryDevice,
                 objectLogRecoveryDevice = objectLogRecoveryDevice,
@@ -354,7 +356,7 @@ private void RecoverHybridLogFromSnapshotFile(
             int totalPagesToRead = (int)(endPage - startPage);
             int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead);

-            hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst,
+            hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress,
                                           AsyncReadPagesCallbackForRecovery,
                                           recoveryStatus,
                                           recoveryStatus.recoveryDevicePageOffset,
@@ -533,11 +535,11 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na
                 long readPage = result.page + result.context.capacity;
                 if (FoldOverSnapshot)
                 {
-                    hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery, result.context);
+                    hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, result.context);
                 }
                 else
                 {
-                    hlog.AsyncReadPagesFromDevice(readPage, 1, AsyncReadPagesCallbackForRecovery,
+                    hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery,
                                                   result.context,
                                                   result.context.recoveryDevicePageOffset,
                                                   result.context.recoveryDevice, result.context.objectLogRecoveryDevice);
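Carrying untilAddress through RecoveryStatus and into every AsyncReadPagesFromDevice call lets recovery clip the read of the final, partially filled page to the recovered tail instead of always reading whole pages. A minimal sketch of that arithmetic, with assumed pageSize/sectorSize parameters and a hypothetical helper name:

// Hypothetical helper illustrating the clipped, sector-aligned read length.
using System;

static class BoundedReadSketch
{
    public static uint ReadLength(long pageStartAddress, long untilAddress,
                                  int pageSize, int sectorSize)
    {
        long useful = untilAddress - pageStartAddress;
        if (useful <= 0 || useful >= pageSize)
            return (uint)pageSize;                        // full-page read
        // Clip to the tail, then round up so the device read stays sector-aligned.
        return (uint)((useful + (sectorSize - 1)) & ~(long)(sectorSize - 1));
    }

    static void Main()
    {
        // Tail 10,000 bytes into a 64 KB page with 512-byte sectors: 10,240 bytes.
        Console.WriteLine(ReadLength(0, 10_000, 1 << 16, 512));
    }
}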
diff --git a/cs/src/core/Index/Interfaces/IFasterKV.cs b/cs/src/core/Index/Interfaces/IFasterKV.cs
index d1eb1fd90..8d995a3c0 100644
--- a/cs/src/core/Index/Interfaces/IFasterKV.cs
+++ b/cs/src/core/Index/Interfaces/IFasterKV.cs
@@ -162,9 +162,9 @@ public interface IFasterKV<Key, Value, Input, Output, Context> : IDisposable
         IFasterEqualityComparer<Key> Comparer { get; }

         /// <summary>
-        /// Dump distribution of #entries in hash table, to console
+        /// Dump distribution of #entries in hash table
         /// </summary>
-        void DumpDistribution();
+        string DumpDistribution();

         /// <summary>
         /// Experimental feature
diff --git a/cs/src/core/Utilities/BufferPool.cs b/cs/src/core/Utilities/BufferPool.cs
index 332d6ff36..872eb8d4e 100644
--- a/cs/src/core/Utilities/BufferPool.cs
+++ b/cs/src/core/Utilities/BufferPool.cs
@@ -97,51 +97,16 @@ public override string ToString()
     /// </summary>
     public class SectorAlignedBufferPool
     {
+        /// <summary>
+        /// Disable buffer pool
+        /// </summary>
+        public static bool Disabled = false;
+
         private const int levels = 32;
         private readonly int recordSize;
         private readonly int sectorSize;
         private readonly ConcurrentQueue<SectorAlignedMemory>[] queue;

-        private static SafeConcurrentDictionary<Tuple<int, int>, SectorAlignedBufferPool> _instances
-            = new SafeConcurrentDictionary<Tuple<int, int>, SectorAlignedBufferPool>();
-
-        /// <summary>
-        /// Clear buffer pool
-        /// </summary>
-        public static void Clear()
-        {
-            foreach (var pool in _instances.Values)
-            {
-                pool.Free();
-            }
-            _instances.Clear();
-        }
-
-        /// <summary>
-        /// Print contents of buffer pool
-        /// </summary>
-        public static void PrintAll()
-        {
-            foreach (var kvp in _instances)
-            {
-                Console.WriteLine("Pool Key: {0}", kvp.Key);
-                kvp.Value.Print();
-            }
-        }
-
-        /// <summary>
-        /// Get cached instance of buffer pool for specified params
-        /// </summary>
-        /// <param name="recordSize">Record size</param>
-        /// <param name="sectorSize">Sector size</param>
-        /// <returns></returns>
-        public static SectorAlignedBufferPool GetPool(int recordSize, int sectorSize)
-        {
-            return
-                _instances.GetOrAdd(new Tuple<int, int>(recordSize, sectorSize),
-                t => new SectorAlignedBufferPool(t.Item1, t.Item2));
-        }
-
         /// <summary>
         /// Constructor
         /// </summary>
@@ -159,14 +124,15 @@ public SectorAlignedBufferPool(int recordSize, int sectorSize)
         /// </summary>
         /// <param name="page"></param>
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        public unsafe void Return(SectorAlignedMemory page)
+        public void Return(SectorAlignedMemory page)
         {
             Debug.Assert(queue[page.level] != null);
             page.available_bytes = 0;
             page.required_bytes = 0;
             page.valid_offset = 0;
             Array.Clear(page.buffer, 0, page.buffer.Length);
-            queue[page.level].Enqueue(page);
+            if (!Disabled)
+                queue[page.level].Enqueue(page);
         }

         [MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -201,14 +167,16 @@ public unsafe SectorAlignedMemory Get(int numRecords)
                 Interlocked.CompareExchange(ref queue[index], localPool, null);
             }

-            if (queue[index].TryDequeue(out SectorAlignedMemory page))
+            if (!Disabled && queue[index].TryDequeue(out SectorAlignedMemory page))
             {
                 return page;
             }

-            page = new SectorAlignedMemory();
-            page.level = index;
-            page.buffer = new byte[sectorSize * (1 << index)];
+            page = new SectorAlignedMemory
+            {
+                level = index,
+                buffer = new byte[sectorSize * (1 << index)]
+            };
             page.handle = GCHandle.Alloc(page.buffer, GCHandleType.Pinned);
             page.aligned_pointer = (byte*)(((long)page.handle.AddrOfPinnedObject() + (sectorSize - 1)) & ~(sectorSize - 1));
             page.offset = (int) ((long)page.aligned_pointer - (long)page.handle.AddrOfPinnedObject());
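With the static GetPool/Clear/PrintAll registry gone, each owner constructs and frees its own SectorAlignedBufferPool instance. A usage sketch built only from members visible in this patch; the parameter values themselves are illustrative:

// Usage sketch; parameter values are illustrative.
using FASTER.core;

class BufferPoolUsageSketch
{
    static void Main()
    {
        // recordSize = 1 makes Get() byte-granular, mirroring how AllocatorBase
        // now creates a private pool instead of sharing a cached static one.
        var pool = new SectorAlignedBufferPool(1, 512);

        SectorAlignedMemory buffer = pool.Get(4096); // sector-aligned, >= 4096 bytes
        // ... issue device I/O against the rented buffer ...
        pool.Return(buffer); // cleared and re-queued, unless Disabled is set
        pool.Free();         // release pooled memory when the owner shuts down
    }
}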
diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs b/cs/src/core/Utilities/PageAsyncResultTypes.cs
index 22545626b..5a8792ce7 100644
--- a/cs/src/core/Utilities/PageAsyncResultTypes.cs
+++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs
@@ -14,28 +14,20 @@ namespace FASTER.core
     /// </summary>
     public class PageAsyncReadResult<TContext> : IAsyncResult
     {
-        /// <summary>
-        /// Page
-        /// </summary>
-        public long page;
-        /// <summary>
-        /// Context
-        /// </summary>
-        public TContext context;
-        /// <summary>
-        /// Count
-        /// </summary>
-        public int count;
-
+        internal long page;
+        internal TContext context;
         internal CountdownEvent handle;
         internal SectorAlignedMemory freeBuffer1;
         internal SectorAlignedMemory freeBuffer2;
         internal IOCompletionCallback callback;
         internal IDevice objlogDevice;
-        internal long resumeptr;
-        internal long untilptr;
         internal object frame;

+        /* Used for iteration */
+        internal long resumePtr;
+        internal long untilPtr;
+        internal long maxPtr;
+
         /// <summary>
         ///
         /// </summary>
diff --git a/cs/test/ComponentRecoveryTests.cs b/cs/test/ComponentRecoveryTests.cs
index 9549b90f2..5a192257e 100644
--- a/cs/test/ComponentRecoveryTests.cs
+++ b/cs/test/ComponentRecoveryTests.cs
@@ -22,6 +22,7 @@ public unsafe void MallocFixedPageSizeRecoveryTest()
            var rand1 = new Random(seed);
            var device = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\MallocFixedPageSizeRecoveryTest.dat", deleteOnClose: true);
            var allocator = new MallocFixedPageSize<HashBucket>();
+           allocator.Acquire();

            //do something
            int numBucketsToAdd = 16 * allocator.GetPageSize();
@@ -42,8 +43,12 @@ public unsafe void MallocFixedPageSizeRecoveryTest()
            //wait until complete
            allocator.IsCheckpointCompleted(true);

+           allocator.Release();
+           allocator.Dispose();

            var recoveredAllocator = new MallocFixedPageSize<HashBucket>();
+           recoveredAllocator.Acquire();
+
            //issue call to recover
            recoveredAllocator.BeginRecovery(device, 0, numBucketsToAdd, numBytesWritten, out ulong numBytesRead);
            //wait until complete
@@ -61,6 +66,9 @@ public unsafe void MallocFixedPageSizeRecoveryTest()
                    Assert.IsTrue(bucket->bucket_entries[j] == rand2.Next());
                }
            }
+
+           recoveredAllocator.Release();
+           recoveredAllocator.Dispose();
        }

        [Test]
@@ -135,6 +143,9 @@ public unsafe void TestFuzzyIndexRecovery()
                    Assert.IsTrue(entry1.word == entry2.word);
                }
            }
+
+           hash_table1.Free();
+           hash_table2.Free();
        }
    }
}
diff --git a/cs/test/FullRecoveryTests.cs b/cs/test/FullRecoveryTests.cs
index 872030035..12d91aded 100644
--- a/cs/test/FullRecoveryTests.cs
+++ b/cs/test/FullRecoveryTests.cs
@@ -78,6 +78,8 @@ public static void DeleteDirectory(string path)
        public void RecoveryTest1()
        {
            Populate();
+           fht.Dispose();
+           fht = null;
            log.Close();
            Setup();
            RecoverAndTest(token, token);
diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs
index 12b2d1ce9..670afbce7 100644
--- a/cs/test/ObjectRecoveryTest.cs
+++ b/cs/test/ObjectRecoveryTest.cs
@@ -87,6 +87,8 @@ public static void DeleteDirectory(string path)
        public void ObjectRecoveryTest1()
        {
            Populate();
+           fht.Dispose();
+           fht = null;
            log.Close();
            objlog.Close();
            Setup();
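The test changes spell out the new ownership rules: a thread must Acquire() the allocator's now instance-owned epoch before use, Release() it afterwards, and Dispose() the instance; recovery then runs on a fresh instance under the same protocol. A condensed lifecycle sketch, with a placeholder element type standing in for the tests' bucket struct:

// Lifecycle sketch distilled from the updated tests; details are placeholders.
using FASTER.core;

class AllocatorLifecycleSketch
{
    struct Entry { public long word; }   // stand-in for the tests' bucket type

    static void Main()
    {
        var allocator = new MallocFixedPageSize<Entry>();
        allocator.Acquire();   // register this thread with the owned epoch
        // ... allocate pages, write entries, checkpoint to a device ...
        allocator.Release();   // deregister before teardown
        allocator.Dispose();   // frees the owned epoch and buffers
    }
}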