Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Async.Choice #744

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ namespace FSharp.Core.Unittests.FSharp_Core.Microsoft_FSharp_Control
open System
open FSharp.Core.Unittests.LibraryTestFx
open NUnit.Framework
#if FSHARP_CORE_PORTABLE
// nothing
#else
#if FSHARP_CORE_NETCORE_PORTABLE
// nothing
#else
open FsCheck
#endif
#endif

module LeakUtils =
// when testing for liveness, the things that we want to observe must always be created in
Expand Down Expand Up @@ -296,7 +305,7 @@ type AsyncModule() =


[<Test>]
member this.``error on one workflow should cancel all others``() =
member this.``error on one workflow should cancel all other parallel workflows``() =
let counter =
async {
let counter = ref 0
Expand All @@ -314,6 +323,84 @@ type AsyncModule() =

Assert.AreEqual(0, counter)

#if FSHARP_CORE_PORTABLE
// nothing
#else
#if FSHARP_CORE_NETCORE_PORTABLE
// nothing
#else

[<Test>]
member this.``Async.Choice takes first result that is <> None``() =
let returnFirstResult (PositiveInt n) (PositiveInt i) x =
n > i ==>
let result =
async {
let job j = async { if j = i then return Some x else return None }

return! Async.Choice [ for j in 1 .. n -> job j ]
} |> Async.RunSynchronously

Some x = result

Check.QuickThrowOnFailure returnFirstResult

[<Test>]
member this.``Async.Choice reports error when things crash``() =
try
async {
let job i = async { failwith "crashed"; return None }

return! Async.Choice [ for i in 1 .. 100 -> job i ]
}
|> Async.RunSynchronously
|> ignore

failwith "expected an exception"
with exn when exn.Message = "crashed" -> ()

[<Test>]
member this.``Async.Choice returns None if no tasks are given``() =
let result =
Async.Choice [ ]
|> Async.RunSynchronously

Assert.AreEqual(None, result)

[<Test>]
member this.``Async.Choice returns None if all results are None``() =
let returnNone (PositiveInt n) x =
let result =
async {
let job j = async { return None }

return! Async.Choice [ for j in 1 .. n -> job j ]
} |> Async.RunSynchronously

None = result

Check.QuickThrowOnFailure returnNone

[<Test>]
member this.``Async.Choice returns fastest response that is not None``() =
let delay interval result =
async {
do! Async.Sleep interval
return! async {
printfn "returning %A after %d ms." result interval
return result }
}

let result =
[ delay 100 None ; delay 1000 (Some 1) ; delay 500 (Some 2) ]
|> Async.Choice
|> Async.RunSynchronously

Assert.AreEqual(Some 2, result)

#endif
#endif

[<Test>]
member this.``AwaitWaitHandle.ExceptionsAfterTimeout``() =
let wh = new System.Threading.ManualResetEvent(false)
Expand Down
18 changes: 11 additions & 7 deletions src/fsharp/FSharp.Core.Unittests/LibraryTestFx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ module SurfaceArea =
t.GetMembers()
|> Array.map (fun v -> sprintf "%s: %s" (v.ReflectedType.ToString()) (v.ToString()))
#endif

types
|> Array.collect getTypeMemberStrings
|> Array.sort
|> String.concat "\r\n"

let actual =
types
|> Array.collect getTypeMemberStrings
|> Array.sort
|> String.concat "\r\n"

asm,actual

// verify public surface area matches expected
let verify expected platform fileName =
Expand All @@ -133,7 +136,8 @@ module SurfaceArea =
let normalize (s:string) =
Regex.Replace(s, "(\\r\\n|\\n)+", "\r\n").Trim([|'\r';'\n'|])

let actual = getActual () |> normalize
let asm, actualNotNormalized = getActual ()
let actual = actualNotNormalized |> normalize
let expected = expected |> normalize

Assert.AreEqual(expected, actual, sprintf "\r\n%s\r\n\r\n Expected and actual surface area don't match. To see the delta, run\r\nwindiff %s %s" actual fileName logFile)
Assert.AreEqual(expected, actual, sprintf "\r\nAssembly: %A\r\n\r\n%s\r\n\r\n Expected and actual surface area don't match. To see the delta, run\r\nwindiff %s %s" asm actual fileName logFile)
1 change: 1 addition & 0 deletions src/fsharp/FSharp.Core.Unittests/SurfaceArea.net40.fs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ Microsoft.FSharp.Control.FSharpAsync: Boolean Equals(System.Object)
Microsoft.FSharp.Control.FSharpAsync: Int32 GetHashCode()
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Control.FSharpAsync`1[T]] StartChild[T](Microsoft.FSharp.Control.FSharpAsync`1[T], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpChoice`2[T,System.Exception]] Catch[T](Microsoft.FSharp.Control.FSharpAsync`1[T])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpOption`1[T]] Choice[T](System.Collections.Generic.IEnumerable`1[Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpOption`1[T]]])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] AwaitTask(System.Threading.Tasks.Task)
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] Ignore[T](Microsoft.FSharp.Control.FSharpAsync`1[T])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] Sleep(Int32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ Microsoft.FSharp.Control.FSharpAsync: Boolean Equals(System.Object)
Microsoft.FSharp.Control.FSharpAsync: Int32 GetHashCode()
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Control.FSharpAsync`1[T]] StartChild[T](Microsoft.FSharp.Control.FSharpAsync`1[T], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpChoice`2[T,System.Exception]] Catch[T](Microsoft.FSharp.Control.FSharpAsync`1[T])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpOption`1[T]] Choice[T](System.Collections.Generic.IEnumerable`1[Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.FSharpOption`1[T]]])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] AwaitTask(System.Threading.Tasks.Task)
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] Ignore[T](Microsoft.FSharp.Control.FSharpAsync`1[T])
Microsoft.FSharp.Control.FSharpAsync: Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit] Sleep(Int32)
Expand Down
31 changes: 31 additions & 0 deletions src/fsharp/FSharp.Core/control.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,37 @@ namespace Microsoft.FSharp.Control
let token = defaultArg cancellationToken defaultCancellationTokenSource.Token
CancellationTokenOps.StartWithContinuations(token, computation, id, (fun edi -> edi.ThrowAny()), ignore)

