Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cancellation for PostAndAsyncReply #4477

Merged
merged 2 commits into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 74 additions & 72 deletions src/fsharp/FSharp.Core/control.fs
Original file line number Diff line number Diff line change
Expand Up @@ -329,21 +329,13 @@ namespace Microsoft.FSharp.Control



// Reify exceptional results as exceptions
/// Reify exceptional results as exceptions
let commit res =
match res with
| Ok res -> res
| Error edi -> edi.ThrowAny()
| Canceled exn -> raise exn

// Reify exceptional results as exceptionsJIT 64 doesn't always take tailcalls correctly

let commitWithPossibleTimeout res =
match res with
| None -> raise (System.TimeoutException())
| Some res -> commit res


//----------------------------------
// PRIMITIVE ASYNC INVOCATION

Expand Down Expand Up @@ -713,11 +705,15 @@ namespace Microsoft.FSharp.Control
[<AutoSerializable(false)>]
type ResultCell<'T>() =
let mutable result = None

// The continuations for the result
let mutable savedConts : list<SuspendedAsync<'T>> = []

// The WaitHandle event for the result. Only created if needed, and set to null when disposed.
let mutable resEvent = null

let mutable disposed = false

// All writers of result are protected by lock on syncRoot.
let syncRoot = new Object()

Expand Down Expand Up @@ -752,13 +748,11 @@ namespace Microsoft.FSharp.Control
interface IDisposable with
member x.Dispose() = x.Close() // ; System.GC.SuppressFinalize(x)


member x.GrabResult() =
match result with
| Some res -> res
| None -> failwith "Unexpected no result"


