-
Notifications
You must be signed in to change notification settings - Fork 790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Async.Choice #807
Changes from 6 commits
39527b9
c4bdb2d
54de8fa
43dd871
8b4cdd4
4f44983
e758c51
b81dea5
28d0c0c
1da20e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,8 +6,126 @@ | |
namespace FSharp.Core.Unittests.FSharp_Core.Microsoft_FSharp_Control | ||
|
||
open System | ||
open System.Threading | ||
open FSharp.Core.Unittests.LibraryTestFx | ||
open NUnit.Framework | ||
#if !(FSHARP_CORE_PORTABLE || FSHARP_CORE_NETCORE_PORTABLE) | ||
open FsCheck | ||
|
||
[<AutoOpen>] | ||
module ChoiceUtils = | ||
|
||
// FsCheck driven Async.Choice tests | ||
|
||
exception ChoiceExn of index:int | ||
|
||
/// represents a child computation of a choice workflow | ||
type ChoiceOp = | ||
| NoneResultAfter of timeout:int | ||
| SomeResultAfter of timeout:int | ||
| ExceptionAfter of timeout:int | ||
with | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this "with" |
||
member c.Timeout = | ||
match c with | ||
| NoneResultAfter t -> t | ||
| SomeResultAfter t -> t | ||
| ExceptionAfter t -> t | ||
|
||
/// represent a choice worfklow | ||
type ChoiceWorkflow = ChoiceWorkflow of children:ChoiceOp list * cancelAfter:int option | ||
|
||
/// normalizes random timeout arguments | ||
let normalize (ChoiceWorkflow(ops, cancelAfter)) = | ||
let ms t = 2000 * (abs t % 15) // timeouts only positive multiples of 2 seconds, up to 30 seconds | ||
let mkOp op = | ||
match op with | ||
| NoneResultAfter t -> NoneResultAfter (ms t) | ||
| SomeResultAfter t -> SomeResultAfter (ms t) | ||
| ExceptionAfter t -> ExceptionAfter (ms t) | ||
|
||
let ops = ops |> List.map mkOp | ||
let cancelAfter = cancelAfter |> Option.map ms | ||
ChoiceWorkflow(ops, cancelAfter) | ||
|
||
/// runs specified choice workflow and checks that | ||
/// Async.Choice spec is satisfied | ||
let runChoice (ChoiceWorkflow(ops, cancelAfter)) = | ||
// Step 1. build a choice workflow from the abstract representation | ||
let completed = ref 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can use F# 4 i think, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm OK with refs being explicit if the author wishes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dsyme you too in the |
||
let returnAfter time f = async { | ||
do! Async.Sleep time | ||
let _ = Interlocked.Increment completed | ||
return f () | ||
} | ||
|
||
let mkOp (index : int) = function | ||
| NoneResultAfter t -> returnAfter t (fun () -> None) | ||
| SomeResultAfter t -> returnAfter t (fun () -> Some index) | ||
| ExceptionAfter t -> returnAfter t (fun () -> raise <| ChoiceExn index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think |
||
|
||
let choiceWorkflow = ops |> List.mapi mkOp |> Async.Choice | ||
|
||
// Step 2. run the choice workflow and keep the results | ||
let result = | ||
let cancellationToken = | ||
match cancelAfter with | ||
| Some ca -> | ||
let cts = new CancellationTokenSource() | ||
cts.CancelAfter(ca) | ||
Some cts.Token | ||
| None -> None | ||
|
||
try Async.RunSynchronously(choiceWorkflow, ?cancellationToken = cancellationToken) |> Choice1Of2 | ||
with e -> Choice2Of2 e | ||
|
||
// Step 3. check that results are up to spec | ||
let getMinTime() = | ||
seq { | ||
yield Int32.MaxValue // "infinity": avoid exceptions if list is empty | ||
|
||
for op in ops do | ||
match op with | ||
| NoneResultAfter _ -> () | ||
| op -> yield op.Timeout | ||
|
||
match cancelAfter with Some t -> yield t | None -> () | ||
} |> Seq.min | ||
|
||
let verifyIndex index = | ||
if index < 0 || index >= ops.Length then | ||
Assert.Fail "Returned choice index is out of bounds." | ||
|
||
// Step 3a. check that output is up to spec | ||
match result with | ||
| Choice1Of2 (Some index) -> | ||
verifyIndex index | ||
match ops.[index] with | ||
| SomeResultAfter timeout -> Assert.AreEqual(getMinTime(), timeout) | ||
| op -> Assert.Fail <| sprintf "Should be 'Some' but got %A" op | ||
|
||
| Choice1Of2 None -> | ||
Assert.True(ops |> List.forall (function NoneResultAfter _ -> true | _ -> false)) | ||
|
||
| Choice2Of2 (:? OperationCanceledException) -> | ||
match cancelAfter with | ||
| None -> Assert.Fail "Got unexpected cancellation exception." | ||
| Some ca -> Assert.AreEqual(getMinTime(), ca) | ||
|
||
| Choice2Of2 (ChoiceExn index) -> | ||
verifyIndex index | ||
match ops.[index] with | ||
| ExceptionAfter timeout -> Assert.AreEqual(getMinTime(), timeout) | ||
| op -> Assert.Fail <| sprintf "Should be 'Exception' but got %A" op | ||
|
||
| Choice2Of2 e -> Assert.Fail(sprintf "Unexpected exception %O" e) | ||
|
||
// Step 3b. check that nested cancellation happens as expected | ||
if not <| List.isEmpty ops then | ||
let minTimeout = getMinTime() | ||
let minTimeoutOps = ops |> Seq.filter (fun op -> op.Timeout <= minTimeout) |> Seq.length | ||
Assert.LessOrEqual(!completed, minTimeoutOps) | ||
|
||
#endif | ||
|
||
module LeakUtils = | ||
// when testing for liveness, the things that we want to observe must always be created in | ||
|
@@ -294,6 +412,11 @@ type AsyncModule() = | |
member this.``RaceBetweenCancellationAndError.Sleep``() = | ||
testErrorAndCancelRace (Async.Sleep (-5)) | ||
|
||
#if !(FSHARP_CORE_PORTABLE || FSHARP_CORE_NETCORE_PORTABLE) | ||
[<Test>] | ||
member this.``Async.Choice correctness and cancellation``() = | ||
Check.QuickThrowOnFailure (normalize >> runChoice) | ||
#endif | ||
|
||
[<Test>] | ||
member this.``error on one workflow should cancel all others``() = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1462,6 +1462,50 @@ namespace Microsoft.FSharp.Control | |
|> unfake); | ||
FakeUnit)) | ||
|
||
static member Choice(computations : Async<'T option> seq) : Async<'T option> = | ||
unprotectedPrimitive(fun args -> | ||
let result = | ||
try Choice1Of2 <| Seq.toArray computations | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please just use a forward pipe. We don't use much back piping in the F# compiler or FSharp.Core, |
||
with exn -> Choice2Of2 <| ExceptionDispatchInfo.RestoreOrCapture(exn) | ||
|
||
match result with | ||
| Choice2Of2 edi -> args.aux.econt edi | ||
| Choice1Of2 [||] -> args.cont None | ||
| Choice1Of2 [|P body|] -> body args | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line reflects the original fssnip implementation: if given a singleton array as argument, simply pass your current continuation to it. This does not happen in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think just remove this case. It's better to run all operations consistently through the thread pool. |
||
| Choice1Of2 computations -> | ||
protectedPrimitiveCore args (fun args -> | ||
let ({ aux = aux } as args) = delimitSyncContext args | ||
let noneCount = ref 0 | ||
let exnCount = ref 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mutable instead of ref There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that for the sake of clarity, this should remain an explicit ref cell allocation. It is used precisely to synchronize across multiple threads, hence should be unambiguously heap allocated. In any case, changing this would not change the underlying IL. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. eirik I think we both are now those old grumpy people who will continue to use ref cells even if mutable might work as well ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Damn kids with your mutable bindings There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm in your club too, Enrico also points me to |
||
let innerCts = new LinkedSubSource(aux.token) | ||
let trampolineHolder = aux.trampolineHolder | ||
|
||
let scont (result : 'T option) = | ||
match result with | ||
| Some _ when Interlocked.Increment exnCount = 1 -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's safe to have an inpure function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be surprised if it did. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only possibility for this to happen would be to have the continuation fire more than once, but that's hardly related to the use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better not to do this in a when clause. |
||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.cont result) | ||
| None when Interlocked.Increment noneCount = computations.Length -> | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.cont None) | ||
|
||
| _ -> FakeUnit | ||
|
||
let econt (exn : ExceptionDispatchInfo) = | ||
if Interlocked.Increment exnCount = 1 then | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.aux.econt exn) | ||
else | ||
FakeUnit | ||
|
||
let ccont (exn : OperationCanceledException) = | ||
if Interlocked.Increment exnCount = 1 then | ||
innerCts.Cancel(); trampolineHolder.Protect(fun () -> args.aux.ccont exn) | ||
else | ||
FakeUnit | ||
|
||
for c in computations do | ||
queueAsync innerCts.Token scont econt ccont c |> unfake | ||
|
||
FakeUnit)) | ||
|
||
#if FX_NO_TASK | ||
#else | ||
// Contains helpers that will attach continuation to the given task. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,6 +268,21 @@ namespace Microsoft.FSharp.Control | |
/// <returns>A computation that returns an array of values from the sequence of input computations.</returns> | ||
static member Parallel : computations:seq<Async<'T>> -> Async<'T[]> | ||
|
||
/// <summary>Creates an asynchronous computation that executes all given asynchronous computations in parallel, | ||
/// returning the result of the first succeeding computation (i.e. the first computation with a result that is 'Some x').</summary> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This summary is a bit long. Normally it is one sentence, no i.e. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I basically just copied the format from the corresponding Async.Parallel segment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i dont know if better, but like it's used by xmldoc generator i think |
||
/// | ||
/// <remarks>If all child computations complete with None, the parent computation also returns None. | ||
/// | ||
/// If any child computation raises an exception, then the overall computation will trigger an | ||
/// exception, and cancel the others. | ||
/// | ||
/// The overall computation will respond to cancellation while executing the child computations. | ||
/// If cancelled, the computation will cancel any remaining child computations but will still wait | ||
/// for the other child computations to complete.</remarks> | ||
/// <param name="computationList">A sequence of computations to be parallelized.</param> | ||
/// <returns>A computation that returns the first succeeding computation in the sequence of input computations.</returns> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The parameter name is wrong (should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in b81dea5 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed, we need to use wording in which 'first' clearly indicates the first to complete, as opposed to the first in the sequence. Any suggestions? |
||
static member Choice : computations:seq<Async<'T option>> -> Async<'T option> | ||
|
||
//---------- Thread Control | ||
|
||
/// <summary>Creates an asynchronous computation that creates a new thread and runs | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this not available in Portable profile 7?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build fails because it can't find the FsCheck dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well it should go in each of the profiles, if it's a test issue ... perhaps test a different way or fix the test issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, if we're using FsCheck we should make it work on PCLs too? I understand that's a bigger issue, but maybe link to an FsCheck issue that we can track?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An easy solution for now would be to incorporate the non-FsCheck tests from #744