From 2990ebf3124dcf542b143967d6e49ac201fc3ec6 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 Jun 2023 10:58:13 -0700 Subject: [PATCH] Change message look ahead (#1136) Change message look ahead --- internal/internal_task_handlers.go | 46 ++++-- .../internal_task_handlers_interfaces_test.go | 71 ++++++++- internal/internal_task_handlers_test.go | 139 +++++++++++++++++- internal/internal_worker.go | 23 --- internal/internal_worker_interfaces_test.go | 28 +++- 5 files changed, 259 insertions(+), 48 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index ed7910254..38f40f649 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -49,6 +49,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/internal/common/retry" + "go.temporal.io/sdk/internal/protocol" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common" @@ -292,11 +293,11 @@ func isCommandEvent(eventType enumspb.EventType) bool { // NextCommandEvents returns events that there processed as new by the next command. // TODO(maxim): Refactor to return a struct instead of multiple parameters -func (eh *history) NextCommandEvents() (result []*historypb.HistoryEvent, markers []*historypb.HistoryEvent, binaryChecksum string, sdkFlags []sdkFlag, err error) { +func (eh *history) NextCommandEvents() (result []*historypb.HistoryEvent, markers []*historypb.HistoryEvent, binaryChecksum string, sdkFlags []sdkFlag, msgs []*protocolpb.Message, err error) { if eh.next == nil { - eh.next, _, eh.nextFlags, err = eh.nextCommandEvents() + eh.next, _, eh.nextFlags, _, err = eh.nextCommandEvents() if err != nil { - return result, markers, eh.binaryChecksum, sdkFlags, err + return result, markers, eh.binaryChecksum, sdkFlags, msgs, err } } @@ -304,9 +305,9 @@ func (eh *history) NextCommandEvents() (result []*historypb.HistoryEvent, marker checksum := eh.binaryChecksum sdkFlags = eh.nextFlags if len(result) > 0 { - eh.next, markers, eh.nextFlags, err = eh.nextCommandEvents() + eh.next, markers, eh.nextFlags, msgs, err = eh.nextCommandEvents() } - return result, markers, checksum, sdkFlags, err + return result, markers, checksum, sdkFlags, msgs, err } func (eh *history) hasMoreEvents() bool { @@ -334,12 +335,12 @@ func (eh *history) verifyAllEventsProcessed() error { return nil } -func (eh *history) nextCommandEvents() (nextEvents []*historypb.HistoryEvent, markers []*historypb.HistoryEvent, sdkFlags []sdkFlag, err error) { +func (eh *history) nextCommandEvents() (nextEvents []*historypb.HistoryEvent, markers []*historypb.HistoryEvent, sdkFlags []sdkFlag, msgs []*protocolpb.Message, err error) { if eh.currentIndex == len(eh.loadedEvents) && !eh.hasMoreEvents() { if err := eh.verifyAllEventsProcessed(); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - return []*historypb.HistoryEvent{}, []*historypb.HistoryEvent{}, []sdkFlag{}, nil + return []*historypb.HistoryEvent{}, []*historypb.HistoryEvent{}, []sdkFlag{}, []*protocolpb.Message{}, nil } // Process events @@ -391,6 +392,15 @@ OrderEvents: default: if isPreloadMarkerEvent(event) { markers = append(markers, event) + } else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil { + msgs = append(msgs, &protocolpb.Message{ + Id: attrs.GetAcceptedRequestMessageId(), + ProtocolInstanceId: attrs.GetProtocolInstanceId(), + SequencingId: &protocolpb.Message_EventId{ + EventId: attrs.GetAcceptedRequestSequencingEventId(), + }, + Body: protocol.MustMarshalAny(attrs.GetAcceptedRequest()), + }) } nextEvents = append(nextEvents, event) } @@ -408,7 +418,7 @@ OrderEvents: eh.currentIndex = 0 - return nextEvents, markers, sdkFlags, nil + return nextEvents, markers, sdkFlags, msgs, nil } func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool { @@ -566,7 +576,6 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. if taskQueue == nil || taskQueue.Name == "" { return nil, errors.New("nil or empty TaskQueue in WorkflowExecutionStarted event") } - task.Messages = append(inferMessages(task.GetHistory().GetEvents()), task.Messages...) runID := task.WorkflowExecution.GetRunId() workflowID := task.WorkflowExecution.GetWorkflowId() @@ -713,7 +722,6 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi return err } } - task.Messages = append(inferMessages(task.GetHistory().GetEvents()), task.Messages...) if w.workflowInfo != nil { // Reset the search attributes and memos from the WorkflowExecutionStartedEvent. // The search attributes and memo may have been modified by calls like UpsertMemo @@ -878,8 +886,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo var replayCommands []*commandpb.Command var respondEvents []*historypb.HistoryEvent - msgs := indexMessagesByEventID(workflowTask.task.GetMessages()) - + taskMessages := workflowTask.task.GetMessages() skipReplayCheck := w.skipReplayCheck() shouldForceReplayCheck := func() bool { isInReplayer := IsReplayNamespace(w.wth.namespace) @@ -899,7 +906,17 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo ProcessEvents: for { - reorderedEvents, markers, binaryChecksum, flags, err := reorderedHistory.NextCommandEvents() + reorderedEvents, markers, binaryChecksum, flags, historyMessages, err := reorderedHistory.NextCommandEvents() + // Check if we are replaying so we know if we should use the messages in the WFT or the history + isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) + var msgs *eventMsgIndex + if isReplay { + msgs = indexMessagesByEventID(historyMessages) + } else { + msgs = indexMessagesByEventID(taskMessages) + taskMessages = []*protocolpb.Message{} + } + if err != nil { return nil, err } @@ -993,7 +1010,6 @@ ProcessEvents: } } } - isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) if isReplay { eventCommands := eventHandler.commandsHelper.getCommands(true) if !skipReplayCheck { diff --git a/internal/internal_task_handlers_interfaces_test.go b/internal/internal_task_handlers_interfaces_test.go index 9e5df5bb6..8fbbf12f0 100644 --- a/internal/internal_task_handlers_interfaces_test.go +++ b/internal/internal_task_handlers_interfaces_test.go @@ -34,6 +34,7 @@ import ( historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/sdk/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/api/workflowservicemock/v1" ) @@ -179,7 +180,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommands() { eh := newHistory(workflowTask, nil) - events, _, _, _, err := eh.NextCommandEvents() + events, _, _, _, _, err := eh.NextCommandEvents() s.NoError(err) s.Equal(3, len(events)) @@ -222,7 +223,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() { eh := newHistory(workflowTask, nil) - events, _, _, sdkFlags, err := eh.NextCommandEvents() + events, _, _, sdkFlags, _, err := eh.NextCommandEvents() s.NoError(err) s.Equal(2, len(events)) @@ -232,7 +233,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() { s.Equal(1, len(sdkFlags)) s.EqualValues(SDKFlagLimitChangeVersionSASize, sdkFlags[0]) - events, _, _, sdkFlags, err = eh.NextCommandEvents() + events, _, _, sdkFlags, _, err = eh.NextCommandEvents() s.NoError(err) s.Equal(4, len(events)) @@ -243,3 +244,67 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() { s.Equal(0, len(sdkFlags)) } + +func (s *PollLayerInterfacesTestSuite) TestMessageCommands() { + // Schedule an activity and see if we complete workflow. + taskQueue := "tq1" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(3), + { + EventId: 4, + EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, + }, + createTestEventWorkflowTaskScheduled(5, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(6), + createTestEventWorkflowTaskCompleted(7, &historypb.WorkflowTaskCompletedEventAttributes{ + ScheduledEventId: 5, + StartedEventId: 6, + }), + { + EventId: 8, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{ + WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{ + ProtocolInstanceId: "test", + AcceptedRequest: &updatepb.Request{}, + }, + }, + }, + createTestEventWorkflowTaskScheduled(9, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(10), + } + task := createWorkflowTaskWithQueries(testEvents[0:3], 0, "HelloWorld_Workflow", nil, false) + + historyIterator := &historyIteratorImpl{ + iteratorFunc: func(nextToken []byte) (*historypb.History, []byte, error) { + return &historypb.History{ + Events: testEvents[3:], + }, nil, nil + }, + nextPageToken: []byte("test"), + } + + workflowTask := &workflowTask{task: task, historyIterator: historyIterator} + + eh := newHistory(workflowTask, nil) + + events, _, _, _, msgs, err := eh.NextCommandEvents() + s.NoError(err) + s.Equal(2, len(events)) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, events[0].GetEventType()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, events[1].GetEventType()) + + s.Equal(1, len(msgs)) + s.Equal("test", msgs[0].GetProtocolInstanceId()) + + events, _, _, _, msgs, err = eh.NextCommandEvents() + s.NoError(err) + s.Equal(3, len(events)) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED, events[0].GetEventType()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, events[1].GetEventType()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, events[2].GetEventType()) + + s.Equal(0, len(msgs)) +} diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 91c9365a1..9436ec06d 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -44,6 +44,7 @@ import ( historypb "go.temporal.io/api/history/v1" protocolpb "go.temporal.io/api/protocol/v1" querypb "go.temporal.io/api/query/v1" + "go.temporal.io/api/sdk/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" updatepb "go.temporal.io/api/update/v1" @@ -105,6 +106,10 @@ func registerWorkflows(r *registry) { binaryChecksumWorkflowFunc, RegisterWorkflowOptions{Name: "BinaryChecksumWorkflow"}, ) + r.RegisterWorkflowWithOptions( + helloUpdateWorkflowFunc, + RegisterWorkflowOptions{Name: "HelloUpdate_Workflow"}, + ) } func returnPanicWorkflowFunc(Context, []byte) error { @@ -496,6 +501,16 @@ func (t *TaskHandlersTestSuite) getTestWorkerExecutionParams() workerExecutionPa Logger: t.logger, FailureConverter: GetDefaultFailureConverter(), cache: cache, + capabilities: &workflowservice.GetSystemInfoResponse_Capabilities{ + SignalAndQueryHeader: true, + InternalErrorDifferentiation: true, + ActivityFailureIncludeHeartbeat: true, + SupportsSchedules: true, + EncodedFailureAttributes: true, + UpsertMemo: true, + EagerWorkflowStart: true, + SdkMetadata: true, + }, } } @@ -1223,6 +1238,122 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() { t.NotNil(response) } +func (t *TaskHandlersTestSuite) TestWorkflowTask_Messages() { + taskQueue := "taskQueue" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(3), + createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{ + ScheduledEventId: 2, + StartedEventId: 3, + SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{ + LangUsedFlags: []uint32{ + 3, + }, + }, + }), + { + EventId: 5, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{ + WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{ + AcceptedRequestSequencingEventId: 2, + ProtocolInstanceId: "test", + AcceptedRequest: &updatepb.Request{ + Meta: &updatepb.Meta{ + UpdateId: "test", + }, + Input: &updatepb.Input{ + Name: updateType, + }, + }, + }, + }, + }, + createTestEventActivityTaskScheduled(6, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "6", + ActivityType: &commonpb.ActivityType{Name: "Greeter_Activity"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }), + } + // createWorkflowTask add a schedule and start event + task := createWorkflowTask(testEvents, 0, "HelloUpdate_Workflow") + task.NextPageToken = []byte("token") + task.PreviousStartedEventId = 14 + + params := t.getTestWorkerExecutionParams() + + nextEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowTaskCompleted(9, &historypb.WorkflowTaskCompletedEventAttributes{ + ScheduledEventId: 7, + StartedEventId: 8, + }), + { + EventId: 10, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{ + WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{ + AcceptedRequestSequencingEventId: 5, + ProtocolInstanceId: "test_2", + AcceptedRequest: &updatepb.Request{ + Meta: &updatepb.Meta{ + UpdateId: "test_2", + }, + Input: &updatepb.Input{ + Name: updateType, + }, + }, + }, + }, + }, + createTestEventActivityTaskScheduled(11, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "11", + ActivityType: &commonpb.ActivityType{Name: "Greeter_Activity"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }), + { + EventId: 12, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{ + WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{ + AcceptedRequestSequencingEventId: 5, + ProtocolInstanceId: "test_3", + AcceptedRequest: &updatepb.Request{ + Meta: &updatepb.Meta{ + UpdateId: "test_3", + }, + Input: &updatepb.Input{ + Name: updateType, + }, + }, + }, + }, + }, + createTestEventActivityTaskScheduled(13, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "13", + ActivityType: &commonpb.ActivityType{Name: "Greeter_Activity"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }), + createTestEventWorkflowTaskScheduled(14, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(15), + } + + historyIterator := &historyIteratorImpl{ + nextPageToken: []byte("token"), + iteratorFunc: func(nextToken []byte) (*historypb.History, []byte, error) { + return &historypb.History{Events: nextEvents}, nil, nil + }, + } + taskHandler := newWorkflowTaskHandler(params, nil, t.registry) + request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{ + task: task, historyIterator: historyIterator, + }, nil) + response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) + t.NoError(err) + t.NotNil(response) +} + func (t *TaskHandlersTestSuite) TestLocalActivityRetry_Workflow() { backoffInterval := 10 * time.Millisecond workflowComplete := false @@ -2034,7 +2165,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { createTestEventWorkflowTaskScheduled(2, nil), createTestEventWorkflowTaskStarted(3), createTestEventWorkflowTaskCompleted(4, nil), - &historypb.HistoryEvent{ + { EventId: 5, EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{ @@ -2105,7 +2236,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { t.Run("cache miss", func(t *testing.T) { task := &workflowservice.PollWorkflowTaskQueueResponse{ History: taskHist, - Messages: []*protocolpb.Message{&protocolpb.Message{Id: wftNewMsgID}}, + Messages: []*protocolpb.Message{{Id: wftNewMsgID}}, } require.EqualValues(t, 0, cache.Size()) @@ -2117,12 +2248,11 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { require.Len(t, task.History.Events, len(fullHist.Events), "expected task to be mutated to carry full WF history (all events)") requireContainsMsgWithID(t, task.Messages, wftNewMsgID) - requireContainsMsgWithID(t, task.Messages, historyAcceptedMsgID) }) t.Run("cache hit but destroyed", func(t *testing.T) { task := &workflowservice.PollWorkflowTaskQueueResponse{ History: taskHist, - Messages: []*protocolpb.Message{&protocolpb.Message{Id: wftNewMsgID}}, + Messages: []*protocolpb.Message{{Id: wftNewMsgID}}, } // trick the execution context into thinking it has been destroyed @@ -2133,6 +2263,5 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) { require.Len(t, task.History.Events, len(fullHist.Events), "expected task to be mutated to carry full WF history (all events)") requireContainsMsgWithID(t, task.Messages, wftNewMsgID) - requireContainsMsgWithID(t, task.Messages, historyAcceptedMsgID) }) } diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 16dd36c2a..e785d1ece 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -49,7 +49,6 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" - protocolpb "go.temporal.io/api/protocol/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/api/workflowservicemock/v1" @@ -58,7 +57,6 @@ import ( "go.temporal.io/sdk/internal/common/serializer" "go.temporal.io/sdk/internal/common/util" ilog "go.temporal.io/sdk/internal/log" - "go.temporal.io/sdk/internal/protocol" "go.temporal.io/sdk/log" ) @@ -1289,27 +1287,6 @@ func (aw *WorkflowReplayer) GetWorkflowResult(workflowID string, valuePtr interf return dc.FromPayloads(payloads, valuePtr) } -// inferMessages extracts the set of *interactionpb.Invocation objects that -// should be attached to a workflow task (i.e. the -// PollWorkflowTaskQueueResponse.Messages) if that task were to carry the -// provided slice of history events. -func inferMessages(events []*historypb.HistoryEvent) []*protocolpb.Message { - var messages []*protocolpb.Message - for _, e := range events { - if attrs := e.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil { - messages = append(messages, &protocolpb.Message{ - Id: attrs.GetAcceptedRequestMessageId(), - ProtocolInstanceId: attrs.GetProtocolInstanceId(), - SequencingId: &protocolpb.Message_EventId{ - EventId: attrs.GetAcceptedRequestSequencingEventId(), - }, - Body: protocol.MustMarshalAny(attrs.GetAcceptedRequest()), - }) - } - } - return messages -} - func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, originalExecution WorkflowExecution, history *historypb.History) error { taskQueue := "ReplayTaskQueue" events := history.Events diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 0135dc5bc..d70564baa 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -44,6 +44,7 @@ import ( const ( queryType = "test-query" + updateType = "update-query" errQueryType = "test-err-query" signalCh = "signal-chan" @@ -54,8 +55,7 @@ const ( type ( // Greeter activity - greeterActivity struct { - } + greeterActivity struct{} InterfacesTestSuite struct { suite.Suite @@ -91,6 +91,30 @@ func helloWorldWorkflowFunc(ctx Context, _ []byte) error { return err } +func helloUpdateWorkflowFunc(ctx Context, _ []byte) error { + err := setUpdateHandler(ctx, updateType, func(ctx Context) (string, error) { + activityName := "Greeter_Activity" + ao := ActivityOptions{ + TaskQueue: "taskQueue", + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: 20 * time.Second, + } + ctx = WithActivityOptions(ctx, ao) + var result string + err := ExecuteActivity(ctx, activityName).Get(ctx, &result) + return result, err + }, UpdateHandlerOptions{}) + if err != nil { + return err + } + + ch := GetSignalChannel(ctx, signalCh) + var signalResult string + ch.Receive(ctx, &signalResult) + return nil +} + func querySignalWorkflowFunc(ctx Context, numSignals int) error { queryResult := startingQueryValue _ = SetQueryHandler(ctx, queryType, func() (string, error) {