-
Notifications
You must be signed in to change notification settings - Fork 790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Async.Choice #807
Changes from 2 commits
39527b9
c4bdb2d
54de8fa
43dd871
8b4cdd4
4f44983
e758c51
b81dea5
28d0c0c
1da20e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,8 +6,125 @@ | |
namespace FSharp.Core.Unittests.FSharp_Core.Microsoft_FSharp_Control | ||
|
||
open System | ||
open System.Threading | ||
open FSharp.Core.Unittests.LibraryTestFx | ||
open NUnit.Framework | ||
open FsCheck | ||
|
||
[<AutoOpen>] | ||
module ChoiceUtils = | ||
|
||
// FsCheck driven Async.Choice tests | ||
|
||
exception ChoiceExn of index:int | ||
|
||
/// represents a child computation of a choice workflow | ||
type ChoiceOp = | ||
| NoneResultAfter of timeout:int | ||
| SomeResultAfter of timeout:int | ||
| ExceptionAfter of timeout:int | ||
with | ||
member c.Timeout = | ||
match c with | ||
| NoneResultAfter t -> t | ||
| SomeResultAfter t -> t | ||
| ExceptionAfter t -> t | ||
|
||
/// represent a choice worfklow | ||
type ChoiceWorkflow = ChoiceWorkflow of children:ChoiceOp list * cancelAfter:int option | ||
|
||
/// normalizes random timeout arguments | ||
let normalize (ChoiceWorkflow(ops, cancelAfter)) = | ||
let ms t = 2000 * (abs t % 15) // timeouts only positive multiples of 2 seconds, up to 30 seconds | ||
let mkOp op = | ||
match op with | ||
| NoneResultAfter t -> NoneResultAfter (ms t) | ||
| SomeResultAfter t -> SomeResultAfter (ms t) | ||
| ExceptionAfter t -> ExceptionAfter (ms t) | ||
|
||
let ops = ops |> List.map mkOp | ||
let cancelAfter = cancelAfter |> Option.map ms | ||
ChoiceWorkflow(ops, cancelAfter) | ||
|
||
/// runs specified choice workflow and checks that | ||
/// Async.Choice spec is satisfied | ||
let runChoice (ChoiceWorkflow(ops, cancelAfter)) = | ||
// Step 1. build a choice workflow from the abstract representation | ||
let completed = ref 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can use F# 4 i think, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm OK with refs being explicit if the author wishes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dsyme you too in the |
||
let returnAfter time f = async { | ||
do! Async.Sleep time | ||
let _ = Interlocked.Increment completed | ||
return f () | ||
} | ||
|
||
let mkOp (index : int) = function | ||
| NoneResultAfter t -> returnAfter t (fun () -> None) | ||
| SomeResultAfter t -> returnAfter t (fun () -> Some index) | ||
| ExceptionAfter t -> returnAfter t (fun () -> raise <| ChoiceExn index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think |
||
|
||
let choiceWorkflow = ops |> List.mapi mkOp |> Async.Choice | ||
|
||
// Step 2. run the choice workflow and keep the results | ||
let result = | ||
let cancellationToken = | ||
match cancelAfter with | ||
| Some ca -> | ||
let cts = new CancellationTokenSource() | ||
cts.CancelAfter(ca) | ||
Some cts.Token | ||
| None -> None | ||
|
||
try Async.RunSynchronously(choiceWorkflow, ?cancellationToken = cancellationToken) |> Choice1Of2 | ||
with e -> Choice2Of2 e | ||
|
||
// Step 3. check that results are up to spec | ||
let getMinTime() = | ||
seq { | ||
yield Int32.MaxValue // "infinity": avoid exceptions if list is empty | ||
|
||
for op in ops do | ||
match op with | ||
| NoneResultAfter _ -> () | ||
| op -> yield op.Timeout | ||
|
||
match cancelAfter with Some t -> yield t | None -> () | ||
} |> Seq.min | ||
|
||
let verifyIndex index = | ||
if index < 0 || index >= ops.Length then | ||
Assert.Fail "Returned choice index is out of bounds." | ||
|
||
// Step 3a. check that output is up to spec | ||
match result with | ||
| Choice1Of2 (Some index) -> | ||
verifyIndex index | ||
match ops.[index] with | ||
| SomeResultAfter timeout -> Assert.AreEqual(getMinTime(), timeout) | ||
| op -> Assert.Fail <| sprintf "Should be 'Some' but got %A" op | ||
|
||
| Choice1Of2 None -> | ||
Assert.True(ops |> List.forall (function NoneResultAfter _ -> true | _ -> false)) | ||
|
||
| Choice2Of2 (:? OperationCanceledException) -> | ||
match cancelAfter with | ||
| None -> Assert.Fail "Got unexpected cancellation exception." | ||
| Some ca -> Assert.AreEqual(getMinTime(), ca) | ||
|
||
| Choice2Of2 (ChoiceExn index) -> | ||
verifyIndex index | ||
match ops.[index] with | ||
| ExceptionAfter timeout -> Assert.AreEqual(getMinTime(), timeout) | ||
| op -> Assert.Fail <| sprintf "Should be 'Exception' but got %A" op | ||
|
||
| Choice2Of2 e -> Assert.Fail(sprintf "Unexpected exception %O" e) | ||
|
||
// Step 3b. check that nested cancellation happens as expected | ||
if not <| List.isEmpty ops then | ||
let minTimeout = getMinTime() | ||
let minTimeoutOps = ops |> Seq.filter (fun op -> op.Timeout <= minTimeout) |> Seq.length | ||
Assert.LessOrEqual(!completed, minTimeoutOps) | ||
|
||
|
||
|
||
module LeakUtils = | ||
// when testing for liveness, the things that we want to observe must always be created in | ||
|
@@ -295,6 +412,10 @@ type AsyncModule() = | |
testErrorAndCancelRace (Async.Sleep (-5)) | ||
|
||
|
||
[<Test>] | ||
member this.``Async.Choice correctness and cancellation``() = | ||
Check.QuickThrowOnFailure (normalize >> runChoice) | ||
|
||
[<Test>] | ||
member this.``error on one workflow should cancel all others``() = | ||
let counter = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1462,6 +1462,50 @@ namespace Microsoft.FSharp.Control | |
|> unfake); | ||
FakeUnit)) | ||
|
||
static member Choice(computations : Async<'T option> seq) : Async<'T option> = | ||
unprotectedPrimitive(fun args -> | ||
let result = | ||
try Choice1Of2 <| Seq.toArray computations | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please just use a forward pipe. We don't use much back piping in the F# compiler or FSharp.Core, |
||
with exn -> Choice2Of2 <| ExceptionDispatchInfo.RestoreOrCapture(exn) | ||
|
||
match result with | ||
| Choice2Of2 edi -> args.aux.econt edi | ||
| Choice1Of2 [||] -> args.cont None | ||
| Choice1Of2 [|P body|] -> body args | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line reflects the original fssnip implementation: if given a singleton array as argument, simply pass your current continuation to it. This does not happen in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think just remove this case. It's better to run all operations consistently through the thread pool. |
||
| Choice1Of2 computations -> | ||
protectedPrimitiveCore args (fun args -> | ||
let ({ aux = aux } as args) = delimitSyncContext args | ||
let noneCount = ref 0 | ||
let exnCount = ref 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mutable instead of ref There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that for the sake of clarity, this should remain an explicit ref cell allocation. It is used precisely to synchronize across multiple threads, hence should be unambiguously heap allocated. In any case, changing this would not change the underlying IL. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. eirik I think we both are now those old grumpy people who will continue to use ref cells even if mutable might work as well ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Damn kids with your mutable bindings There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm in your club too, Enrico also points me to |
||
let innerCts = new LinkedSubSource(aux.token) | ||
let trampolineHolder = aux.trampolineHolder | ||
|
||
let scont (result : 'T option) = | ||
match result with | ||
| Some _ when Interlocked.Increment exnCount = 1 -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's safe to have an inpure function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be surprised if it did. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only possibility for this to happen would be to have the continuation fire more than once, but that's hardly related to the use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better not to do this in a when clause. |
||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.cont result) | ||
| None when Interlocked.Increment noneCount = computations.Length -> | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.cont None) | ||
|
||
| _ -> FakeUnit | ||
|
||
let econt (exn : ExceptionDispatchInfo) = | ||
if Interlocked.Increment exnCount = 1 then | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.aux.econt exn) | ||
else | ||
FakeUnit | ||
|
||
let ccont (exn : OperationCanceledException) = | ||
if Interlocked.Increment exnCount = 1 then | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.aux.ccont exn) | ||
else | ||
FakeUnit | ||
|
||
for c in computations do | ||
queueAsync innerCts.Token scont econt ccont c |> unfake | ||
|
||
FakeUnit)) | ||
|
||
#if FX_NO_TASK | ||
#else | ||
// Contains helpers that will attach continuation to the given task. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,6 +268,13 @@ namespace Microsoft.FSharp.Control | |
/// <returns>A computation that returns an array of values from the sequence of input computations.</returns> | ||
static member Parallel : computations:seq<Async<'T>> -> Async<'T[]> | ||
|
||
/// <summary>Creates an asynchronous computation that executes all the given asynchronous computations, | ||
/// and returns the result of the first succeeding computation (i.e. the first computation with a result that is not None).</summary> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 43dd871 |
||
/// | ||
/// <param name="computationList">A sequence of distinct computations to be parallelized.</param> | ||
/// <returns>A computation that returns the first succeeding computation in the sequence of input computations.</returns> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The parameter name is wrong (should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in b81dea5 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed, we need to use wording in which 'first' clearly indicates the first to complete, as opposed to the first in the sequence. Any suggestions? |
||
static member Choice : computations:seq<Async<'T option>> -> Async<'T option> | ||
|
||
//---------- Thread Control | ||
|
||
/// <summary>Creates an asynchronous computation that creates a new thread and runs | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this "with"