/// Record the result in the ResultCell.
member x.RegisterResult (res:'T, reuseThread) =
let grabbedConts =
Expand Down Expand Up @@ -795,7 +789,10 @@ namespace Microsoft.FSharp.Control

member x.ResultAvailable = result.IsSome

member x.AwaitResult =
/// Await the result of a result cell, without a direct timeout or direct
/// cancellation. That is, the underlying computation must fill the result
/// if cancellation or timeout occurs.
member x.AwaitResult_NoDirectCancelOrTimeout =
unprotectedPrimitive(fun args ->
// Check if a result is available synchronously
let resOpt =
Expand Down Expand Up @@ -860,10 +857,10 @@ namespace Microsoft.FSharp.Control
// If timeout is provided, we govern the async by our own CTS, to cancel
// when execution times out. Otherwise, the user-supplied token governs the async.
match timeout with
| None -> token,None
| Some _ ->
let subSource = new LinkedSubSource(token)
subSource.Token, Some subSource
| None -> token, None
| Some _ ->
let subSource = new LinkedSubSource(token)
subSource.Token, Some subSource

use resultCell = new ResultCell<AsyncImplResult<_>>()
queueAsync
Expand Down Expand Up @@ -1252,7 +1249,8 @@ namespace Microsoft.FSharp.Control
aux.econt edi
)

static member AwaitWaitHandle(waitHandle:WaitHandle,?millisecondsTimeout:int) =
/// Wait for a wait handle. Both timeout and cancellation are supported
static member AwaitWaitHandle(waitHandle: WaitHandle, ?millisecondsTimeout:int) =
let millisecondsTimeout = defaultArg millisecondsTimeout Threading.Timeout.Infinite
if millisecondsTimeout = 0 then
async.Delay(fun () ->
Expand Down Expand Up @@ -1312,61 +1310,61 @@ namespace Microsoft.FSharp.Control
return! Async.AwaitWaitHandle(iar.AsyncWaitHandle, ?millisecondsTimeout=millisecondsTimeout) }


/// Await the result of a result cell without a timeout
static member ReifyResult(result:AsyncImplResult<'T>) : Async<'T> =
/// Bind the result of a result cell, calling the appropriate continuation.
static member BindResult(result: AsyncImplResult<'T>) : Async<'T> =
unprotectedPrimitive(fun ({ aux = aux } as args) ->
(match result with
| Ok v -> args.cont v
| Error exn -> aux.econt exn
| Canceled exn -> aux.ccont exn) )

/// Await the result of a result cell without a timeout
static member AwaitAndReifyResult(resultCell:ResultCell<AsyncImplResult<'T>>) : Async<'T> =
/// Await and use the result of a result cell. The resulting async doesn't support cancellation
/// or timeout directly, rather the underlying computation must fill the result if cancellation
/// or timeout occurs.
static member AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell: ResultCell<AsyncImplResult<'T>>) : Async<'T> =
async {
let! result = resultCell.AwaitResult
return! Async.ReifyResult(result)
let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
return! Async.BindResult(result)
}



/// Await the result of a result cell without a timeout
///
/// Always resyncs to the synchronization context if needed, by virtue of it being built
/// from primitives which resync.
static member AsyncWaitAsyncWithTimeout(innerCTS : CancellationTokenSource, resultCell:ResultCell<AsyncImplResult<'T>>,millisecondsTimeout) : Async<'T> =
/// Await the result of a result cell belonging to a child computation. The resulting async supports timeout and if
/// it happens the child computation will be cancelled. The resulting async doesn't support cancellation
/// directly, rather the underlying computation must fill the result if cancellation occurs.
static member AwaitAndBindChildResult(innerCTS: CancellationTokenSource, resultCell: ResultCell<AsyncImplResult<'T>>, millisecondsTimeout) : Async<'T> =
match millisecondsTimeout with
| None | Some -1 ->
resultCell |> Async.AwaitAndReifyResult
resultCell |> Async.AwaitAndBindResult_NoDirectCancelOrTimeout

| Some 0 ->
async { if resultCell.ResultAvailable then
return commit (resultCell.GrabResult())
else
return commitWithPossibleTimeout None }
return raise (System.TimeoutException()) }
| _ ->
async { try
if resultCell.ResultAvailable then
return commit (resultCell.GrabResult())
else
let! ok = Async.AwaitWaitHandle (resultCell.GetWaitHandle(),?millisecondsTimeout=millisecondsTimeout)
let! ok = Async.AwaitWaitHandle (resultCell.GetWaitHandle(), ?millisecondsTimeout=millisecondsTimeout)
if ok then
return commitWithPossibleTimeout (Some (resultCell.GrabResult()))
return commit (resultCell.GrabResult())
else // timed out
// issue cancellation signal
innerCTS.Cancel()
// wait for computation to quiesce
let! _ = Async.AwaitWaitHandle (resultCell.GetWaitHandle())
return commitWithPossibleTimeout None
return raise (System.TimeoutException())
finally
resultCell.Close() }


static member FromBeginEnd(beginAction,endAction,?cancelAction): Async<'T> =
static member FromBeginEnd(beginAction, endAction, ?cancelAction): Async<'T> =
async { let! cancellationToken = getCancellationToken()
let resultCell = new ResultCell<_>()

let once = Once()
let registration : CancellationTokenRegistration =

let onCancel (_:obj) =
// Call the cancellation routine
match cancelAction with
Expand All @@ -1381,7 +1379,9 @@ namespace Microsoft.FSharp.Control
// If we get an exception from a cooperative cancellation function
// we assume the operation has already completed.
try cancel() with _ -> ()

cancellationToken.Register(Action<obj>(onCancel), null)

let callback =
new System.AsyncCallback(fun iar ->
if not iar.CompletedSynchronously then
Expand All @@ -1405,15 +1405,15 @@ namespace Microsoft.FSharp.Control
// ResultCell allows a race and throws away whichever comes last.
resultCell.RegisterResult(res,reuseThread=true) |> unfake
else ())



let (iar:IAsyncResult) = beginAction (callback,(null:obj))
if iar.CompletedSynchronously then
registration.Dispose()
return endAction iar
else
return! Async.AwaitAndReifyResult(resultCell) }
// Note: ok to use "NoDirectCancel" here because cancellation has been registered above
// Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method
return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell) }


static member FromBeginEnd(arg,beginAction,endAction,?cancelAction): Async<'T> =
Expand Down Expand Up @@ -1567,7 +1567,9 @@ namespace Microsoft.FSharp.Control
event.AddHandler(del)

// Return the async computation that allows us to await the result
return! Async.AwaitAndReifyResult(resultCell) }
// Note: ok to use "NoDirectCancel" here because cancellation has been registered above
// Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method
return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell) }

type Async with
static member Ignore (computation: Async<'T>) = bindA computation (fun _ -> doneA)
Expand Down Expand Up @@ -1597,7 +1599,7 @@ namespace Microsoft.FSharp.Control
computation
|> unfake

return Async.AsyncWaitAsyncWithTimeout(innerCTS, resultCell,millisecondsTimeout) }
return Async.AwaitAndBindChildResult(innerCTS, resultCell, millisecondsTimeout) }

static member SwitchToContext syncContext =
async { match syncContext with
Expand Down Expand Up @@ -1681,10 +1683,6 @@ namespace Microsoft.FSharp.Control
Async.FromBeginEnd (buffer,offset,count,stream.BeginWrite,stream.EndWrite)
#endif

type System.Threading.WaitHandle with
member waitHandle.AsyncWaitOne(?millisecondsTimeout:int) = // only used internally, not a public API
Async.AwaitWaitHandle(waitHandle,?millisecondsTimeout=millisecondsTimeout)

type IObservable<'Args> with

[<CompiledName("AddToObservable")>] // give the extension member a 'nice', unmangled compiled name, unique within this module
Expand Down Expand Up @@ -1715,7 +1713,7 @@ namespace Microsoft.FSharp.Control
| :? System.Net.WebException as webExn
when webExn.Status = System.Net.WebExceptionStatus.RequestCanceled && !canceled ->

Async.ReifyResult(AsyncImplResult.Canceled (OperationCanceledException webExn.Message))
Async.BindResult(AsyncImplResult.Canceled (OperationCanceledException webExn.Message))
| _ ->
edi.ThrowAny())

