From 0c69423d6b98508c910f65ddad17fbe1967cd8d7 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Sun, 29 Aug 2021 21:38:32 -0700 Subject: [PATCH] Allow pending goroutines exit after workflow completion (#520) --- internal/internal_workflow_test.go | 41 +++++++++++++++++++++++++ internal/internal_workflow_testsuite.go | 1 + 2 files changed, 42 insertions(+) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 8541feea0..0bbf1f07b 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -1289,6 +1289,47 @@ func (s *WorkflowUnitTest) Test_WaitGroupWorkflowTest() { s.Equal(n, total) } +func (s *WorkflowUnitTest) Test_StaleGoroutinesAreShutDown() { + env := s.NewTestWorkflowEnvironment() + deferred := make(chan struct{}) + after := make(chan struct{}) + wf := func(ctx Context) error { + Go(ctx, func(ctx Context) { + defer func() { close(deferred) }() + _ = Sleep(ctx, time.Hour) // outlive the workflow + close(after) + }) + _ = Sleep(ctx, time.Minute) + return nil + } + env.RegisterWorkflow(wf) + + env.ExecuteWorkflow(wf) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + + // goroutines are shut down async at the moment, so wait with a timeout. + // give it up to 1s total. + + started := time.Now() + maxWait := time.NewTimer(time.Second) + defer maxWait.Stop() + select { + case <-deferred: + s.T().Logf("deferred callback executed after %v", time.Since(started)) + case <-maxWait.C: + s.Fail("deferred func should have been called within 1 second") + } + // if deferred code has run, this has already occurred-or-not. + // if it timed out waiting for the deferred code, it has waited long enough, and this is mostly a curiosity. + select { + case <-after: + s.Fail("code after sleep should not have run") + default: + s.T().Log("code after sleep correctly not executed") + } +} + var _ WorkflowInterceptor = (*tracingWorkflowInterceptor)(nil) var _ WorkflowOutboundCallsInterceptor = (*tracingOutboundCallsInterceptor)(nil) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 5a7a92290..f634e059f 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -837,6 +837,7 @@ func (env *testWorkflowEnvironmentImpl) Complete(result *commonpb.Payloads, err env.logger.Debug("Workflow already completed.") return } + env.workflowDef.Close() var canceledErr *CanceledError if errors.As(err, &canceledErr) && env.workflowCancelHandler != nil { env.workflowCancelHandler()