From 968305263053eb438e79051953846d2ad1c1b909 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 22 Oct 2024 14:30:46 -0700 Subject: [PATCH 1/8] initial changes, added replay test for legacy history, need to finish writing tests --- internal/internal_coroutines_test.go | 185 ++++++++++++++++++ internal/internal_flags.go | 7 +- internal/internal_workflow.go | 11 +- internal/internal_workflow_testsuite_test.go | 51 +++++ test/replaytests/replay_test.go | 10 + .../selector-blocking-default.json | 89 +++++++++ test/replaytests/workflows.go | 46 +++++ 7 files changed, 397 insertions(+), 2 deletions(-) create mode 100644 test/replaytests/selector-blocking-default.json diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 4e3478961..3eb18ea6d 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -551,6 +551,191 @@ func TestBlockingSelect(t *testing.T) { require.EqualValues(t, expected, history) } +func TestSelectBlockingDefault(t *testing.T) { + // manually create a dispatcher to ensure sdkFlags are not set + var history []string + env := &workflowEnvironmentImpl{ + sdkFlags: &sdkFlags{}, + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + require.False(t, selector.HasPending()) + selector.Select(ctx) + + // Default behavior this signal is lost + require.True(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + require.False(t, selector.HasPending()) + history = append(history, "done") + }, func() bool { return false }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + "done", + } + require.EqualValues(t, expected, history) +} + +func TestSelectBlockingDefaultWithFlag(t *testing.T) { + // sdkFlags are set by default for tests + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + require.False(t, selector.HasPending()) + selector.Select(ctx) + + // Signal should not be lost + require.False(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + require.True(t, selector.HasPending()) + selector.Select(ctx) + require.False(t, selector.HasPending()) + history = append(history, "done") + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + "c1-one", + "done", + } + require.EqualValues(t, expected, history) +} + +func TestSelectBlockingFuture(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + // TODO + c1 := NewChannel(ctx) + selector := NewSelector(ctx) + var v string + selector.AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }).AddFuture(NewTimer(ctx, time.Second), func(f Future) { + f.Get(ctx, nil) + history = append(history, "future") + }) + + history = append(history, "select1") + require.False(t, selector.HasPending()) + selector.Select(ctx) + + // Signal should not be lost + require.False(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + require.True(t, selector.HasPending()) + selector.Select(ctx) + require.False(t, selector.HasPending()) + history = append(history, "done") + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + // TODO + } + require.EqualValues(t, expected, history) +} + +func TestSelectBlockingSend(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + // TODO + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + // TODO + } + require.EqualValues(t, expected, history) +} + func TestBlockingSelectAsyncSend(t *testing.T) { var history []string d := createNewDispatcher(func(ctx Context) { diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 503c650a5..de2e3f5b2 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -47,7 +47,10 @@ const ( // SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method. // It will also cause the SDK to immediately handle updates when a handler is registered. SDKPriorityUpdateHandling = 4 - SDKFlagUnknown = math.MaxUint32 + // SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost + // when the Default path is blocked. + SDKFlagBlockedSelectorSignalReceive = 5 + SDKFlagUnknown = math.MaxUint32 ) func sdkFlagFromUint(value uint32) sdkFlag { @@ -62,6 +65,8 @@ func sdkFlagFromUint(value uint32) sdkFlag { return SDKFlagProtocolMessageCommand case uint32(SDKPriorityUpdateHandling): return SDKPriorityUpdateHandling + case uint32(SDKFlagBlockedSelectorSignalReceive): + return SDKFlagBlockedSelectorSignalReceive default: return SDKFlagUnknown } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index ce4eb8986..9bfa0279e 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1407,8 +1407,17 @@ func (s *selectorImpl) Select(ctx Context) { if readyBranch != nil { return false } - readyBranch = func() { + // readyBranch is not executed when AddDefault is specified, + // setting the value here prevents the signal from being dropped + dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive) + if dropSignalFlag { c.recValue = &v + } + + readyBranch = func() { + if !dropSignalFlag { + c.recValue = &v + } f(c, more) } return true diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 472e8f1fa..c1ace05ce 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -39,6 +39,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" "google.golang.org/protobuf/proto" "go.temporal.io/sdk/converter" @@ -4240,3 +4241,53 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameWorkflowAndActivityNames() { s.Require().True(env.IsWorkflowCompleted()) s.Require().NoError(env.GetWorkflowError()) } + +func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { + workflowFn := func(ctx Context) error { + ch1 := GetSignalChannel(ctx, "test-signal") + ch2 := GetSignalChannel(ctx, "test-signal-2") + selector := NewSelector(ctx) + var v string + selector.AddReceive(ch1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + fmt.Println("received signal from ch1") + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &v) + fmt.Println("received signal from ch2") + }) + selector.Select(ctx) + fmt.Println("ch1.Len()", ch1.Len(), "s", v) + // default behavior is this signal is lost + s.Require().True(ch1.Len() == 0 && "s2" == v) + + return nil + } + + // send a signal 5 seconds after workflow started + env := s.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + fmt.Println("sending signal to 1") + env.SignalWorkflow("test-signal", "s1") + fmt.Println("sending signal to 2") + env.SignalWorkflow("test-signal-2", "s2") + }, 5*time.Second) + env.ExecuteWorkflow(workflowFn) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} + +// Not sure we need this? +func (s *WorkflowTestSuiteUnitTest) Test_SDKFlagBlockedSelectorSignalReceive() { + flags := newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{}) + flags.set(SDKFlagBlockedSelectorSignalReceive) + fmt.Println("flags", flags) +} + +func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingFuture() { + // TODO +} + +func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingSend() { + // TODO +} diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 55c55e2f6..452c4501e 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -464,6 +464,16 @@ func (s *replayTestSuite) TestGogoprotoPayloadWorkflow() { s.NoError(err) } +func (s *replayTestSuite) TestSelectorBlockingDefault() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow) + // Verify we can still replay an old workflow that does + // not have the SDKFlagBlockedSelectorSignalReceive flag + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-blocking-default.json") + s.NoError(err) + require.NoError(s.T(), err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/selector-blocking-default.json b/test/replaytests/selector-blocking-default.json new file mode 100644 index 000000000..07c2d0387 --- /dev/null +++ b/test/replaytests/selector-blocking-default.json @@ -0,0 +1,89 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-10-21T23:39:08.991521Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048587", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SelectorBlockingDefaultWorkflow" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "identity": "47182@Andrews-MacBook-Pro.local@", + "firstExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "hello_world_workflowID" + } + }, + { + "eventId": "2", + "eventTime": "2024-10-21T23:39:08.991569Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048588", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-10-21T23:39:08.994898Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048593", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "47139@Andrews-MacBook-Pro.local@", + "requestId": "a7a50c99-1d0d-449c-9d75-09458ac1e7af", + "historySizeBytes": "282", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-10-21T23:39:08.999006Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048597", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "47139@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.29.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-10-21T23:39:08.999055Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048598", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "4" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 90fbdff5c..cd6cf5c33 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -610,3 +610,49 @@ func ListAndDescribeWorkflow(ctx workflow.Context) (int, error) { } return len(result.Executions), nil } + +func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + ch1 := workflow.NewChannel(ctx) + ch2 := workflow.NewChannel(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch1.Send(ctx, "one") + + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch2.Send(ctx, "two") + }) + + selector := workflow.NewSelector(ctx) + var s string + selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &s) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &s) + }) + selector.Select(ctx) + if ch1.Len() == 0 && s == "two" { + logger.Info("Signal in ch1 lost") + return nil + } else { + var result string + activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost") + activity.Get(ctx, &result) + logger.Info("Result", result) + } + return nil +} + +func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Activity", "value", value) + return value + " was logged!", nil +} From d0e924642ea8474509d0e75f3a7d18b3d1186a2d Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 25 Oct 2024 15:33:29 -0700 Subject: [PATCH 2/8] Clean up tests, fix error --- internal/internal_coroutines_test.go | 53 -------------------- internal/internal_workflow_testsuite_test.go | 18 +------ 2 files changed, 1 insertion(+), 70 deletions(-) diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 3eb18ea6d..c1ed43d16 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -683,59 +683,6 @@ func TestSelectBlockingDefaultWithFlag(t *testing.T) { require.EqualValues(t, expected, history) } -func TestSelectBlockingFuture(t *testing.T) { - var history []string - d := createNewDispatcher(func(ctx Context) { - // TODO - c1 := NewChannel(ctx) - selector := NewSelector(ctx) - var v string - selector.AddReceive(c1, func(c ReceiveChannel, more bool) { - c.Receive(ctx, &v) - history = append(history, fmt.Sprintf("c1-%v", v)) - }).AddFuture(NewTimer(ctx, time.Second), func(f Future) { - f.Get(ctx, nil) - history = append(history, "future") - }) - - history = append(history, "select1") - require.False(t, selector.HasPending()) - selector.Select(ctx) - - // Signal should not be lost - require.False(t, c1.Len() == 0 && v == "two") - - history = append(history, "select2") - require.True(t, selector.HasPending()) - selector.Select(ctx) - require.False(t, selector.HasPending()) - history = append(history, "done") - }) - defer d.Close() - requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) - require.True(t, d.IsDone()) - - expected := []string{ - // TODO - } - require.EqualValues(t, expected, history) -} - -func TestSelectBlockingSend(t *testing.T) { - var history []string - d := createNewDispatcher(func(ctx Context) { - // TODO - }) - defer d.Close() - requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) - require.True(t, d.IsDone()) - - expected := []string{ - // TODO - } - require.EqualValues(t, expected, history) -} - func TestBlockingSelectAsyncSend(t *testing.T) { var history []string d := createNewDispatcher(func(ctx Context) { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index c1ace05ce..7705b4014 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -39,7 +39,6 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/api/workflowservice/v1" "google.golang.org/protobuf/proto" "go.temporal.io/sdk/converter" @@ -4259,7 +4258,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { selector.Select(ctx) fmt.Println("ch1.Len()", ch1.Len(), "s", v) // default behavior is this signal is lost - s.Require().True(ch1.Len() == 0 && "s2" == v) + s.Require().True(ch1.Len() == 0 && v == "s2") return nil } @@ -4276,18 +4275,3 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) } - -// Not sure we need this? -func (s *WorkflowTestSuiteUnitTest) Test_SDKFlagBlockedSelectorSignalReceive() { - flags := newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{}) - flags.set(SDKFlagBlockedSelectorSignalReceive) - fmt.Println("flags", flags) -} - -func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingFuture() { - // TODO -} - -func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingSend() { - // TODO -} From ca909b6dae76b8f78af8c2e2d8cc2ce4c39554cf Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 28 Oct 2024 09:35:31 -0700 Subject: [PATCH 3/8] unit test for fixed behavior --- internal/internal_workflow_testsuite_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 7705b4014..494e95720 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4257,8 +4257,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { }) selector.Select(ctx) fmt.Println("ch1.Len()", ch1.Len(), "s", v) - // default behavior is this signal is lost - s.Require().True(ch1.Len() == 0 && v == "s2") + // testWorkflowEnvironmentImpl.TryUse always returns true for flags + // test for fixed behavior + s.Require().True(ch1.Len() == 1 && v == "s2") return nil } From 5d2778ee9f6c7e9ff652dc59c3f652c79732ef56 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 28 Oct 2024 16:36:16 -0700 Subject: [PATCH 4/8] PR feedback --- internal/internal_event_handlers.go | 4 ++++ internal/internal_flags.go | 5 +++++ internal/internal_worker_base.go | 2 ++ internal/internal_workflow.go | 2 +- internal/internal_workflow_testsuite.go | 4 ++++ internal/internal_workflow_testsuite_test.go | 10 ++-------- test/replaytests/workflows.go | 8 ++++---- 7 files changed, 22 insertions(+), 13 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 82a9785be..9f6860fc3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return wc.sdkFlags.tryUse(flag, !wc.isReplay) } +func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return wc.sdkFlags.getFlag(flag) +} + func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) { wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_flags.go b/internal/internal_flags.go index de2e3f5b2..89693d839 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -110,6 +110,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool { } } +// getFlag returns true if the flag is currently set. +func (sf *sdkFlags) getFlag(flag sdkFlag) bool { + return sf.currentFlags[flag] || sf.newFlags[flag] +} + // set marks a flag as in current use regardless of replay status. func (sf *sdkFlags) set(flags ...sdkFlag) { if !sf.capabilities.GetSdkMetadata() { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index dc3865e6c..f7160da8e 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -145,6 +145,8 @@ type ( DrainUnhandledUpdates() bool // TryUse returns true if this flag may currently be used. TryUse(flag sdkFlag) bool + // GetFlag returns if the flag is currently used. + GetFlag(flag sdkFlag) bool } // WorkflowDefinitionFactory factory for creating WorkflowDefinition instances. diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 9bfa0279e..245661027 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1409,7 +1409,7 @@ func (s *selectorImpl) Select(ctx Context) { } // readyBranch is not executed when AddDefault is specified, // setting the value here prevents the signal from being dropped - dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive) + dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive) if dropSignalFlag { c.recValue = &v } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 3b9742f73..a92fbb371 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -584,6 +584,10 @@ func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return true } +func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return true +} + func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { env.bufferedUpdateRequests[name] = append(env.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 494e95720..452d51555 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4249,27 +4249,21 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { var v string selector.AddReceive(ch1, func(c ReceiveChannel, more bool) { c.Receive(ctx, &v) - fmt.Println("received signal from ch1") }) selector.AddDefault(func() { ch2.Receive(ctx, &v) - fmt.Println("received signal from ch2") }) selector.Select(ctx) - fmt.Println("ch1.Len()", ch1.Len(), "s", v) - // testWorkflowEnvironmentImpl.TryUse always returns true for flags - // test for fixed behavior s.Require().True(ch1.Len() == 1 && v == "s2") + s.Require().True(selector.HasPending()) return nil } - // send a signal 5 seconds after workflow started + // send a signal after workflow has started env := s.NewTestWorkflowEnvironment() env.RegisterDelayedCallback(func() { - fmt.Println("sending signal to 1") env.SignalWorkflow("test-signal", "s1") - fmt.Println("sending signal to 2") env.SignalWorkflow("test-signal-2", "s2") }, 5*time.Second) env.ExecuteWorkflow(workflowFn) diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index cd6cf5c33..97089844a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -639,14 +639,14 @@ func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error { ch2.Receive(ctx, &s) }) selector.Select(ctx) - if ch1.Len() == 0 && s == "two" { - logger.Info("Signal in ch1 lost") - return nil - } else { + if selector.HasPending() { var result string activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost") activity.Get(ctx, &result) logger.Info("Result", result) + } else { + logger.Info("Signal in ch1 lost") + return nil } return nil } From ceadefdb0843708009e2db4e0c043184bc937417 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 1 Nov 2024 15:56:18 -0700 Subject: [PATCH 5/8] improve tests, add tests for AddFuture, AddSend --- internal/internal_coroutines_test.go | 141 +++++++++++++++++-- internal/internal_workflow_testsuite.go | 7 +- internal/internal_workflow_testsuite_test.go | 10 +- 3 files changed, 141 insertions(+), 17 deletions(-) diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index c1ed43d16..8114c47fd 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" ) @@ -552,10 +553,9 @@ func TestBlockingSelect(t *testing.T) { } func TestSelectBlockingDefault(t *testing.T) { - // manually create a dispatcher to ensure sdkFlags are not set var history []string env := &workflowEnvironmentImpl{ - sdkFlags: &sdkFlags{}, + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), commandsHelper: newCommandsHelper(), dataConverter: converter.GetDefaultDataConverter(), workflowInfo: &WorkflowInfo{ @@ -563,6 +563,8 @@ func TestSelectBlockingDefault(t *testing.T) { TaskQueueName: "taskqueue:" + t.Name(), }, } + // Verify that the flag is not set + require.False(t, env.GetFlag(SDKFlagBlockedSelectorSignalReceive)) interceptor, ctx, err := newWorkflowContext(env, nil) require.NoError(t, err, "newWorkflowContext failed") d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { @@ -594,19 +596,18 @@ func TestSelectBlockingDefault(t *testing.T) { history = append(history, fmt.Sprintf("c2-%v", v)) }) history = append(history, "select1") - require.False(t, selector.HasPending()) selector.Select(ctx) // Default behavior this signal is lost require.True(t, c1.Len() == 0 && v == "two") history = append(history, "select2") - require.False(t, selector.HasPending()) + selector.Select(ctx) history = append(history, "done") }, func() bool { return false }) defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) - require.True(t, d.IsDone()) + require.False(t, d.IsDone()) expected := []string{ "select1", @@ -616,15 +617,25 @@ func TestSelectBlockingDefault(t *testing.T) { "add-two-done", "c2-two", "select2", - "done", } require.EqualValues(t, expected, history) } func TestSelectBlockingDefaultWithFlag(t *testing.T) { - // sdkFlags are set by default for tests var history []string - d := createNewDispatcher(func(ctx Context) { + env := &workflowEnvironmentImpl{ + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + require.True(t, env.TryUse(SDKFlagBlockedSelectorSignalReceive)) + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { c1 := NewChannel(ctx) c2 := NewChannel(ctx) @@ -653,18 +664,15 @@ func TestSelectBlockingDefaultWithFlag(t *testing.T) { history = append(history, fmt.Sprintf("c2-%v", v)) }) history = append(history, "select1") - require.False(t, selector.HasPending()) selector.Select(ctx) // Signal should not be lost require.False(t, c1.Len() == 0 && v == "two") history = append(history, "select2") - require.True(t, selector.HasPending()) selector.Select(ctx) - require.False(t, selector.HasPending()) history = append(history, "done") - }) + }, func() bool { return false }) defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) require.True(t, d.IsDone()) @@ -680,6 +688,115 @@ func TestSelectBlockingDefaultWithFlag(t *testing.T) { "c1-one", "done", } + + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectFuture(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + f1, s1 := NewFuture(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + s1.SetValue("one-future") + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddFuture(f1, func(f Future) { + var v string + err := f.Get(ctx, &v) + require.NoError(t, err) + history = append(history, fmt.Sprintf("f1-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + fmt.Println("select1 done", history) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "c1-one", + "select2", + "f1-one-future", + "done", + } + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectSend(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + require.True(t, c2.Len() == 1) + history = append(history, "receiver") + var v string + more := c2.Receive(ctx, &v) + require.True(t, more) + history = append(history, fmt.Sprintf("c2-%v", v)) + require.True(t, c2.Len() == 0) + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddSend(c2, "two", func() { history = append(history, "send2") }) + history = append(history, "select1") + selector.Select(ctx) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "receiver", + "c1-one", + "select2", + "send2", + "done", + "c2-two", + } require.EqualValues(t, expected, history) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index a92fbb371..acefd43ce 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -229,6 +229,8 @@ type ( workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() + + sdkFlags *sdkFlags } testSessionEnvironmentImpl struct { @@ -289,6 +291,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist failureConverter: GetDefaultFailureConverter(), runTimeout: maxWorkflowTimeout, bufferedUpdateRequests: make(map[string][]func()), + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), } if debugMode { @@ -581,11 +584,11 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) ( } func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { - return true + return env.sdkFlags.tryUse(flag, true) } func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { - return true + return env.sdkFlags.getFlag(flag) } func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 452d51555..558ae7a43 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4254,8 +4254,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { ch2.Receive(ctx, &v) }) selector.Select(ctx) - s.Require().True(ch1.Len() == 1 && v == "s2") - s.Require().True(selector.HasPending()) + s.Require().True(ch1.Len() == 0 && v == "s2") + selector.Select(ctx) return nil } @@ -4268,5 +4268,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { }, 5*time.Second) env.ExecuteWorkflow(workflowFn) s.True(env.IsWorkflowCompleted()) - s.NoError(env.GetWorkflowError()) + err := env.GetWorkflowError() + s.Error(err) + var workflowErr *WorkflowExecutionError + s.True(errors.As(err, &workflowErr)) + s.Equal("deadline exceeded (type: ScheduleToClose)", workflowErr.cause.Error()) } From 7ad81699d95cf7dd8b8aeeb5e2129e051e567bbe Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 13 Nov 2024 10:09:43 -0800 Subject: [PATCH 6/8] add integration tests, add debug API to enable SDK flag for tests --- internal/internal_flags.go | 8 + internal/internal_workflow.go | 9 +- test/integration_test.go | 22 ++ test/replaytests/replay_test.go | 10 + test/replaytests/selector-non-blocking.json | 211 ++++++++++++++++++++ test/workflow_test.go | 44 ++++ 6 files changed, 303 insertions(+), 1 deletion(-) create mode 100644 test/replaytests/selector-non-blocking.json diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 89693d839..ec7721c61 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -53,6 +53,8 @@ const ( SDKFlagUnknown = math.MaxUint32 ) +var unblockSelectorSignal bool + func sdkFlagFromUint(value uint32) sdkFlag { switch value { case uint32(SDKFlagUnset): @@ -141,3 +143,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag { } return flags } + +// SetUnblockSelectorSignal sets the flag to unblock the selector signal. +// For test use only, +func SetUnblockSelectorSignal() { + unblockSelectorSignal = true +} diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 245661027..4bc78cac0 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1409,7 +1409,14 @@ func (s *selectorImpl) Select(ctx Context) { } // readyBranch is not executed when AddDefault is specified, // setting the value here prevents the signal from being dropped - dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive) + env := getWorkflowEnvironment(ctx) + var dropSignalFlag bool + if unblockSelectorSignal { + dropSignalFlag = env.TryUse(SDKFlagBlockedSelectorSignalReceive) + } else { + dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive) + } + if dropSignalFlag { c.recValue = &v } diff --git a/test/integration_test.go b/test/integration_test.go index d8c80cee8..eb7f0f5b9 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6602,6 +6602,28 @@ func (ts *IntegrationTestSuite) getReportedOperationCount(metricName string, ope return count } +func (ts *IntegrationTestSuite) TestSelectorBlock() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-selector-block") + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, false) + ts.NoError(err) + var result string + ts.NoError(run.Get(ctx, &result)) + ts.Equal("hello", result) +} + +func (ts *IntegrationTestSuite) TestSelectorNoBlock() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-selector-block") + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, true) + ts.NoError(err) + var result string + ts.NoError(run.Get(ctx, &result)) + ts.Equal("HELLO", result) +} + type coroutineCountingInterceptor struct { interceptor.WorkerInterceptorBase // Access via count() diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 452c4501e..b328a9752 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -474,6 +474,16 @@ func (s *replayTestSuite) TestSelectorBlockingDefault() { require.NoError(s.T(), err) } +func (s *replayTestSuite) TestSelectorNonBlocking() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow) + // Verify we can replay the new workflow that has the + // SDKFlagBlockedSelectorSignalReceive flag + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-non-blocking.json") + s.NoError(err) + require.NoError(s.T(), err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/selector-non-blocking.json b/test/replaytests/selector-non-blocking.json new file mode 100644 index 000000000..6ae33da84 --- /dev/null +++ b/test/replaytests/selector-non-blocking.json @@ -0,0 +1,211 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-11-13T17:54:47.478632Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048626", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SelectorBlockingDefaultWorkflow" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "2s", + "workflowRunTimeout": "2s", + "workflowTaskTimeout": "2s", + "originalExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3", + "identity": "94241@Andrews-MacBook-Pro.local@", + "firstExecutionRunId": "a73567ce-1a8e-4c86-9286-65e9039663a3", + "attempt": 1, + "workflowExecutionExpirationTime": "2024-11-13T17:54:49.478Z", + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "hello_world_workflowID" + } + }, + { + "eventId": "2", + "eventTime": "2024-11-13T17:54:47.478680Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048627", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "2s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-11-13T17:54:47.480740Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048636", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "9fae9b1e-4182-4f47-a675-ba4facd08273", + "historySizeBytes": "602", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-11-13T17:54:47.485146Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048640", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "94147@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3, + 5 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.29.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-11-13T17:54:47.485222Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1048641", + "activityTaskScheduledEventAttributes": { + "activityId": "5", + "activityType": { + "name": "SelectorBlockingDefaultActivity" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IlNpZ25hbCBub3QgbG9zdCI=" + } + ] + }, + "scheduleToCloseTimeout": "2s", + "scheduleToStartTimeout": "2s", + "startToCloseTimeout": "2s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "6", + "eventTime": "2024-11-13T17:54:47.486704Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1048646", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "5", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "31f676df-39a4-4ef7-ad2e-fd2166139abd", + "attempt": 1, + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "7", + "eventTime": "2024-11-13T17:54:47.488853Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1048647", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IlNpZ25hbCBub3QgbG9zdCB3YXMgbG9nZ2VkISI=" + } + ] + }, + "scheduledEventId": "5", + "startedEventId": "6", + "identity": "94147@Andrews-MacBook-Pro.local@" + } + }, + { + "eventId": "8", + "eventTime": "2024-11-13T17:54:47.488857Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048648", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Andrews-MacBook-Pro.local:ffbf63d9-bf89-41ab-8431-2f3d60c085c7", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "hello-world" + }, + "startToCloseTimeout": "2s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2024-11-13T17:54:47.489773Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048652", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "94147@Andrews-MacBook-Pro.local@", + "requestId": "fc1ab01a-627d-49db-a0c0-0829e9938212", + "historySizeBytes": "1417", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + } + } + }, + { + "eventId": "10", + "eventTime": "2024-11-13T17:54:47.491177Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048656", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "9", + "identity": "94147@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "7e5be6238aa91ebec5dcc5b6859e87c6" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "11", + "eventTime": "2024-11-13T17:54:47.491192Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048657", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "10" + } + } + ] +} \ No newline at end of file diff --git a/test/workflow_test.go b/test/workflow_test.go index 562761d8d..d80e2c3aa 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3174,6 +3174,49 @@ func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, nu return nil } +func (w *Workflows) SelectorBlockSignal(ctx workflow.Context, enableFlag bool) (string, error) { + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + var logger = workflow.GetLogger(ctx) + logger.Info("calling ExecuteActivity") + ch1 := workflow.NewChannel(ctx) + ch2 := workflow.NewChannel(ctx) + + if enableFlag { + internal.SetUnblockSelectorSignal() + } + + workflow.Go(ctx, func(ctx workflow.Context) { + ch1.Send(ctx, "one") + + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch2.Send(ctx, "two") + }) + + selector := workflow.NewSelector(ctx) + var s string + selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &s) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &s) + }) + selector.Select(ctx) + + var hello = "hello" + if selector.HasPending() { + var result string + activity := workflow.ExecuteActivity(ctx, "Prefix_ToUpper", hello) + activity.Get(ctx, &result) + logger.Info("Result", result) + return result, nil + } else { + logger.Info("Signal in ch1 lost") + } + return hello, nil +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityCancelRepro) worker.RegisterWorkflow(w.ActivityCompletionUsingID) @@ -3310,6 +3353,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateSetHandlerOnly) worker.RegisterWorkflow(w.Echo) worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries) + worker.RegisterWorkflow(w.SelectorBlockSignal) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { From c96c3a5e7334645caa206eb04698891d3db73016 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 14 Nov 2024 15:20:24 -0800 Subject: [PATCH 7/8] set flag in test itself not workflow, unset flag after test --- internal/internal_flags.go | 6 ++++++ test/integration_test.go | 8 ++++++-- test/workflow_test.go | 6 +----- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/internal_flags.go b/internal/internal_flags.go index ec7721c61..d3c174de3 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -149,3 +149,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag { func SetUnblockSelectorSignal() { unblockSelectorSignal = true } + +// UnsetUnblockSelectorSignal unsets the flag to unblock the selector signal. +// For test use only, +func UnsetUnblockSelectorSignal() { + unblockSelectorSignal = false +} diff --git a/test/integration_test.go b/test/integration_test.go index eb7f0f5b9..ff8aef905 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6606,7 +6606,7 @@ func (ts *IntegrationTestSuite) TestSelectorBlock() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() options := ts.startWorkflowOptions("test-selector-block") - run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, false) + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal) ts.NoError(err) var result string ts.NoError(run.Get(ctx, &result)) @@ -6617,7 +6617,11 @@ func (ts *IntegrationTestSuite) TestSelectorNoBlock() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() options := ts.startWorkflowOptions("test-selector-block") - run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal, true) + + internal.SetUnblockSelectorSignal() + defer internal.UnsetUnblockSelectorSignal() + + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal) ts.NoError(err) var result string ts.NoError(run.Get(ctx, &result)) diff --git a/test/workflow_test.go b/test/workflow_test.go index d80e2c3aa..b99d8c731 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3174,17 +3174,13 @@ func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, nu return nil } -func (w *Workflows) SelectorBlockSignal(ctx workflow.Context, enableFlag bool) (string, error) { +func (w *Workflows) SelectorBlockSignal(ctx workflow.Context) (string, error) { ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) var logger = workflow.GetLogger(ctx) logger.Info("calling ExecuteActivity") ch1 := workflow.NewChannel(ctx) ch2 := workflow.NewChannel(ctx) - if enableFlag { - internal.SetUnblockSelectorSignal() - } - workflow.Go(ctx, func(ctx workflow.Context) { ch1.Send(ctx, "one") From 2c390ab50bd79e5ca52c75f84204782f0d5f987a Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 15 Nov 2024 09:37:15 -0800 Subject: [PATCH 8/8] unify set/unset function into one --- internal/internal_flags.go | 12 +++--------- test/integration_test.go | 4 ++-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/internal/internal_flags.go b/internal/internal_flags.go index d3c174de3..4a697d69e 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -144,14 +144,8 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag { return flags } -// SetUnblockSelectorSignal sets the flag to unblock the selector signal. +// SetUnblockSelectorSignal toggles the flag to unblock the selector signal. // For test use only, -func SetUnblockSelectorSignal() { - unblockSelectorSignal = true -} - -// UnsetUnblockSelectorSignal unsets the flag to unblock the selector signal. -// For test use only, -func UnsetUnblockSelectorSignal() { - unblockSelectorSignal = false +func SetUnblockSelectorSignal(unblockSignal bool) { + unblockSelectorSignal = unblockSignal } diff --git a/test/integration_test.go b/test/integration_test.go index ff8aef905..b897c39ce 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6618,8 +6618,8 @@ func (ts *IntegrationTestSuite) TestSelectorNoBlock() { defer cancel() options := ts.startWorkflowOptions("test-selector-block") - internal.SetUnblockSelectorSignal() - defer internal.UnsetUnblockSelectorSignal() + internal.SetUnblockSelectorSignal(true) + defer internal.SetUnblockSelectorSignal(false) run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.SelectorBlockSignal) ts.NoError(err)