Expand Down Expand Up @@ -1791,7 +1789,10 @@ namespace Microsoft.FSharp.Control
)
start a1 Choice1Of2
start a2 Choice2Of2
let! result = c.AwaitResult
// Note: It is ok to use "NoDirectCancel" here because the started computations use the same
// cancellation token and will register a cancelled result if cancellation occurs.
// Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine.
let! result = c.AwaitResult_NoDirectCancelOrTimeout
return! reify result
}
let timeout msec cancellationToken =
Expand All @@ -1805,7 +1806,10 @@ namespace Microsoft.FSharp.Control
exceptionContinuation=ignore,
cancellationContinuation=ignore,
cancellationToken = cancellationToken)
c.AwaitResult
// Note: It is ok to use "NoDirectCancel" here because the started computations use the same
// cancellation token and will register a cancelled result if cancellation occurs.
// Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout.
c.AwaitResult_NoDirectCancelOrTimeout

[<Sealed>]
[<AutoSerializable(false)>]
Expand Down Expand Up @@ -1854,7 +1858,7 @@ namespace Microsoft.FSharp.Control
failwith "multiple waiting reader continuations for mailbox")

let waitOneWithCancellation(timeout) =
ensurePulse().AsyncWaitOne(millisecondsTimeout=timeout)
Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout)

let waitOne(timeout) =
if timeout < 0 && not cancellationSupported then
Expand Down Expand Up @@ -2125,36 +2129,34 @@ namespace Microsoft.FSharp.Control
let msg = buildMessage (new AsyncReplyChannel<_>(fun reply ->
// Note the ResultCell may have been disposed if the operation
// timed out. In this case RegisterResult drops the result on the floor.
resultCell.RegisterResult(reply,reuseThread=false) |> unfake))
resultCell.RegisterResult(reply, reuseThread=false) |> unfake))
mailbox.Post(msg)
match timeout with
| Threading.Timeout.Infinite ->
async { let! result = resultCell.AwaitResult
return Some(result)
}
| Threading.Timeout.Infinite when not cancellationSupported ->
async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout
return Some result }

| _ ->
async { use _disposeCell = resultCell
let! ok = resultCell.GetWaitHandle().AsyncWaitOne(millisecondsTimeout=timeout)
let res = (if ok then Some(resultCell.GrabResult()) else None)
return res }
| _ ->
async { use _disposeCell = resultCell
let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout)
let res = (if ok then Some(resultCell.GrabResult()) else None)
return res }

member x.PostAndAsyncReply(buildMessage, ?timeout:int) =
let timeout = defaultArg timeout defaultTimeout
match timeout with
| Threading.Timeout.Infinite ->
// Nothing to dispose, no wait handles used
let resultCell = new ResultCell<_>()
let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake))
mailbox.Post(msg)
resultCell.AwaitResult
| _ ->
let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
async { let! res = asyncReply
match res with
| None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
| Some res -> return res
}
| Threading.Timeout.Infinite when not cancellationSupported ->
// Nothing to dispose, no wait handles used
let resultCell = new ResultCell<_>()
let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake))
mailbox.Post(msg)
resultCell.AwaitResult_NoDirectCancelOrTimeout
| _ ->
let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
async { let! res = asyncReply
match res with
| None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
| Some res -> return res }

member x.Receive(?timeout) = mailbox.Receive(timeout=defaultArg timeout defaultTimeout)
member x.TryReceive(?timeout) = mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,5 @@ type CancellationType() =
Assert.IsFalse((r1a <> r1a'))
Assert.IsTrue((r1a <> r1b))
Assert.IsTrue((r1a <> r2))


Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,43 @@ type MailboxProcessorType() =

test()

[<Test>]
member this.PostAndAsyncReply_Cancellation() =

use cancel = new CancellationTokenSource(500)
let mutable gotGood = false
let mutable gotBad = false

let goodAsync = async {
try
for i in Seq.initInfinite (fun i -> i) do
if i % 10000000 = 0 then
printfn "good async working..."
finally
printfn "good async exited - that's what we want"
gotGood <- true
}

let badAsync (mbox:MailboxProcessor<AsyncReplyChannel<int>>) = async {
try
printfn "bad async working..."
let! result = mbox.PostAndAsyncReply id // <- got stuck in here forever
printfn "%d" result
finally
printfn "bad async exited - that's what we want" // <- we never got here
gotBad <- true
}

let mbox = MailboxProcessor.Start(fun inbox -> async {
let! (reply : AsyncReplyChannel<int>) = inbox.Receive()
do! Async.Sleep 1000000
reply.Reply (200)
}, cancel.Token)

[goodAsync; badAsync mbox]
|> Async.Parallel
|> Async.Ignore
|> fun x -> Async.Start(x, cancel.Token)
System.Threading.Thread.Sleep(1000) // cancellation after 500
if not gotGood || not gotBad then
failwith "Exected both good and bad async's to be cancelled afteMailbox should not fail!"