Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resumability of iteration over IAsyncEnumerable<'T> #42

Merged
merged 27 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3cec0d7
Add a bunch of multiple iteration tests
abelbraaksma Oct 16, 2022
e3fdee7
Adding several comments and log statements to get to the bottom of th…
abelbraaksma Oct 17, 2022
68e632e
Issue #39, adding bunch of tests that show various ways of getting In…
abelbraaksma Oct 22, 2022
dc7dcca
Add (failing) tests for calling IAsyncEnumerator.Current *before* ite…
abelbraaksma Oct 22, 2022
cf1c24d
Fix: do not throw an exception on IAsyncEnumerator.Current when undef…
abelbraaksma Oct 22, 2022
63fc705
Add (currently failing) tests for transitioning state to *after* sequ…
abelbraaksma Oct 22, 2022
da0a1ce
Fix tests, Fact vs Theory
abelbraaksma Oct 22, 2022
d8f9865
Fix: ensure Current returns Unchecked.defaultof when *beyond* the end…
abelbraaksma Oct 22, 2022
b27b11e
Improve verbosity printing
abelbraaksma Oct 22, 2022
817d6f7
Record whether or not the IAsyncEnumerable is "past completion", whic…
abelbraaksma Oct 22, 2022
1e318c1
Make the test's code a little more readable
abelbraaksma Oct 22, 2022
c537169
Add two tests the show the difference in behavior between two seq pro…
abelbraaksma Oct 22, 2022
fe8f5d4
Renaming 'this' pointer for clarity, ts -> this.
abelbraaksma Oct 22, 2022
33739e7
Initialize new TaskSeqStateMachineData() when getting a new enumerator
abelbraaksma Oct 22, 2022
98c4fc1
Add test for basic case of incorrect MoveNext state after creating ne…
abelbraaksma Oct 22, 2022
b1b29bb
Fix incorrect ResumptionPoint resume by force-resetting the `Machine`…
abelbraaksma Oct 24, 2022
667468a
Add a (failing) test to investigate Faulted state with multiple itera…
abelbraaksma Oct 24, 2022
2deb71d
Adding a bunch of variants with delayed yields to check proper state …
abelbraaksma Oct 24, 2022
2d999a0
Disable a bunch of tests, and set verbose=false, as CI cannot deal wi…
abelbraaksma Oct 24, 2022
e03acd7
Fix rebase mistake
abelbraaksma Oct 24, 2022
8fb8282
Temp disable worrisome tests to get xUnit to complete in CI
abelbraaksma Oct 24, 2022
b597e4d
Show that the NRE can also occur in synchronous versions of taskSeq
abelbraaksma Oct 24, 2022
c095488
Add two more tests, and improve logging
abelbraaksma Oct 26, 2022
cfd09c4
use copy of initial state machine to get resumption back to zero
dsyme Oct 28, 2022
3692a04
Cleanup TaskSeqBuilder now that we have a working version (credits @d…
abelbraaksma Oct 28, 2022
8932ff5
Fix expected test output and ignore a test that's dependent on faulty…
abelbraaksma Oct 28, 2022
218f83b
Re-enable previously hanging and crashing tests, add extra, more comp…
abelbraaksma Oct 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
<Compile Include="TaskSeq.OfXXX.Tests.fs" />
<Compile Include="TaskSeq.Tests.Other.fs" />
<Compile Include="TaskSeq.Tests.CE.fs" />
<Compile Include="TaskSeq.StateTransitionBug.Tests.CE.fs" />
<Compile Include="TaskSeq.StateTransitionBug-delayed.Tests.CE.fs" />
<Compile Include="TaskSeq.PocTests.fs" />
<Compile Include="TaskSeq.Realworld.fs" />
<Compile Include="Program.fs" />
Expand Down
12 changes: 12 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Iter.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ let ``TaskSeq-iter should go over all items`` () = task {
sum |> should equal 55 // task-dummies started at 1
}


[<Fact>]
let ``TaskSeq-iter multiple iterations over same sequence`` () = task {
let tq = createDummyTaskSeq 10
let mutable sum = 0
do! tq |> TaskSeq.iter (fun item -> sum <- sum + item)
do! tq |> TaskSeq.iter (fun item -> sum <- sum + item)
do! tq |> TaskSeq.iter (fun item -> sum <- sum + item)
do! tq |> TaskSeq.iter (fun item -> sum <- sum + item)
sum |> should equal 820 // task-dummies started at 1
}

[<Fact>]
let ``TaskSeq-iteriAsync should go over all items`` () = task {
let tq = createDummyTaskSeq 10
Expand Down
34 changes: 34 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Map.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,25 @@ open FsToolkit.ErrorHandling

open FSharpy

/// Asserts that a sequence contains the char values 'A'..'J'.
let validateSequence sequence =
sequence
|> Seq.map string
|> String.concat ""
|> should equal "ABCDEFGHIJ"

/// Validates for "ABCDEFGHIJ" char sequence, or any amount of char-value higher
let validateSequenceWithOffset offset sequence =
let expected =
[ 'A' .. 'J' ]
|> List.map (int >> (+) offset >> char >> string)
|> String.concat ""

sequence
|> Seq.map string
|> String.concat ""
|> should equal expected

[<Fact>]
let ``TaskSeq-map maps in correct order`` () = task {
let! sq =
Expand Down Expand Up @@ -72,6 +85,27 @@ let ``TaskSeq-mapAsync maps in correct order`` () = task {
validateSequence sq
}

[<Fact>]
let ``TaskSeq-mapAsync can map the same sequence multiple times`` () = task {
let mapAndCache =
TaskSeq.mapAsync (fun item -> task { return char (item + 64) })
>> TaskSeq.toSeqCachedAsync

let ts = createDummyDirectTaskSeq 10

let! result1 = mapAndCache ts
let! result2 = mapAndCache ts
let! result3 = mapAndCache ts
let! result4 = mapAndCache ts
validateSequence result1

// each time we do GetAsyncEnumerator(), and go through the whole sequence,
// the whole sequence gets re-evaluated, causing our +1 side-effect to run again.
validateSequenceWithOffset 10 result2 // the mutable is 10 higher
validateSequenceWithOffset 20 result3 // again
validateSequenceWithOffset 30 result4 // again
}

[<Fact>]
let ``TaskSeq-mapiAsync maps in correct order`` () = task {
let! sq =
Expand Down
57 changes: 53 additions & 4 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Realworld.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) =

interface IAsyncEnumerable<byte[]> with
member reader.GetAsyncEnumerator(ct) =
output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}"
reader.MemberwiseClone() :?> IAsyncEnumerator<_>
{ new IAsyncEnumerator<_> with
member this.Current = (reader :> IAsyncEnumerator<_>).Current
member this.MoveNextAsync() = (reader :> IAsyncEnumerator<_>).MoveNextAsync()
interface IAsyncDisposable with
member this.DisposeAsync() = ValueTask()
}
//output.WriteLine $"Cloning!! Current: {current}, lastPos: {lastPos}"
//reader.MemberwiseClone() :?> IAsyncEnumerator<_>

interface IAsyncEnumerator<byte[]> with
member _.Current =
Expand All @@ -39,6 +45,8 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) =
let! bytesRead = buffered.ReadAsync(mem, 0, mem.Length) // offset refers to offset in target buffer, not source
lastPos <- buffered.Position

let x: seq<Guid> = seq { 1 } |> Seq.cast

if bytesRead > 0 then
current <- ValueSome mem
return true
Expand All @@ -48,7 +56,6 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) =
}
|> Task.toValueTask

