Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] FasterLog v2 iteration fix #623

Merged
merged 5 commits into from
Dec 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,21 @@ public sealed class FasterLog : IDisposable
/// Used to determine disposability of log
/// </summary>
internal int logRefCount = 1;

/// <summary>
/// Create new log instance
/// </summary>
/// <param name="logSettings">Log settings</param>
public FasterLog(FasterLogSettings logSettings)
: this(logSettings, logSettings.TryRecoverLatest)
{ }

/// <summary>
/// Create new log instance
/// </summary>
/// <param name="logSettings">Log settings</param>
/// <param name="syncRecover">Recover synchronously</param>
private FasterLog(FasterLogSettings logSettings, bool syncRecover)
badrishc marked this conversation as resolved.
Show resolved Hide resolved
{
logCommitManager = logSettings.LogCommitManager ??
new DeviceLogCommitCheckpointManager
Expand Down Expand Up @@ -167,7 +176,8 @@ public FasterLog(FasterLogSettings logSettings)
commitPolicy.OnAttached(this);

tolerateDeviceFailure = logSettings.TolerateDeviceFailure;
if (logSettings.TryRecoverLatest)

if (syncRecover)
badrishc marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
Expand Down Expand Up @@ -201,11 +211,13 @@ public void Recover(long requestedCommitNum = -1)
/// <param name="cancellationToken"></param>
public static async ValueTask<FasterLog> CreateAsync(FasterLogSettings logSettings, CancellationToken cancellationToken = default)
{
var fasterLog = new FasterLog(logSettings);
var (it, cookie) = await fasterLog.RestoreLatestAsync(cancellationToken).ConfigureAwait(false);
fasterLog.RecoveredIterators = it;
fasterLog.RecoveredCookie = cookie;

var fasterLog = new FasterLog(logSettings, false);
if (logSettings.TryRecoverLatest)
{
var (it, cookie) = await fasterLog.RestoreLatestAsync(cancellationToken).ConfigureAwait(false);
fasterLog.RecoveredIterators = it;
fasterLog.RecoveredCookie = cookie;
}
return fasterLog;
}

Expand Down Expand Up @@ -1499,7 +1511,6 @@ private void RestoreLatest(out Dictionary<string, long> iterators, out byte[] co
cookie = info.Cookie;
commitNum = info.CommitNum;
beginAddress = allocator.BeginAddress;
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (readOnlyMode)
allocator.HeadAddress = long.MaxValue;

Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public sealed class FasterLogScanIterator : ScanIteratorBase, IDisposable
/// Whether iteration has ended, either because we reached the end address of iteration, or because
/// we reached the end of a completed log.
/// </summary>
public bool Ended => (currentAddress >= endAddress) || (fasterLog.LogCompleted && currentAddress == fasterLog.TailAddress);
public bool Ended => (nextAddress >= endAddress) || (fasterLog.LogCompleted && nextAddress == fasterLog.TailAddress);

/// <summary>
/// Constructor
Expand Down Expand Up @@ -517,8 +517,8 @@ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long c
try
{
// Continue looping until we find a record that is a commit record
while (GetNextInternal(out long physicalAddress, out var entryLength, out currentAddress,
out nextAddress,
while (GetNextInternal(out long physicalAddress, out var entryLength, out long currentAddress,
out long nextAddress,
out var isCommitRecord))
{
if (!isCommitRecord) continue;
Expand Down
2 changes: 1 addition & 1 deletion cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void BasicHighLatencyDeviceTest()

private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice device, ILogCommitManager logCommitManager, FasterLogTestBase.IteratorType iteratorType)
{
var logSettings = new FasterLogSettings { PageSizeBits = 20, SegmentSizeBits = 20, LogDevice = device, LogChecksum = logChecksum, LogCommitManager = logCommitManager };
var logSettings = new FasterLogSettings { PageSizeBits = 20, SegmentSizeBits = 20, LogDevice = device, LogChecksum = logChecksum, LogCommitManager = logCommitManager, TryRecoverLatest = false };
log = FasterLogTestBase.IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

byte[] entry = new byte[entryLength];
Expand Down
2 changes: 1 addition & 1 deletion cs/test/FasterLogRecoverReadOnlyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void TearDown()
public async Task RecoverReadOnlyCheck1([Values] bool isAsync)
{
using var device = Devices.CreateLogDevice(deviceName);
var logSettings = new FasterLogSettings { LogDevice = device, MemorySizeBits = 11, PageSizeBits = 9, MutableFraction = 0.5, SegmentSizeBits = 9 };
var logSettings = new FasterLogSettings { LogDevice = device, MemorySizeBits = 11, PageSizeBits = 9, MutableFraction = 0.5, SegmentSizeBits = 9, TryRecoverLatest = false };
using var log = isAsync ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

await Task.WhenAll(ProducerAsync(log, cts),
Expand Down
12 changes: 6 additions & 6 deletions cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ internal class FasterLogGeneralTests : FasterLogTestBase
public async ValueTask FasterLogTest1([Values] LogChecksumType logChecksum, [Values] IteratorType iteratorType)
{
device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager };
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

byte[] entry = new byte[entryLength];
Expand Down Expand Up @@ -255,7 +255,7 @@ public async ValueTask TryEnqueue1([Values] LogChecksumType logChecksum, [Values
CancellationToken token = cts.Token;

device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager };
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

const int dataLength = 1000;
Expand Down Expand Up @@ -302,7 +302,7 @@ public async ValueTask TryEnqueue2([Values] LogChecksumType logChecksum, [Values
string filename = path + "TryEnqueue2" + deviceType.ToString() + ".log";
device = TestUtils.CreateTestDevice(deviceType, filename);

var logSettings = new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager, SegmentSizeBits = 22 };
var logSettings = new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager, SegmentSizeBits = 22, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

const int dataLength = 10000;
Expand Down Expand Up @@ -375,7 +375,7 @@ public async ValueTask TruncateUntilBasic([Values] LogChecksumType logChecksum,
string filename = path + "TruncateUntilBasic" + deviceType.ToString() + ".log";
device = TestUtils.CreateTestDevice(deviceType, filename);

var logSettings = new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager, SegmentSizeBits = 22 };
var logSettings = new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager, SegmentSizeBits = 22, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

byte[] data1 = new byte[100];
Expand Down Expand Up @@ -463,7 +463,7 @@ public async ValueTask EnqueueAndWaitForCommitAsyncBasicTest([Values] LogChecksu
public async ValueTask TruncateUntil2([Values] LogChecksumType logChecksum, [Values] IteratorType iteratorType)
{
device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, MemorySizeBits = 20, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager };
var logSettings = new FasterLogSettings { LogDevice = device, MemorySizeBits = 20, PageSizeBits = 14, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = IsAsync(iteratorType) ? await FasterLog.CreateAsync(logSettings) : new FasterLog(logSettings);

byte[] data1 = new byte[1000];
Expand Down Expand Up @@ -658,7 +658,7 @@ public async ValueTask CommitAsyncPrevTask([Values] TestUtils.DeviceType deviceT

string filename = $"{path}/CommitAsyncPrevTask_{deviceType}.log";
device = TestUtils.CreateTestDevice(deviceType, filename);
var logSettings = new FasterLogSettings { LogDevice = device, LogCommitManager = manager, SegmentSizeBits = 22 };
var logSettings = new FasterLogSettings { LogDevice = device, LogCommitManager = manager, SegmentSizeBits = 22, TryRecoverLatest = false };
log = await FasterLog.CreateAsync(logSettings);

// make it small since launching each on separate threads
Expand Down