Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TaskSeq.init, initAsync, initInfinite, initInfiniteAsync and TaskSeq.concat #69

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<Compile Include="TaskSeq.Cast.Tests.fs" />
<Compile Include="TaskSeq.Choose.Tests.fs" />
<Compile Include="TaskSeq.Collect.Tests.fs" />
<Compile Include="TaskSeq.Concat.Tests.fs" />
<Compile Include="TaskSeq.Empty.Tests.fs" />
<Compile Include="TaskSeq.ExactlyOne.Tests.fs" />
<Compile Include="TaskSeq.Filter.Tests.fs" />
Expand All @@ -23,6 +24,7 @@
<Compile Include="TaskSeq.Fold.Tests.fs" />
<Compile Include="TaskSeq.Head.Tests.fs" />
<Compile Include="TaskSeq.Indexed.Tests.fs" />
<Compile Include="TaskSeq.Init.Tests.fs" />
<Compile Include="TaskSeq.IsEmpty.fs" />
<Compile Include="TaskSeq.Item.Tests.fs" />
<Compile Include="TaskSeq.Iter.Tests.fs" />
Expand Down
51 changes: 51 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Concat.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module FSharpy.Tests.Concat

open System

open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharpy
open System.Collections.Generic

//
// TaskSeq.concat
//

let validateSequence ts =
ts
|> TaskSeq.toSeqCachedAsync
|> Task.map (Seq.map string)
|> Task.map (String.concat "")
|> Task.map (should equal "123456789101234567891012345678910")

