Skip to content

Commit

Permalink
Cherry-pick the verbose-logging statements and non-rule-changing refa…
Browse files Browse the repository at this point in the history
…ctorings from #42 for smaller diffing
  • Loading branch information
abelbraaksma committed Oct 24, 2022
1 parent dce5a14 commit 651b143
Showing 1 changed file with 98 additions and 29 deletions.
127 changes: 98 additions & 29 deletions src/FSharpy.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ open FSharp.Core.CompilerServices.StateMachineHelpers
module Internal = // cannot be marked with 'internal' scope
let verbose = false

let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext()
/// Call MoveNext on an IAsyncStateMachine by reference
let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext()

// F# requires that we implement interfaces even on an abstract class
let inline raiseNotImpl () =
Expand Down Expand Up @@ -63,17 +64,15 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable tailcallTarget: TaskSeq<'T> option

member data.PushDispose(f: unit -> Task) =
match data.disposalStack with
| null -> data.disposalStack <- ResizeArray()
| _ -> ()
member data.PushDispose(disposer: unit -> Task) =
if isNull data.disposalStack then
data.disposalStack <- ResizeArray()

data.disposalStack.Add(f)
data.disposalStack.Add disposer

member data.PopDispose() =
match data.disposalStack with
| null -> ()
| _ -> data.disposalStack.RemoveAt(data.disposalStack.Count - 1)
if not (isNull data.disposalStack) then
data.disposalStack.RemoveAt(data.disposalStack.Count - 1)

and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =

Expand Down Expand Up @@ -117,6 +116,8 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T

match res with
| Some tg ->
// we get here only when there are multiple returns (it seems)
// hence the tailcall logic
match tg.TailcallTarget with
| None -> res
| (Some tg2 as res2) ->
Expand Down Expand Up @@ -153,21 +154,46 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T

member this.GetResult(token: int16) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).GetResult(token)
| None -> this.Machine.Data.promiseOfValueOrEnd.GetResult(token)

| Some tg ->
if verbose then
printfn
"Getting result for token on 'Some' branch, status: %A"
((tg :> IValueTaskSource<bool>).GetStatus(token))
(tg :> IValueTaskSource<bool>).GetResult(token)
| None ->
try
if verbose then
printfn
"Getting result for token on 'None' branch, status: %A"
(this.Machine.Data.promiseOfValueOrEnd.GetStatus(token))
this.Machine.Data.promiseOfValueOrEnd.GetResult(token)
with e ->
// FYI: an exception here is usually caused by the CE statement (user code) throwing an exception
// We're just logging here because the following error would also be caught right here:
// "An attempt was made to transition a task to a final state when it had already completed."
if verbose then
printfn "Error '%s' for token: %i" e.Message token

reraise ()
member this.OnCompleted(continuation, state, token, flags) =
match this.hijack () with
| Some tg -> (tg :> IValueTaskSource<bool>).OnCompleted(continuation, state, token, flags)
| None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags)

interface IAsyncStateMachine with
/// The MoveNext method is called by builder.MoveNext() in the resumable code
member this.MoveNext() =
match this.hijack () with
| Some tg -> (tg :> IAsyncStateMachine).MoveNext()
| None -> MoveNext(&this.Machine)
| Some tg ->
// jump to the hijacked method
(tg :> IAsyncStateMachine).MoveNext()
| None -> moveNextRef &this.Machine

member _.SetStateMachine(_state) = () // not needed for reference type
/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) =
if verbose then
printfn "Setting state machine -- ignored"
() // not needed for reference type

interface IAsyncEnumerable<'T> with
member this.GetAsyncEnumerator(ct) =
Expand All @@ -180,11 +206,17 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
data.taken <- true
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
if verbose then
printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint
(this :> IAsyncEnumerator<_>)
else
if verbose then
printfn "GetAsyncEnumerator, cloning..."

// it appears that the issue is possibly caused by the problem
// of having ValueTask all over the place, and by going over the
// iteration twice, we are trying to *await* twice, which is not allowed
// see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
data.taken <- true
clone.Machine.Data.cancellationToken <- ct
Expand Down Expand Up @@ -234,13 +266,21 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
printfn "MoveNextAsync..."

if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable
if verbose then
printfn "at MoveNextAsync: Resumption point = -1"
ValueTask<bool>()
else
if verbose then
printfn "at MoveNextAsync: normal resumption scenario"
let data = this.Machine.Data
data.promiseOfValueOrEnd.Reset()
let mutable ts = this

