Skip to content

Commit

Permalink
Respond to PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Nov 30, 2023
1 parent cba9a6b commit c43c47f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 49 deletions.
41 changes: 22 additions & 19 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ type (
sdkVersion string
sdkNameUpdated bool
sdkName string
// Any updates received in a workflow task before we have registered
// any handlers are queued here until either their handler is registered
// or the event loop runs out of work and they are rejected.
blockedUpdates map[string][]func()
// Any update requests received in a workflow task before we have registered
// any handlers are not scheduled and are queued here until either their
// handler is registered or the event loop runs out of work and they are rejected.
bufferedUpdateRequests map[string][]func()

protocols *protocol.Registry
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func newWorkflowExecutionEventHandler(
protocols: protocol.NewRegistry(),
mutableSideEffectCallCounter: make(map[string]int),
sdkFlags: newSDKFlags(capabilities),
blockedUpdates: make(map[string][]func()),
bufferedUpdateRequests: make(map[string][]func()),
}
// Attempt to skip 1 log level to remove the ReplayLogger from the stack.
context.logger = log.Skip(ilog.NewReplayLogger(
Expand Down Expand Up @@ -891,34 +891,37 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
}

func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) {
wc.blockedUpdates[name] = append(wc.blockedUpdates[name], f)
wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f)
}

