From 88d72698482233e4ed7db0737513380933e5c9ce Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 18 Sep 2019 20:01:49 -0700 Subject: [PATCH] Added commit and recovery support. --- cs/playground/FasterLogSample/Program.cs | 10 + cs/src/core/Allocator/AllocatorBase.cs | 22 ++- cs/src/core/Allocator/BlittableAllocator.cs | 15 +- cs/src/core/Allocator/GenericAllocator.cs | 6 +- .../Allocator/VarLenBlittableAllocator.cs | 13 +- cs/src/core/Index/FasterLog/FasterLog.cs | 72 ++++++- .../core/Index/FasterLog/FasterLogSettings.cs | 101 ++++++++++ .../core/Index/FasterLog/ILogCommitManager.cs | 25 +++ .../Index/FasterLog/LocalLogCommitManager.cs | 62 ++++++ cs/src/core/Index/Recovery/Recovery.cs | 177 ++++++++++-------- cs/test/FasterLogTests.cs | 2 +- 11 files changed, 402 insertions(+), 103 deletions(-) create mode 100644 cs/src/core/Index/FasterLog/ILogCommitManager.cs create mode 100644 cs/src/core/Index/FasterLog/LocalLogCommitManager.cs diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs index 631848f44..0ca4005b4 100644 --- a/cs/playground/FasterLogSample/Program.cs +++ b/cs/playground/FasterLogSample/Program.cs @@ -33,6 +33,15 @@ static void ReportThread() } } + static void CommitThread() + { + while (true) + { + Thread.Sleep(100); + log.FlushAndCommit(true); + } + } + static void AppendThread() { byte[] entry = new byte[entryLength]; @@ -90,6 +99,7 @@ static void Main(string[] args) new Thread(new ThreadStart(AppendThread)).Start(); new Thread(new ThreadStart(ScanThread)).Start(); new Thread(new ThreadStart(ReportThread)).Start(); + new Thread(new ThreadStart(CommitThread)).Start(); Thread.Sleep(500*1000); } diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 371cfad69..6365b7198 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -49,7 +49,7 @@ internal struct PageOffset /// /// /// - public unsafe abstract class AllocatorBase : IDisposable + public unsafe abstract partial class AllocatorBase : IDisposable where Key : new() where Value : new() { @@ -221,6 +221,11 @@ public unsafe abstract class AllocatorBase : IDisposable /// protected readonly Action EvictCallback = null; + /// + /// Flush callback + /// + protected readonly Action FlushCallback = null; + /// /// Observer for records entering read-only region /// @@ -380,7 +385,8 @@ public unsafe abstract class AllocatorBase : IDisposable /// Clear page /// /// Page number to be cleared - protected abstract void ClearPage(long page); + /// Offset to clear from (if partial clear) + protected abstract void ClearPage(long page, int offset = 0); /// /// Write page (async) /// @@ -469,13 +475,15 @@ public unsafe abstract class AllocatorBase : IDisposable /// /// /// - public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback, LightEpoch epoch) + /// + public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback, LightEpoch epoch, Action flushCallback) { if (evictCallback != null) { ReadCache = true; EvictCallback = evictCallback; } + FlushCallback = flushCallback; this.comparer = comparer; if (epoch == null) @@ -1100,7 +1108,10 @@ protected void ShiftFlushedUntilAddress() if (update) { - Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress); + if (Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress)) + { + FlushCallback?.Invoke(FlushedUntilAddress); + } } } @@ -1146,6 +1157,9 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress) PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open; PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; + // clear the last page starting from tail address + ClearPage(pageIndex, (int)GetOffsetInPage(tailAddress)); + // Printing debug info Debug.WriteLine("******* Recovered HybridLog Stats *******"); Debug.WriteLine("Head Address: {0}", HeadAddress); diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index e3339ff6b..6e80ccb78 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -31,8 +31,8 @@ public unsafe sealed class BlittableAllocator : AllocatorBase comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + public BlittableAllocator(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { values = new byte[BufferSize][]; handles = new GCHandle[BufferSize]; @@ -198,9 +198,16 @@ public override long GetFirstValidLogicalAddress(long page) return page << LogPageSizeBits; } - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + if (offset == 0) + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + else + { + // Adjust array offset for cache alignment + offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject()); + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + } } /// diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index ccea80368..0b6b7983d 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -42,7 +42,7 @@ public unsafe sealed class GenericAllocator : AllocatorBase(); public GenericAllocator(LogSettings settings, SerializerSettings serializerSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + : base(settings, comparer, evictCallback, epoch, null) { SerializerSettings = serializerSettings; @@ -254,9 +254,9 @@ protected override void WriteAsyncToDevice - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize); // Close segments var thisCloseSegment = page >> (LogSegmentSizeBits - LogPageSizeBits); diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs index a7587f685..b5057d8cd 100644 --- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs +++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs @@ -34,7 +34,7 @@ public unsafe sealed class VariableLengthBlittableAllocator : Alloca internal readonly IVariableLengthStruct ValueLength; public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings vlSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + : base(settings, comparer, evictCallback, epoch, null) { values = new byte[BufferSize][]; handles = new GCHandle[BufferSize]; @@ -282,9 +282,16 @@ public override long GetFirstValidLogicalAddress(long page) return page << LogPageSizeBits; } - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + if (offset == 0) + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + else + { + // Adjust array offset for cache alignment + offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject()); + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + } } /// diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index 5435008ed..90c65041c 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -7,6 +7,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Runtime.CompilerServices; using System.Threading; @@ -32,19 +33,76 @@ public class FasterLog : IDisposable public long TailAddress => allocator.GetTailAddress(); /// - /// Flushed until address + /// Log flushed until address /// public long FlushedUntilAddress => allocator.FlushedUntilAddress; + /// + /// Log commit until address + /// + public long CommitUntilAddress; + + private ILogCommitManager logCommitManager; + /// /// Create new log instance /// /// public FasterLog(FasterLogSettings logSettings) { + logCommitManager = logSettings.LogCommitManager ?? + new LocalLogCommitManager(logSettings.LogCommitFile ?? + logSettings.LogDevice.FileName + ".commit"); + epoch = new LightEpoch(); - allocator = new BlittableAllocator(logSettings.GetLogSettings(), null, null, epoch); + allocator = new BlittableAllocator( + logSettings.GetLogSettings(), null, + null, epoch, e => Commit(e)); allocator.Initialize(); + Restore(); + } + + /// + /// Commit log + /// + private void Commit(long flushAddress) + { + epoch.Resume(); + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); + info.FlushedUntilAddress = allocator.FlushedUntilAddress; + info.BeginAddress = allocator.BeginAddress; + epoch.Suspend(); + + // We can only allow serial monotonic synchronous commit + lock (this) + { + if (flushAddress > CommitUntilAddress) + { + logCommitManager.Commit(info.ToByteArray()); + CommitUntilAddress = flushAddress; + info.DebugPrint(); + } + } + } + + /// + /// Restore log + /// + private void Restore() + { + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); + var commitInfo = logCommitManager.GetCommitMetadata(); + + if (commitInfo == null) return; + + using (var r = new BinaryReader(new MemoryStream(commitInfo))) + { + info.Initialize(r); + } + + allocator.RestoreHybridLog(info.FlushedUntilAddress, + info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress), + info.BeginAddress); } /// @@ -117,16 +175,20 @@ public unsafe long Append(List entries) /// /// Flush the log until tail /// - public long Flush(bool spinWait = false) + public long FlushAndCommit(bool spinWait = false) { epoch.Resume(); allocator.ShiftReadOnlyToTail(out long tailAddress); - epoch.Suspend(); + if (spinWait) { - while (allocator.FlushedUntilAddress < tailAddress) + while (CommitUntilAddress < tailAddress) + { + epoch.ProtectAndDrain(); Thread.Yield(); + } } + epoch.Suspend(); return tailAddress; } diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs index 1b70aa656..5969a034e 100644 --- a/cs/src/core/Index/FasterLog/FasterLogSettings.cs +++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs @@ -3,6 +3,10 @@ #pragma warning disable 0162 +using System; +using System.Diagnostics; +using System.IO; + namespace FASTER.core { /// @@ -30,6 +34,18 @@ public class FasterLogSettings /// public int MemorySizeBits = 26; + /// + /// Log commit manager + /// + public ILogCommitManager LogCommitManager = null; + + /// + /// Use specified directory for storing and retrieving checkpoints + /// This is a shortcut to providing the following: + /// FasterLogSettings.LogCommitManager = new LocalLogCommitManager(LogCommitFile) + /// + public string LogCommitFile = null; + internal LogSettings GetLogSettings() { return new LogSettings @@ -45,4 +61,89 @@ internal LogSettings GetLogSettings() }; } } + + /// + /// Recovery info for FASTER Log + /// + internal struct FasterLogRecoveryInfo + { + /// + /// Begin address + /// + public long BeginAddress; + + /// + /// Flushed logical address + /// + public long FlushedUntilAddress; + + + /// + /// Initialize + /// + public void Initialize() + { + BeginAddress = 0; + FlushedUntilAddress = 0; + } + + /// + /// Initialize from stream + /// + /// + public void Initialize(BinaryReader reader) + { + BeginAddress = reader.ReadInt64(); + FlushedUntilAddress = reader.ReadInt64(); + } + + /// + /// Recover info from token + /// + /// + /// + internal void Recover(ILogCommitManager logCommitManager) + { + var metadata = logCommitManager.GetCommitMetadata(); + if (metadata == null) + throw new Exception("Invalid log commit metadata during recovery"); + + Initialize(new BinaryReader(new MemoryStream(metadata))); + } + + /// + /// Reset + /// + public void Reset() + { + Initialize(); + } + + /// + /// Write info to byte array + /// + public byte[] ToByteArray() + { + using (var ms = new MemoryStream()) + { + using (var writer = new BinaryWriter(ms)) + { + writer.Write(BeginAddress); + writer.Write(FlushedUntilAddress); + } + return ms.ToArray(); + } + } + + /// + /// Print checkpoint info for debugging purposes + /// + public void DebugPrint() + { + Debug.WriteLine("******** Log Commit Info ********"); + + Debug.WriteLine("BeginAddress: {0}", BeginAddress); + Debug.WriteLine("FlushedUntilAddress: {0}", FlushedUntilAddress); + } + } } diff --git a/cs/src/core/Index/FasterLog/ILogCommitManager.cs b/cs/src/core/Index/FasterLog/ILogCommitManager.cs new file mode 100644 index 000000000..7c5f37de9 --- /dev/null +++ b/cs/src/core/Index/FasterLog/ILogCommitManager.cs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.IO; + +namespace FASTER.core +{ + /// + /// Log commit manager + /// + public interface ILogCommitManager + { + /// + /// Perform (synchronous) commit with specified metadata + /// + /// + void Commit(byte[] commitMetadata); + + /// + /// Return prior commit metadata during recovery + /// + /// + byte[] GetCommitMetadata(); + } +} \ No newline at end of file diff --git a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs new file mode 100644 index 000000000..d7bdba2a0 --- /dev/null +++ b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.IO; + +namespace FASTER.core +{ + /// + /// Implementation of checkpoint interface for local file storage + /// + public class LocalLogCommitManager : ILogCommitManager + { + private string CommitFile; + + /// + /// Create new instance of local checkpoint manager at given base directory + /// + /// + public LocalLogCommitManager(string CommitFile) + { + this.CommitFile = CommitFile; + } + + /// + /// Commit log + /// + /// + public void Commit(byte[] commitMetadata) + { + // Two phase to ensure we write metadata in single Write operation + using (var ms = new MemoryStream()) + { + using (var writer = new BinaryWriter(ms)) + { + writer.Write(commitMetadata.Length); + writer.Write(commitMetadata); + } + using (var writer = new BinaryWriter(new FileStream(CommitFile, FileMode.OpenOrCreate))) + { + writer.Write(ms.ToArray()); + writer.Flush(); + } + } + } + + /// + /// Retrieve commit metadata + /// + /// Metadata, or null if invalid + public byte[] GetCommitMetadata() + { + if (!File.Exists(CommitFile)) + return null; + + using (var reader = new BinaryReader(new FileStream(CommitFile, FileMode.Open))) + { + var len = reader.ReadInt32(); + return reader.ReadBytes(len); + } + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index e3ffcffad..7ddd5db3f 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -89,7 +89,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) recoveredHLCInfo.info.DebugPrint(); // Check if the two checkpoints are compatible for recovery - if(!IsCompatible(recoveredICInfo.info, recoveredHLCInfo.info)) + if (!IsCompatible(recoveredICInfo.info, recoveredHLCInfo.info)) { throw new Exception("Cannot recover from (" + indexToken.ToString() + "," + hybridLogToken.ToString() + ") checkpoint pair!\n"); } @@ -118,69 +118,15 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) { RecoverHybridLogFromSnapshotFile(recoveredICInfo.info, recoveredHLCInfo.info); } - + // Read appropriate hybrid log pages into memory - RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress, recoveredHLCInfo.info.beginAddress); + hlog.RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress, recoveredHLCInfo.info.beginAddress); // Recover session information _recoveredSessions = recoveredHLCInfo.info.continueTokens; } - private void RestoreHybridLog(long untilAddress, long headAddress, long beginAddress) - { - Debug.Assert(beginAddress <= headAddress); - Debug.Assert(headAddress <= untilAddress); - - // Special cases: we do not load any records into memory - if ( - (beginAddress == untilAddress) || // Empty log - ((headAddress == untilAddress) && (hlog.GetOffsetInPage(headAddress) == 0)) // Empty in-memory page - ) - { - hlog.AllocatePage(hlog.GetPageIndexForAddress(headAddress)); - } - else - { - var tailPage = hlog.GetPage(untilAddress); - var headPage = hlog.GetPage(headAddress); - - var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress); - for (int i = 0; i < recoveryStatus.capacity; i++) - { - recoveryStatus.readStatus[i] = ReadStatus.Done; - } - - var numPages = 0; - for (var page = headPage; page <= tailPage; page++) - { - var pageIndex = hlog.GetPageIndexForPage(page); - recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; - numPages++; - } - - hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); - - var done = false; - while (!done) - { - done = true; - for (long page = headPage; page <= tailPage; page++) - { - int pageIndex = hlog.GetPageIndexForPage(page); - if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending) - { - done = false; - break; - } - } - } - } - - hlog.RecoveryReset(untilAddress, headAddress, beginAddress); - } - - private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, HybridLogRecoveryInfo recoveryInfo) { @@ -202,8 +148,8 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); // Issue request to read pages as much as possible - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); - + hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus); + for (long page = startPage; page < endPage; page++) { // Ensure page has been read into memory @@ -227,7 +173,7 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, { pageUntilAddress = hlog.GetOffsetInPage(untilAddress); } - + var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress); RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, recoveryInfo.version); @@ -292,7 +238,7 @@ private void RecoverHybridLogFromSnapshotFile( int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, - AsyncReadPagesCallbackForRecovery, + hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus, recoveryStatus.recoveryDevicePageOffset, recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); @@ -430,26 +376,6 @@ private void RecoverFromPage(long startRecoveryAddress, } } - private void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) - { - if (errorCode != 0) - { - Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); - } - - // Set the page status to flushed - var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; - - if (result.freeBuffer1 != null) - { - hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); - result.freeBuffer1.Return(); - } - int index = hlog.GetPageIndexForPage(result.page); - result.context.readStatus[index] = ReadStatus.Done; - Interlocked.MemoryBarrier(); - Overlapped.Free(overlap); - } private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) { @@ -470,11 +396,11 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na long readPage = result.page + result.context.capacity; if (FoldOverSnapshot) { - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, result.context); + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context); } else { - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context, result.context.recoveryDevicePageOffset, result.context.recoveryDevice, result.context.objectLogRecoveryDevice); @@ -485,4 +411,89 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na Overlapped.Free(overlap); } } + + public unsafe abstract partial class AllocatorBase : IDisposable + where Key : new() + where Value : new() + { + /// + /// Restore log + /// + /// + /// + /// + public void RestoreHybridLog(long untilAddress, long headAddress, long beginAddress) + { + Debug.Assert(beginAddress <= headAddress); + Debug.Assert(headAddress <= untilAddress); + + // Special cases: we do not load any records into memory + if ( + (beginAddress == untilAddress) || // Empty log + ((headAddress == untilAddress) && (GetOffsetInPage(headAddress) == 0)) // Empty in-memory page + ) + { + AllocatePage(GetPageIndexForAddress(headAddress)); + } + else + { + var tailPage = GetPage(untilAddress); + var headPage = GetPage(headAddress); + + var recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), headPage, tailPage, untilAddress); + for (int i = 0; i < recoveryStatus.capacity; i++) + { + recoveryStatus.readStatus[i] = ReadStatus.Done; + } + + var numPages = 0; + for (var page = headPage; page <= tailPage; page++) + { + var pageIndex = GetPageIndexForPage(page); + recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; + numPages++; + } + + AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); + + var done = false; + while (!done) + { + done = true; + for (long page = headPage; page <= tailPage; page++) + { + int pageIndex = GetPageIndexForPage(page); + if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending) + { + done = false; + break; + } + } + } + } + + RecoveryReset(untilAddress, headAddress, beginAddress); + } + + internal void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) + { + if (errorCode != 0) + { + Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); + } + + // Set the page status to flushed + var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; + + if (result.freeBuffer1 != null) + { + PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + result.freeBuffer1.Return(); + } + int index = GetPageIndexForPage(result.page); + result.context.readStatus[index] = ReadStatus.Done; + Interlocked.MemoryBarrier(); + Overlapped.Free(overlap); + } + } } diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs index 60542c9be..8059006bf 100644 --- a/cs/test/FasterLogTests.cs +++ b/cs/test/FasterLogTests.cs @@ -44,7 +44,7 @@ public void FasterLogTest1() { log.Append(entry); } - log.Flush(true); + log.FlushAndCommit(true); using (var iter = log.Scan(0, long.MaxValue)) {