From 31db806681df7ddda7482889de2bc3e6bd34ba41 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 12 May 2021 18:36:19 -0700 Subject: [PATCH] [C#] Add testcase: resume iterator with async enumerable (#471) --- cs/test/FasterLogResumeTests.cs | 54 +++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/cs/test/FasterLogResumeTests.cs b/cs/test/FasterLogResumeTests.cs index d104251a0..25afd874f 100644 --- a/cs/test/FasterLogResumeTests.cs +++ b/cs/test/FasterLogResumeTests.cs @@ -114,5 +114,59 @@ public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logCh } } } + + [Test] + [Category("FasterLog")] + public async Task FasterLogResumePersistedReader3([Values] LogChecksumType logChecksum, [Values] bool overwriteLogCommits, [Values] bool removeOutdated) + { + var input1 = new byte[] { 0, 1, 2, 3 }; + var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 }; + var input3 = new byte[] { 11, 12 }; + string readerName = "abcd"; + + using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(commitPath), overwriteLogCommits, removeOutdated)) + { + long originalCompleted; + + using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + { + await l.EnqueueAsync(input1); + await l.CommitAsync(); + await l.EnqueueAsync(input2); + await l.CommitAsync(); + await l.EnqueueAsync(input3); + await l.CommitAsync(); + + using var originalIterator = l.Scan(0, l.TailAddress, readerName); + + int count = 0; + await foreach (var item in originalIterator.GetAsyncEnumerable()) + { + if (count < 2) // we complete 1st and 2nd item read + originalIterator.CompleteUntil(item.nextAddress); + + if (count < 1) // we commit only 1st item read + await l.CommitAsync(); + + count++; + } + originalCompleted = originalIterator.CompletedUntilAddress; + } + + using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + { + using var recoveredIterator = l.Scan(0, l.TailAddress, readerName); + + int count = 0; + await foreach (var item in recoveredIterator.GetAsyncEnumerable()) + { + if (count == 0) // resumed iterator will start at item2 + Assert.True(input2.SequenceEqual(item.entry), $"Original: {input2[0]}, Recovered: {item.entry[0]}, Original: {originalCompleted}, Recovered: {recoveredIterator.CompletedUntilAddress}"); + count++; + } + Assert.IsTrue(count == 2); + } + } + } } }