Skip to content

Commit

Permalink
Implement TaskSeq.init, initAsync, initInfinite, initInfiniteAsync an…
Browse files Browse the repository at this point in the history
…d TaskSeq.concat, plus tests and docs
  • Loading branch information
abelbraaksma committed Nov 3, 2022
1 parent 1f547ca commit e8e62ec
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<Compile Include="TaskSeq.Cast.Tests.fs" />
<Compile Include="TaskSeq.Choose.Tests.fs" />
<Compile Include="TaskSeq.Collect.Tests.fs" />
<Compile Include="TaskSeq.Concat.Tests.fs" />
<Compile Include="TaskSeq.Empty.Tests.fs" />
<Compile Include="TaskSeq.ExactlyOne.Tests.fs" />
<Compile Include="TaskSeq.Filter.Tests.fs" />
Expand All @@ -23,6 +24,7 @@
<Compile Include="TaskSeq.Fold.Tests.fs" />
<Compile Include="TaskSeq.Head.Tests.fs" />
<Compile Include="TaskSeq.Indexed.Tests.fs" />
<Compile Include="TaskSeq.Init.Tests.fs" />
<Compile Include="TaskSeq.IsEmpty.fs" />
<Compile Include="TaskSeq.Item.Tests.fs" />
<Compile Include="TaskSeq.Iter.Tests.fs" />
Expand Down
51 changes: 51 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module FSharpy.Tests.Concat

open System

open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharpy
open System.Collections.Generic

//
// TaskSeq.concat
//

let validateSequence ts =
ts
|> TaskSeq.toSeqCachedAsync
|> Task.map (Seq.map string)
|> Task.map (String.concat "")
|> Task.map (should equal "123456789101234567891012345678910")