func (wc *workflowEnvironmentImpl) HandleUpdates(name string) bool {
if !wc.sdkFlags.tryUse(SDKPriorityUpdateHandling, !wc.isReplay) {
return false
}
updatesHandled := false
if blockedUpdates, ok := wc.blockedUpdates[name]; wc.sdkFlags.tryUse(SDKPriorityUpdateHandling, !wc.isReplay) && ok {
updatesHandled = true
for _, update := range blockedUpdates {
update()
if bufferedUpdateRequests, ok := wc.bufferedUpdateRequests[name]; ok {
for _, request := range bufferedUpdateRequests {
request()
updatesHandled = true
}
delete(wc.blockedUpdates, name)
delete(wc.bufferedUpdateRequests, name)
}
return updatesHandled
}

func (wc *workflowEnvironmentImpl) DrainUnhandledUpdates() bool {
rerun := false
// Check if any blocked updates remain when we have no more coroutines to run and let them run so they are rejected.
anyExecuted := false
// Check if any buffered update requests remain when we have no more coroutines to run and let them schedule so they are rejected.
// Generally iterating a map in workflow code is bad because it is non deterministic
// this case is fine since all these update handles will be rejected and not recorded in history.
for name, us := range wc.blockedUpdates {
for _, u := range us {
u()
for name, requests := range wc.bufferedUpdateRequests {
for _, request := range requests {
request()
anyExecuted = true
}
rerun = true
delete(wc.blockedUpdates, name)
delete(wc.bufferedUpdateRequests, name)
}
return rerun
return anyExecuted
}

// lookupMutableSideEffect gets the current value of the MutableSideEffect for id for the
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestDefaultUpdateHandler(t *testing.T) {
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
blockedUpdates: make(map[string][]func()),
bufferedUpdateRequests: make(map[string][]func()),
}
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err)
Expand Down
14 changes: 8 additions & 6 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,8 @@ func (d *dispatcherImpl) newState(name string, highPriority bool) *coroutineStat
}
d.sequence++
if highPriority {
// Update requests need to be added to the front of the dispatchers coroutine list so they
// are handled before the root coroutine.
d.newEagerCoroutines = append(d.newEagerCoroutines, c)
} else {
d.coroutines = append(d.coroutines, c)
Expand Down Expand Up @@ -1135,10 +1137,12 @@ func (d *dispatcherImpl) ExecuteUntilAllBlocked(deadlockDetectionTimeout time.Du
}
// Set allBlocked to false if new coroutines where created
allBlocked = allBlocked && lastSequence == d.sequence && len(d.newEagerCoroutines) == 0
d.coroutines = append(d.newEagerCoroutines, d.coroutines...)
d.newEagerCoroutines = nil
if len(d.coroutines) == 0 && !d.allBlockedCallback() {
break
if len(d.newEagerCoroutines) > 0 {
d.coroutines = append(d.newEagerCoroutines, d.coroutines...)
d.newEagerCoroutines = nil
allBlocked = false
} else {
allBlocked = allBlocked && lastSequence == d.sequence
}
}
return nil
Expand Down Expand Up @@ -1662,8 +1666,6 @@ func (wg *waitGroupImpl) Wait(ctx Context) {

// Spawn starts a new coroutine with Dispatcher.NewCoroutine
func (us updateSchedulerImpl) Spawn(ctx Context, name string, highPriority bool, f func(Context)) Context {
// Update requests need to be added to the front of the dispatchers coroutine list so they
// are handled before the root coroutine.
return us.dispatcher.NewCoroutine(ctx, name, highPriority, f)
}

Expand Down
42 changes: 21 additions & 21 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ type (
activityEnvOnly bool

workflowFunctionExecuting bool
blockedUpdates map[string][]func()
bufferedUpdateRequests map[string][]func()
}

testSessionEnvironmentImpl struct {
Expand Down Expand Up @@ -261,12 +261,12 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
changeVersions: make(map[string]Version),
openSessions: make(map[string]*SessionInfo),

doneChannel: make(chan struct{}),
workerStopChannel: make(chan struct{}),
dataConverter: converter.GetDefaultDataConverter(),
failureConverter: GetDefaultFailureConverter(),
runTimeout: maxWorkflowTimeout,
blockedUpdates: make(map[string][]func()),
doneChannel: make(chan struct{}),
workerStopChannel: make(chan struct{}),
dataConverter: converter.GetDefaultDataConverter(),
failureConverter: GetDefaultFailureConverter(),
runTimeout: maxWorkflowTimeout,
bufferedUpdateRequests: make(map[string][]func()),
}

if debugMode {
Expand Down Expand Up @@ -551,17 +551,17 @@ func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
}

func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) {
env.blockedUpdates[name] = append(env.blockedUpdates[name], f)
env.bufferedUpdateRequests[name] = append(env.bufferedUpdateRequests[name], f)
}

func (env *testWorkflowEnvironmentImpl) HandleUpdates(name string) bool {
updatesHandled := false
if blockedUpdates, ok := env.blockedUpdates[name]; ok {
updatesHandled = true
for _, update := range blockedUpdates {
update()
if bufferedUpdateRequests, ok := env.bufferedUpdateRequests[name]; ok {
for _, requests := range bufferedUpdateRequests {
requests()
updatesHandled = true
}
delete(env.blockedUpdates, name)
delete(env.bufferedUpdateRequests, name)
}
return updatesHandled
}
Expand All @@ -573,18 +573,18 @@ func (env *testWorkflowEnvironmentImpl) DrainUnhandledUpdates() bool {
if !env.workflowFunctionExecuting {
return false
}
rerun := false
// Check if any blocked updates remain when we have no more coroutines to run and let them run so they are rejected.
anyExecuted := false
// Check if any buffered update requests remain when we have no more coroutines to run and let them schedule so they are rejected.
// Generally iterating a map in workflow code is bad because it is non deterministic
// this case is fine since all these update handles will be rejected and not recorded in history.
for name, us := range env.blockedUpdates {
for _, u := range us {
u()
for name, bufferedUpdateRequests := range env.bufferedUpdateRequests {
for _, request := range bufferedUpdateRequests {
request()
anyExecuted = true
}
rerun = true
delete(env.blockedUpdates, name)
delete(env.bufferedUpdateRequests, name)
}
return rerun
return anyExecuted
}

func (env *testWorkflowEnvironmentImpl) executeActivity(
Expand Down
18 changes: 16 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2464,7 +2464,6 @@ func (ts *IntegrationTestSuite) TestUpdateWithWrongHandleRejected() {
func (ts *IntegrationTestSuite) TestWaitOnUpdate() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// We want to start a single long-running activity in a session
options := ts.startWorkflowOptions("test-wait-on-update")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx,
Expand All @@ -2484,7 +2483,6 @@ func (ts *IntegrationTestSuite) TestWaitOnUpdate() {
func (ts *IntegrationTestSuite) TestUpdateOrdering() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// We want to start a single long-running activity in a session
options := ts.startWorkflowOptions("test-update-ordering")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx,
Expand Down Expand Up @@ -2561,6 +2559,22 @@ func (ts *IntegrationTestSuite) testUpdateOrderingCancel(cancelWf bool) {
ts.NoError(run.Get(ctx, &result))
ts.Equal(10, result)
}

func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-always-handled")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSetHandlerOnly)
ts.NoError(err)
// Send an update before the first workflow task
_, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
var result int
ts.NoError(run.Get(ctx, &result))
ts.Equal(1, result)
}

func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
11 changes: 11 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,6 +2168,16 @@ func (w *Workflows) WaitOnUpdate(ctx workflow.Context) (int, error) {
return updatesRan, nil
}

func (w *Workflows) UpdateSetHandlerOnly(ctx workflow.Context) (int, error) {
updatesRan := 0
updateHandle := func(ctx workflow.Context) error {
updatesRan++
return nil
}
workflow.SetUpdateHandler(ctx, "update", updateHandle)
return updatesRan, nil
}

func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) {
updatesRan := 0
updateHandle := func(ctx workflow.Context) error {
Expand Down Expand Up @@ -2528,6 +2538,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.timer)
worker.RegisterWorkflow(w.WaitOnUpdate)
worker.RegisterWorkflow(w.UpdateOrdering)
worker.RegisterWorkflow(w.UpdateSetHandlerOnly)
}

func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {
Expand Down

0 comments on commit c43c47f

Please sign in to comment.