if verbose then
printfn "at MoveNextAsync: start calling builder.MoveNext()"
data.builder.MoveNext(&ts)
if verbose then
printfn "at MoveNextAsync: done calling builder.MoveNext()"

// If the move did a hijack then get the result from the final one
match this.hijack () with
Expand All @@ -253,14 +293,25 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
let version = data.promiseOfValueOrEnd.Version
let status = data.promiseOfValueOrEnd.GetStatus(version)

if status = ValueTaskSourceStatus.Succeeded then
match status with
| ValueTaskSourceStatus.Succeeded ->
if verbose then
printfn "at MoveNextAsyncResult: case succeeded..."
let result = data.promiseOfValueOrEnd.GetResult(version)
ValueTask<bool>(result)
else

| ValueTaskSourceStatus.Faulted
| ValueTaskSourceStatus.Canceled
| ValueTaskSourceStatus.Pending ->
if verbose then
printfn "MoveNextAsync pending/faulted/cancelled..."
printfn "at MoveNextAsyncResult: case pending/faulted/cancelled..."

ValueTask<bool>(this, version) // uses IValueTaskSource<'T>
| _ ->
if verbose then
printfn "at MoveNextAsyncResult: Unexpected state"
// assume it's a possibly new, not yet supported case, treat as default
ValueTask<bool>(this, version) // uses IValueTaskSource<'T>

override cr.TailcallTarget = cr.hijack ()

Expand All @@ -282,43 +333,57 @@ type TaskSeqBuilder() =
//-- RESUMABLE CODE START
__resumeAt sm.ResumptionPoint

if verbose then
printfn "Resuming at resumption point %i" sm.ResumptionPoint
try
//printfn "at Run.MoveNext start"
//Console.WriteLine("[{0}] resuming by invoking {1}....", sm.MethodBuilder.Task.Id, hashq sm.ResumptionFunc )
if verbose then
printfn "at Run.MoveNext start"

let __stack_code_fin = code.Invoke(&sm)
//printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}"
if verbose then
printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}"
if __stack_code_fin then
//printfn $"at Run.MoveNext, done"
if verbose then
printfn $"at Run.MoveNext, done"
sm.Data.promiseOfValueOrEnd.SetResult(false)
sm.Data.builder.Complete()
elif sm.Data.current.IsSome then
//printfn $"at Run.MoveNext, yield"
if verbose then
printfn $"at Run.MoveNext, yield"
sm.Data.promiseOfValueOrEnd.SetResult(true)
else
// Goto request
match sm.Data.tailcallTarget with
| Some tg ->
//printfn $"at Run.MoveNext, hijack"
if verbose then
printfn $"at Run.MoveNext, hijack"
let mutable tg = tg
MoveNext(&tg)
moveNextRef &tg
| None ->
//printfn $"at Run.MoveNext, await"
if verbose then
printfn $"at Run.MoveNext, await"
let boxed = sm.Data.boxed

sm.Data.awaiter.UnsafeOnCompleted(
Action(fun () ->
let mutable boxed = boxed
MoveNext(&boxed))
moveNextRef &boxed)
)

with exn ->
//Console.WriteLine("[{0}] SetException {1}", sm.MethodBuilder.Task.Id, exn)
if verbose then
printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message
sm.Data.promiseOfValueOrEnd.SetException(exn)
sm.Data.builder.Complete()
//-- RESUMABLE CODE END
))
(SetStateMachineMethodImpl<_>(fun sm state -> ()))
(SetStateMachineMethodImpl<_>(fun sm state ->
if verbose then
printfn "at SetStatemachingMethodImpl, ignored"
()))
(AfterCode<_, _>(fun sm ->
if verbose then
printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine"
let ts = TaskSeq<TaskSeqStateMachine<'T>, 'T>()
ts.Machine <- sm
ts.Machine.Data <- TaskSeqStateMachineData()
Expand Down Expand Up @@ -355,9 +420,13 @@ type TaskSeqBuilder() =
let __stack_vtask = condition ()

if __stack_vtask.IsCompleted then
if verbose then
printfn "Returning completed task (in while)"
__stack_condition_fin <- true
condition_res <- __stack_vtask.Result
else
if verbose then
printfn "Awaiting non-completed task (in while)"
let task = __stack_vtask.AsTask()
let mutable awaiter = task.GetAwaiter()
// This will yield with __stack_fin = false
Expand Down

0 comments on commit 651b143

Please sign in to comment.