static member Choice(computations : Async<'T option> seq) = async {
match Seq.toArray computations with
| [||] -> return None
| [|t|] -> return! t
| computations ->

let! t = Async.CancellationToken
return! Async.FromContinuations <|
fun (sc,ec,cc) ->
let noneCount = ref 0
let exnCount = ref 0
let innerCts = CancellationTokenSource.CreateLinkedTokenSource t

let scont (result : 'T option) =
match result with
| Some _ when Interlocked.Increment exnCount = 1 -> innerCts.Cancel() ; sc result
| None when Interlocked.Increment noneCount = computations.Length -> sc None
| _ -> ()

let econt (exn : exn) =
if Interlocked.Increment exnCount = 1 then
innerCts.Cancel() ; ec exn

let ccont (exn : OperationCanceledException) =
if Interlocked.Increment exnCount = 1 then
innerCts.Cancel(); cc exn

for task in computations do
ignore <| System.Threading.Tasks.Task.Factory.StartNew(fun () -> Async.StartWithContinuations(task, scont, econt, ccont, innerCts.Token))
}

#if FSHARP_CORE_NETCORE_PORTABLE
static member Sleep(dueTime : int) : Async<unit> =
// use combo protectedPrimitiveWithResync + continueWith instead of AwaitTask so we can pass cancellation token to the Delay task
Expand Down
7 changes: 7 additions & 0 deletions src/fsharp/FSharp.Core/control.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ namespace Microsoft.FSharp.Control
/// <returns>A computation that returns an array of values from the sequence of input computations.</returns>
static member Parallel : computations:seq<Async<'T>> -> Async<'T[]>

/// <summary>Creates an asynchronous computation that executes all the given asynchronous computations,
/// and returns the result of the first succeeding computation (i.e. the first computation with a result that is not None).</summary>
///
/// <param name="computationList">A sequence of distinct computations to be parallelized.</param>
/// <returns>A computation that returns the first succeeding computation in the sequence of input computations.</returns>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>

//---------- Thread Control

/// <summary>Creates an asynchronous computation that creates a new thread and runs
Expand Down