interface IAsyncDisposable with
member _.DisposeAsync() =
try
// this disposes of the mem stream
Expand All @@ -57,8 +64,44 @@ type AsyncBufferedReader(output: ITestOutputHelper, data, blockSize) =
// if the previous block raises, we should still try to get rid of the underlying stream
stream.DisposeAsync().AsTask().Wait()


type ``Real world tests``(output: ITestOutputHelper) =
[<Fact>]
let ``Reading a 10MB buffered IAsync through TaskSeq.toArray non-async should succeed`` () = task {
use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256)
// unreadable error with 'use'
//use bla = seq { 1}
let expected = Array.init 256 byte |> Array.replicate 8
let results = reader |> TaskSeq.toArray

(results, expected)
||> Array.iter2 (fun a b -> should equal a b)
}

[<Fact(Skip = "Broken test, faulty streaming test-implementation")>]
let ``Reading a user-code IAsync multiple times with TaskSeq.toArrayAsync should succeed`` () = task {
use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256)
let expected = Array.init 256 byte |> Array.replicate 8
// read four times
let! results1 = reader |> TaskSeq.toArrayAsync
let! results2 = reader |> TaskSeq.toArrayAsync
let! results3 = reader |> TaskSeq.toArrayAsync
let! results4 = reader |> TaskSeq.toArrayAsync

(results1, expected)
||> Array.iter2 (fun a b -> should equal a b)

(results2, expected)
||> Array.iter2 (fun a b -> should equal a b)

(results3, expected)
||> Array.iter2 (fun a b -> should equal a b)

(results4, expected)
||> Array.iter2 (fun a b -> should equal a b)
}

[<Fact(Skip = "Broken test, faulty streaming test-implementation")>]
let ``Reading a 10MB buffered IAsync stream from start to finish`` () = task {
let mutable count = 0
use reader = AsyncBufferedReader(output, Array.init 2048 byte, 256)
Expand All @@ -76,6 +119,8 @@ type ``Real world tests``(output: ITestOutputHelper) =

// the following is extremely slow, which is why we just use F#'s comparison instead
// Using this takes 67s, compared to 0.25s using normal F# comparison.
// reader |> TaskSeq.toArray |> should equal expected // VERY SLOW!!

do! reader |> TaskSeq.iter (should equal expected)
do! reader |> TaskSeq.iter ((=) expected >> (should be True))
let! len = reader |> TaskSeq.mapi (fun i _ -> i + 1) |> TaskSeq.last
Expand Down Expand Up @@ -113,13 +158,17 @@ type ``Real world tests``(output: ITestOutputHelper) =
}


// This test used to have the following, which has since been solved through #42
// please leave this test in, as it tests a case that's quite easily reached if we
// introduce mistakes in the resumable code.
//
//System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
// at <StartupCode$FSharpy-TaskSeq-Test>.$TaskSeq.Realworld.clo@58-4.MoveNext() in D:\Projects\OpenSource\Abel\TaskSeq\src\FSharpy.TaskSeq.Test\TaskSeq.Realworld.fs:line 77
// at Xunit.Sdk.TestInvoker`1.<>c__DisplayClass48_0.<<InvokeTestMethodAsync>b__1>d.MoveNext() in /_/src/xunit.execution/Sdk/Frameworks/Runners/TestInvoker.cs:line 264
//--- End of stack trace from previous location ---
// at Xunit.Sdk.ExecutionTimer.AggregateAsync(Func`1 asyncAction) in /_/src/xunit.execution/Sdk/Frameworks/ExecutionTimer.cs:line 48
// at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90\
[<Fact(Skip = "Currently fails")>]
[<Fact>]
let ``Reading a 1MB buffered IAsync stream from start to finish InvalidOperationException`` () = task {
let mutable count = 0
use reader = AsyncBufferedReader(output, Array.init 1_048_576 byte, 256)
Expand Down
Loading