Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix worker leak in eager dispatcher #1723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[eagerWorker]struct{}),
},
}

Expand Down
23 changes: 18 additions & 5 deletions internal/internal_eager_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,37 @@ import (
// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task.
type eagerWorkflowDispatcher struct {
	// lock guards workersByTaskQueue for concurrent register/deregister/dispatch.
	lock sync.RWMutex
	// workersByTaskQueue maps a task queue name to the set of workers registered
	// for it; the set form (map used as a set) allows O(1) removal on deregister.
	workersByTaskQueue map[string]map[eagerWorker]struct{}
}

// registerWorker registers a worker that can be used for eager workflow dispatch.
func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) {
	e.lock.Lock()
	defer e.lock.Unlock()
	tq := worker.executionParameters.TaskQueue
	// Lazily create the per-queue worker set on first registration.
	set := e.workersByTaskQueue[tq]
	if set == nil {
		set = make(map[eagerWorker]struct{})
		e.workersByTaskQueue[tq] = set
	}
	set[worker.worker] = struct{}{}
}

// deregisterWorker deregister a worker so that it will not be used for eager workflow dispatch
func (e *eagerWorkflowDispatcher) deregisterWorker(worker *workflowWorker) {
e.lock.Lock()
defer e.lock.Unlock()
delete(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC it is unfortunate but known that Go does not reclaim memory for map entries on delete. At least it was the case in the older Go versions, but maybe it has changed (see issue https://github.com/golang/go/issues/ + 20135).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is about the memory used by the map, what we care about is making sure nothing is holding a reference to worker.worker so it can be GC'd

}

// applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use
func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor {
// Try every worker that is assigned to the desired task queue.
e.lock.RLock()
workers := e.workersByTaskQueue[request.GetTaskQueue().Name]
randWorkers := make([]eagerWorker, len(workers))
// Copy the slice so we can release the lock.
copy(randWorkers, workers)
randWorkers := make([]eagerWorker, 0, len(workers))
// Copy the workers so we can release the lock.
for worker := range workers {
randWorkers = append(randWorkers, worker)
}
Comment on lines +62 to +66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since deregister is rare I figure, it could be better maybe to burden it with the heavier work than this call, since I suspect copy is cheaper than building this from keys. But this probably doesn't matter since I can't imagine a good use case where a user would ever have more than one workflow worker for a task queue (versioning notwithstanding since this isn't built for versioning). So nothing needed, just thinking out loud.

e.lock.RUnlock()
rand.Shuffle(len(randWorkers), func(i, j int) { randWorkers[i], randWorkers[j] = randWorkers[j], randWorkers[i] })
for _, worker := range randWorkers {
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_eager_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *eagerWorkerMock) pushEagerTask(task eagerTask) {

func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[eagerWorker]struct{}),
}
dispatcher.registerWorker(&workflowWorker{
executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"},
Expand All @@ -66,20 +66,20 @@ func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {

func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[eagerWorker]struct{}),
}

availableWorker := &eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
}
dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{
dispatcher.workersByTaskQueue["task-queue"] = map[eagerWorker]struct{}{
&eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return nil },
},
}: {},
&eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return nil },
},
availableWorker,
}: {},
availableWorker: {},
}

request := &workflowservice.StartWorkflowExecutionRequest{
Expand Down
6 changes: 6 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,9 @@ func (aw *AggregatedWorker) start() error {
// stop workflow worker.
if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.workflowWorker.worker.isWorkerStarted {
if aw.client.eagerDispatcher != nil {
aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker)
}
aw.workflowWorker.Stop()
}
}
Expand Down Expand Up @@ -1218,6 +1221,9 @@ func (aw *AggregatedWorker) Stop() {
}

if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.client.eagerDispatcher != nil {
aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker)
}
aw.workflowWorker.Stop()
}
if !util.IsInterfaceNil(aw.activityWorker) {
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *WorkersTestSuite) TestWorkflowWorker() {
workflowWorker.Stop()

s.NoError(ctx.Err())

}

type CountingSlotSupplier struct {
Expand Down Expand Up @@ -736,6 +737,8 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() {
worker := NewAggregatedWorker(client, "multi-stop-tq", WorkerOptions{})
s.NoError(worker.Start())
worker.Stop()
// Verify stopping the worker removes it from the eager dispatcher
s.Empty(client.eagerDispatcher.workersByTaskQueue["multi-stop-tq"])
worker.Stop()
}

Expand Down
8 changes: 4 additions & 4 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() {
},
}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}},
workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}},
}
s.True(ok)
options := StartWorkflowOptions{
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() {
tryReserveSlotCallback: func() *SlotPermit { return nil },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}},
workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}},
}
s.True(ok)
options := StartWorkflowOptions{
Expand Down Expand Up @@ -1485,7 +1485,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflow() {
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}}
workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
Expand Down Expand Up @@ -1525,7 +1525,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() {
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}}
workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
Expand Down
Loading