module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-concat with empty sequences`` variant =
taskSeq {
yield Gen.getEmptyVariant variant // not yield-bang!
yield Gen.getEmptyVariant variant
yield Gen.getEmptyVariant variant
}
|> TaskSeq.concat
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-concat with top sequence empty`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.box
|> TaskSeq.cast<IAsyncEnumerable<int>> // casting an int to an enumerable, LOL!
|> TaskSeq.concat
|> verifyEmpty

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-concat with empty sequences`` variant =
taskSeq {
yield Gen.getSeqImmutable variant // not yield-bang!
yield Gen.getSeqImmutable variant
yield Gen.getSeqImmutable variant
}
|> TaskSeq.concat
|> validateSequence
142 changes: 142 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
module FSharpy.Tests.Init

open System

open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharpy

//
// TaskSeq.init
// TaskSeq.initInfinite
// TaskSeq.initAsync
// TaskSeq.initInfiniteAsync
//

/// Asserts that a sequence contains the char values 'A'..'J'.
module EmptySeq =
[<Fact>]
let ``TaskSeq-init can generate an empty sequence`` () = TaskSeq.init 0 (fun x -> x) |> verifyEmpty

[<Fact>]
let ``TaskSeq-initAsync can generate an empty sequence`` () =
TaskSeq.initAsync 0 (fun x -> Task.fromResult x)
|> verifyEmpty

[<Fact>]
let ``TaskSeq-init with a negative count gives an error`` () =
fun () ->
TaskSeq.init -1 (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

fun () ->
TaskSeq.init Int32.MinValue (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

[<Fact>]
let ``TaskSeq-initAsync with a negative count gives an error`` () =
fun () ->
TaskSeq.initAsync Int32.MinValue (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

module Immutable =
[<Fact>]
let ``TaskSeq-init singleton`` () =
TaskSeq.init 1 id
|> TaskSeq.head
|> Task.map (should equal 0)

[<Fact>]
let ``TaskSeq-initAsync singleton`` () =
TaskSeq.initAsync 1 (id >> Task.fromResult)
|> TaskSeq.head
|> Task.map (should equal 0)

[<Fact>]
let ``TaskSeq-init some values`` () =
TaskSeq.init 42 (fun x -> x / 2)
|> TaskSeq.length
|> Task.map (should equal 42)

[<Fact>]
let ``TaskSeq-initAsync some values`` () =
TaskSeq.init 42 (fun x -> Task.fromResult (x / 2))
|> TaskSeq.length
|> Task.map (should equal 42)

[<Fact>]
let ``TaskSeq-initInfinite`` () =
TaskSeq.initInfinite (fun x -> x / 2)
|> TaskSeq.item 1_000_001
|> Task.map (should equal 500_000)

[<Fact>]
let ``TaskSeq-initInfiniteAsync`` () =
TaskSeq.initInfiniteAsync (fun x -> Task.fromResult (x / 2))
|> TaskSeq.item 1_000_001
|> Task.map (should equal 500_000)

module SideEffects =
let inc (i: int byref) =
i <- i + 1
i

[<Fact>]
let ``TaskSeq-init singleton with side effects`` () = task {
let mutable x = 0

let ts = TaskSeq.init 1 (fun _ -> inc &x)

do! TaskSeq.head ts |> Task.map (should equal 1)
do! TaskSeq.head ts |> Task.map (should equal 2)
do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
}

[<Fact>]
let ``TaskSeq-init singleton with side effects -- Current`` () = task {
let mutable x = 0

let ts = TaskSeq.init 1 (fun _ -> inc &x)

let enumerator = ts.GetAsyncEnumerator()
let! _ = enumerator.MoveNextAsync()
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1 // current state does not mutate
}

[<Fact>]
let ``TaskSeq-initAsync singleton with side effects`` () = task {
let mutable x = 0

let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))

do! TaskSeq.head ts |> Task.map (should equal 1)
do! TaskSeq.head ts |> Task.map (should equal 2)
do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
}

[<Fact>]
let ``TaskSeq-initAsync singleton with side effects -- Current`` () = task {
let mutable x = 0

let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))

let enumerator = ts.GetAsyncEnumerator()
let! _ = enumerator.MoveNextAsync()
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1 // current state does not mutate
}
11 changes: 11 additions & 0 deletions src/FSharpy.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,18 @@ module TaskSeq =
//

let length source = Internal.lengthBy None source
let lengthOrMax max source = Internal.lengthBeforeMax max source
let lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source
let lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source
let init count initializer = Internal.init (Some count) (InitAction initializer)
let initInfinite initializer = Internal.init None (InitAction initializer)
let initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer)
let initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer)

let concat (sources: taskSeq<#taskSeq<'T>>) = taskSeq {
for ts in sources do
yield! (ts :> taskSeq<'T>)
}

//
// iter/map/collect functions
Expand Down Expand Up @@ -262,6 +272,7 @@ module TaskSeq =
| None -> return Internal.raiseNotFound ()
}


let findAsync predicate source = task {
match! Internal.tryFind (PredicateAsync predicate) source with
| Some item -> return item
Expand Down
77 changes: 76 additions & 1 deletion src/FSharpy.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ module TaskSeq =

/// <summary>
/// Returns the length of the sequence. This operation requires the whole sequence to be evaluated and
/// should not be used on potentially infinite sequences.
/// should not be used on potentially infinite sequences, see <see cref="lengthOrMax" /> for an alternative.
/// </summary>
val length: source: taskSeq<'T> -> Task<int>

/// <summary>
/// Returns the length of the sequence, or <paramref name="max" />, whichever comes first. This operation requires the task sequence
/// to be evaluated in full, or until <paramref name="max" /> items have been processed. Use this method instead of
/// <see cref="TaskSeq.length" /> if you want to prevent too many items to be evaluated, or if the sequence is potentially infinite.
/// </summary>
val lengthOrMax: max: int -> source: taskSeq<'T> -> Task<int>

/// <summary>
/// Returns the length of the sequence of all items for which the <paramref name="predicate" /> returns true.
/// This operation requires the whole sequence to be evaluated and should not be used on potentially infinite sequences.
Expand All @@ -32,6 +39,74 @@ module TaskSeq =
/// </summary>
val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, up to the given count. Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="count">The maximum number of items to generate for the sequence.</param>
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:System.ArgumentException">Thrown when count is negative.</exception>
val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, up to the given count. Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="count">The maximum number of items to generate for the sequence.</param>
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:System.ArgumentException">Thrown when count is negative.</exception>
val initAsync: count: int -> initializer: (int -> #Task<'T>) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached.
/// Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached.
/// Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
val initInfiniteAsync: initializer: (int -> #Task<'T>) -> taskSeq<'T>

/// <summary>
/// Combines the given task sequence of task sequences and concatenates them end-to-end, to form a
/// new flattened, single task sequence. Each task sequence is awaited item by item, before the next is iterated.
/// </summary>
///
/// <param name="sources">The input enumeration-of-enumerations.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
val concat: sources: taskSeq<#taskSeq<'T>> -> taskSeq<'T>

/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toList: source: taskSeq<'T> -> 'T list

Expand Down
Loading

0 comments on commit e8e62ec

Please sign in to comment.