Skip to content

Commit

Permalink
Exit coroutine in order (#1362)
Browse files Browse the repository at this point in the history
Exit coroutine in order
  • Loading branch information
Quinn-With-Two-Ns authored Jan 22, 2024
1 parent 14ddd7b commit 30b2681
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 11 deletions.
75 changes: 75 additions & 0 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync/atomic"

"testing"
"time"

Expand Down Expand Up @@ -71,6 +74,78 @@ func TestDispatcher(t *testing.T) {
require.Equal(t, "bar", value)
}

func TestDispatcherDeferClose(t *testing.T) {
var value atomic.Bool
d := createNewDispatcher(func(ctx Context) {
// Block all coroutines on this channel
c1 := NewChannel(ctx)
defer func() {
value.Store(true)
}()
c1.Receive(ctx, nil)
})
defer d.Close()
require.Equal(t, false, value.Load())
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
// Closing the dispatcher will cause the blocked goroutine to stop executing, but defers
// will still run.
d.Close()
require.True(t, d.IsClosed())
require.Eventually(t, value.Load, time.Second, 10*time.Millisecond)
}

func TestDispatcherDeadlockedDefer(t *testing.T) {
var value atomic.Bool
d := createNewDispatcher(func(ctx Context) {
// Block all coroutines on this channel
c1 := NewChannel(ctx)
defer func() {
// The blocking defer should not block the dispatcher closing
time.Sleep(time.Hour)
value.Store(true)
}()
c1.Receive(ctx, nil)
})
require.Equal(t, false, value.Load())
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
// Closing the dispatcher will cause the blocked goroutine to stop executing, but defers
// will still run.
d.Close()
require.True(t, d.IsClosed())
require.Equal(t, false, value.Load())
}

func TestDispatcherDeferCloseRace(t *testing.T) {
var value atomic.Int32
var d dispatcher
d = createNewDispatcher(func(ctx Context) {
// Block all coroutines on this channel
c1 := NewChannel(ctx)
for i := 0; i < 100; i++ {
index := i
id := "coroutine_" + strconv.Itoa(index)
d.NewCoroutine(ctx, id, false, func(ctx Context) {
defer func() {
value.Store(int32(index))
}()
c1.Receive(ctx, nil)
})
}
c1.Receive(ctx, nil)
})
defer d.Close()

require.Equal(t, int32(0), value.Load())
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
// Closing the dispatcher will cause the blocked coroutine to stop executing, but defers
// will still run.
d.Close()
require.True(t, d.IsClosed())
require.Eventually(t, func() bool {
return value.Load() == int32(99)
}, time.Second, 10*time.Millisecond)
}

func TestNonBlockingChildren(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type (
OnWorkflowTaskStarted(deadlockDetectionTimeout time.Duration)
// StackTrace of all coroutines owned by the Dispatcher instance.
StackTrace() string
// Close destroys all coroutines without waiting for their completion
Close()
}

Expand Down
32 changes: 21 additions & 11 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import (
)

const (
defaultSignalChannelSize = 100000 // really large buffering size(100K)
defaultSignalChannelSize = 100000 // really large buffering size(100K)
defaultCoroutineExitTimeout = 100 * time.Millisecond

panicIllegalAccessCoroutineState = "getState: illegal access from outside of workflow context"
)
Expand Down Expand Up @@ -89,6 +90,7 @@ type (
ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Duration) (err error)
// IsDone returns true when all of coroutines are completed
IsDone() bool
IsClosed() bool
IsExecuting() bool
Close() // Destroys all coroutines without waiting for their completion
StackTrace() string // Stack trace of all coroutines owned by the Dispatcher instance
Expand Down Expand Up @@ -1075,12 +1077,22 @@ func (s *coroutineState) close() {
s.aboutToBlock <- true
}

func (s *coroutineState) exit() {
// exit tries to run Goexit on the coroutine and wait for it to exit
// within timeout.
func (s *coroutineState) exit(timeout time.Duration) {
if !s.closed.Load() {
s.unblock <- func(status string, stackDepth int) bool {
runtime.Goexit()
return true
}

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case <-s.aboutToBlock:
case <-timer.C:
}
}
}

Expand Down Expand Up @@ -1235,16 +1247,14 @@ func (d *dispatcherImpl) Close() {
}
d.closed = true
d.mutex.Unlock()
// This loop breaks our expectation that only one workflow
// coroutine is running at any time because it triggers all workflow goroutines
// to call their defers at once. Adding synchronization seemed more problematic because
// it could block eviction if there was a deadlock.
for i := 0; i < len(d.coroutines); i++ {
c := d.coroutines[i]
if !c.closed.Load() {
c.exit()
// We need to exit the coroutines in a separate goroutine because:
// * The coroutine may be stuck and won't respond to the exit request.
// * On exit the coroutines defers will still run and that may block.
go func() {
for _, c := range d.coroutines {
c.exit(defaultCoroutineExitTimeout)
}
}
}()
}

func (d *dispatcherImpl) StackTrace() string {
Expand Down

0 comments on commit 30b2681

Please sign in to comment.