module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-concat with empty sequences`` variant =
taskSeq {
yield Gen.getEmptyVariant variant // not yield-bang!
yield Gen.getEmptyVariant variant
yield Gen.getEmptyVariant variant
}
|> TaskSeq.concat
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-concat with top sequence empty`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.box
|> TaskSeq.cast<IAsyncEnumerable<int>> // casting an int to an enumerable, LOL!
|> TaskSeq.concat
|> verifyEmpty

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-concat with empty sequences`` variant =
taskSeq {
yield Gen.getSeqImmutable variant // not yield-bang!
yield Gen.getSeqImmutable variant
yield Gen.getSeqImmutable variant
}
|> TaskSeq.concat
|> validateSequence
142 changes: 142 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Init.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
module FSharpy.Tests.Init

open System

open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharpy

//
// TaskSeq.init
// TaskSeq.initInfinite
// TaskSeq.initAsync
// TaskSeq.initInfiniteAsync
//

/// Asserts that a sequence contains the char values 'A'..'J'.

module EmptySeq =
[<Fact>]
let ``TaskSeq-init can generate an empty sequence`` () = TaskSeq.init 0 (fun x -> x) |> verifyEmpty

[<Fact>]
let ``TaskSeq-initAsync can generate an empty sequence`` () =
TaskSeq.initAsync 0 (fun x -> Task.fromResult x)
|> verifyEmpty

[<Fact>]
let ``TaskSeq-init with a negative count gives an error`` () =
fun () ->
TaskSeq.init -1 (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

fun () ->
TaskSeq.init Int32.MinValue (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

[<Fact>]
let ``TaskSeq-initAsync with a negative count gives an error`` () =
fun () ->
TaskSeq.initAsync Int32.MinValue (fun x -> Task.fromResult x)
|> TaskSeq.toArrayAsync
|> Task.ignore

|> should throwAsyncExact typeof<ArgumentException>

module Immutable =
[<Fact>]
let ``TaskSeq-init singleton`` () =
TaskSeq.init 1 id
|> TaskSeq.head
|> Task.map (should equal 0)

[<Fact>]
let ``TaskSeq-initAsync singleton`` () =
TaskSeq.initAsync 1 (id >> Task.fromResult)
|> TaskSeq.head
|> Task.map (should equal 0)

[<Fact>]
let ``TaskSeq-init some values`` () =
TaskSeq.init 42 (fun x -> x / 2)
|> TaskSeq.length
|> Task.map (should equal 42)

[<Fact>]
let ``TaskSeq-initAsync some values`` () =
TaskSeq.init 42 (fun x -> Task.fromResult (x / 2))
|> TaskSeq.length
|> Task.map (should equal 42)

[<Fact>]
let ``TaskSeq-initInfinite`` () =
TaskSeq.initInfinite (fun x -> x / 2)
|> TaskSeq.item 1_000_001
|> Task.map (should equal 500_000)

[<Fact>]
let ``TaskSeq-initInfiniteAsync`` () =
TaskSeq.initInfiniteAsync (fun x -> Task.fromResult (x / 2))
|> TaskSeq.item 1_000_001
|> Task.map (should equal 500_000)

module SideEffects =
let inc (i: int byref) =
i <- i + 1
i

[<Fact>]
let ``TaskSeq-init singleton with side effects`` () = task {
let mutable x = 0

let ts = TaskSeq.init 1 (fun _ -> inc &x)

do! TaskSeq.head ts |> Task.map (should equal 1)
do! TaskSeq.head ts |> Task.map (should equal 2)
do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
}

[<Fact>]
let ``TaskSeq-init singleton with side effects -- Current`` () = task {
let mutable x = 0

let ts = TaskSeq.init 1 (fun _ -> inc &x)

let enumerator = ts.GetAsyncEnumerator()
let! _ = enumerator.MoveNextAsync()
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1 // current state does not mutate
}

[<Fact>]
let ``TaskSeq-initAsync singleton with side effects`` () = task {
let mutable x = 0

let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))

do! TaskSeq.head ts |> Task.map (should equal 1)
do! TaskSeq.head ts |> Task.map (should equal 2)
do! TaskSeq.head ts |> Task.map (should equal 3) // state mutates
}

[<Fact>]
let ``TaskSeq-initAsync singleton with side effects -- Current`` () = task {
let mutable x = 0

let ts = TaskSeq.initAsync 1 (fun _ -> Task.fromResult (inc &x))

let enumerator = ts.GetAsyncEnumerator()
let! _ = enumerator.MoveNextAsync()
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1
do enumerator.Current |> should equal 1 // current state does not mutate
}
11 changes: 11 additions & 0 deletions src/FSharpy.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,18 @@ module TaskSeq =
//

let length source = Internal.lengthBy None source
let lengthOrMax max source = Internal.lengthBeforeMax max source
let lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source
let lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source
let init count initializer = Internal.init (Some count) (InitAction initializer)
let initInfinite initializer = Internal.init None (InitAction initializer)
let initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer)
let initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer)

let concat (sources: taskSeq<#taskSeq<'T>>) = taskSeq {
for ts in sources do
yield! (ts :> taskSeq<'T>)
}

//
// iter/map/collect functions
Expand Down Expand Up @@ -262,6 +272,7 @@ module TaskSeq =
| None -> return Internal.raiseNotFound ()
}


let findAsync predicate source = task {
match! Internal.tryFind (PredicateAsync predicate) source with
| Some item -> return item
Expand Down
77 changes: 76 additions & 1 deletion src/FSharpy.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@ module TaskSeq =

/// <summary>
/// Returns the length of the sequence. This operation requires the whole sequence to be evaluated and
/// should not be used on potentially infinite sequences.
/// should not be used on potentially infinite sequences, see <see cref="lengthOrMax" /> for an alternative.
/// </summary>
val length: source: taskSeq<'T> -> Task<int>

/// <summary>
/// Returns the length of the sequence, or <paramref name="max" />, whichever comes first. This operation requires the task sequence
/// to be evaluated in full, or until <paramref name="max" /> items have been processed. Use this method instead of
/// <see cref="TaskSeq.length" /> if you want to prevent too many items to be evaluated, or if the sequence is potentially infinite.
/// </summary>
val lengthOrMax: max: int -> source: taskSeq<'T> -> Task<int>

/// <summary>
/// Returns the length of the sequence of all items for which the <paramref name="predicate" /> returns true.
/// This operation requires the whole sequence to be evaluated and should not be used on potentially infinite sequences.
Expand All @@ -32,6 +39,74 @@ module TaskSeq =
/// </summary>
val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, up to the given count. Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="count">The maximum number of items to generate for the sequence.</param>
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:System.ArgumentException">Thrown when count is negative.</exception>
val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, up to the given count. Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="count">The maximum number of items to generate for the sequence.</param>
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:System.ArgumentException">Thrown when count is negative.</exception>
val initAsync: count: int -> initializer: (int -> #Task<'T>) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached.
/// Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>

/// <summary>
/// Generates a new task sequence which, when iterated, will return successive elements by calling the given function
/// with the current index, ad infinitum, or until <see cref="Int32.MaxValue" /> is reached.
/// Each element is saved after its initialization for successive access to
/// <see cref="IAsyncEnumerator.Current" />, which will not re-evaluate the <paramref name="initializer" />. However,
/// re-iterating the returned task sequence will re-evaluate the initialization function. The returned sequence may
/// be passed between threads safely. However, individual IEnumerator values generated from the returned sequence should
/// not be accessed concurrently.
/// </summary>
///
/// <param name="initializer">A function that generates an item in the sequence from a given index.</param>
/// <returns>The resulting task sequence.</returns>
val initInfiniteAsync: initializer: (int -> #Task<'T>) -> taskSeq<'T>

/// <summary>
/// Combines the given task sequence of task sequences and concatenates them end-to-end, to form a
/// new flattened, single task sequence. Each task sequence is awaited item by item, before the next is iterated.
/// </summary>
///
/// <param name="sources">The input enumeration-of-enumerations.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
val concat: sources: taskSeq<#taskSeq<'T>> -> taskSeq<'T>

/// Returns taskSeq as an array. This function is blocking until the sequence is exhausted and will properly dispose of the resources.
val toList: source: taskSeq<'T> -> 'T list

Expand Down
Loading