Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix startWatchLoop leak #6352

Merged
merged 2 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(kgm.Bootstrap(suite.ctx))
suite.NoError(suite.manager.Bootstrap())
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt
}

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
func (m *GroupManager) Bootstrap(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have understood the problem, and I feel this fix could be cleaner, though it can fix the problem. The GroupManage still holds the context directly inherited from pd server, but the resources are under RaftCluster's context.

With the current logic, keyspaceGroupManager's state is leader instance specific, can we new KeyspaceGroupManager inside of RaftCluster.Start with RaftCluster's context, just as NewRuleManager/NewRegionLabeler/NewReplicationModeManager does. We'll naturally avoid the bug.

Even better, can every API node, including leader and follower, keep and track keyspace group's membership/tso service registry/distribution change to serve keyspace group meta query, but only leader can serve membership/distribution write requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we new a keyspaceManger, we need to pass keyspaceGroupManager as a parameter.

// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *GroupManager) Bootstrap() error {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
go m.startWatchLoop(ctx)
}
return nil
}
Expand All @@ -146,10 +146,10 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) startWatchLoop() {
func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ctx, cancel := context.WithCancel(m.ctx)
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
var (
resp *clientv3.GetResponse
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
suite.NoError(suite.kgm.Bootstrap(suite.ctx))
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}
if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
Expand Down