Skip to content

Commit

Permalink
[C#] Add testcase: resume iterator with async enumerable (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored May 13, 2021
1 parent 52e8678 commit 31db806
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions cs/test/FasterLogResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,59 @@ public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logCh
}
}
}

[Test]
[Category("FasterLog")]
public async Task FasterLogResumePersistedReader3([Values] LogChecksumType logChecksum, [Values] bool overwriteLogCommits, [Values] bool removeOutdated)
{
var input1 = new byte[] { 0, 1, 2, 3 };
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
var input3 = new byte[] { 11, 12 };
string readerName = "abcd";

using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(commitPath), overwriteLogCommits, removeOutdated))
{
long originalCompleted;

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
{
await l.EnqueueAsync(input1);
await l.CommitAsync();
await l.EnqueueAsync(input2);
await l.CommitAsync();
await l.EnqueueAsync(input3);
await l.CommitAsync();

using var originalIterator = l.Scan(0, l.TailAddress, readerName);

int count = 0;
await foreach (var item in originalIterator.GetAsyncEnumerable())
{
if (count < 2) // we complete 1st and 2nd item read
originalIterator.CompleteUntil(item.nextAddress);

if (count < 1) // we commit only 1st item read
await l.CommitAsync();

count++;
}
originalCompleted = originalIterator.CompletedUntilAddress;
}

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
{
using var recoveredIterator = l.Scan(0, l.TailAddress, readerName);

int count = 0;
await foreach (var item in recoveredIterator.GetAsyncEnumerable())
{
if (count == 0) // resumed iterator will start at item2
Assert.True(input2.SequenceEqual(item.entry), $"Original: {input2[0]}, Recovered: {item.entry[0]}, Original: {originalCompleted}, Recovered: {recoveredIterator.CompletedUntilAddress}");
count++;
}
Assert.IsTrue(count == 2);
}
}
}
}
}

0 comments on commit 31db806

Please sign in to comment.