Skip to content

Commit

Permalink
*: add test for misusing keyspace ID when creating the client (#6754)
Browse files Browse the repository at this point in the history
ref #6747, ref #6748, ref #6749

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Jul 5, 2023
1 parent 69c6a34 commit dc180ca
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ report.xml
coverage.xml
coverage
*.txt
go.work*
32 changes: 32 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package keyspace
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
Expand All @@ -25,6 +26,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/tikv/pd/pkg/mcs/discovery"
Expand Down Expand Up @@ -1010,3 +1012,33 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
zap.Reflect("merge-list", mergeList))
return nil
}

// GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID.
func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
// check if the keyspace group exists
kg, err := m.GetKeyspaceGroupByID(id)
if err != nil {
return "", err
}
if kg == nil {
return "", ErrKeyspaceGroupNotExists(id)
}

// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
path := fmt.Sprintf("/ms/%d/tso/00000/primary", m.clusterID)
if id != utils.DefaultKeyspaceGroupID {
path = fmt.Sprintf("/ms/%d/tso/keyspace_groups/election/%05d/primary", m.clusterID, id)
}
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, path, leader)
if err != nil {
return "", err
}
if !ok {
return "", ErrKeyspaceGroupPrimaryNotFound
}
// The format of leader name is address-groupID.
contents := strings.Split(leader.GetName(), "-")
return contents[0], err
}
3 changes: 3 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ var (
}
// Only keyspaces in the state specified by allowChangeConfig are allowed to change their config.
allowChangeConfig = []keyspacepb.KeyspaceState{keyspacepb.KeyspaceState_ENABLED, keyspacepb.KeyspaceState_DISABLED}

// ErrKeyspaceGroupPrimaryNotFound is used to indicate primary of target keyspace group does not exist.
ErrKeyspaceGroupPrimaryNotFound = errors.New("primary of keyspace group does not exist")
)

// validateID check if keyspace falls within the acceptable range.
Expand Down
113 changes: 72 additions & 41 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
Expand All @@ -503,21 +504,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
var clients sync.Map
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_b", cli)
}()
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_a", cli)
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
Expand All @@ -531,20 +517,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait pd clients are ready.
testutil.Eventually(re, func() bool {
count := 0
clients.Range(func(_, _ interface{}) bool {
count++
return true
})
return count == 2
})
clientA, ok := clients.Load("keyspace_a")
re.True(ok)
clientB, ok := clients.Load("keyspace_b")
re.True(ok)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
Expand All @@ -553,27 +525,15 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientB.(pd.Client).GetTS(ctx)
return err == nil
})
waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})
clientB.(pd.Client).Close()

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientA.(pd.Client).GetTS(ctx)
return err == nil
})
waitFinishSplit(re, leaderServer, 0, 2, []uint32{mcsutils.DefaultKeyspaceID}, []uint32{1})
clientA.(pd.Client).Close()

// Check the keyspace group 0 is split to 1 and 2.
kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
Expand All @@ -586,6 +546,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re.False(kg1.IsSplitting())
re.False(kg2.IsSplitting())

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

Expand Down Expand Up @@ -724,3 +685,73 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient()
cancel()
wg.Wait()
}

// See https://github.com/tikv/pd/issues/6748
func TestGetTSOImmediately(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})

kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
re.Equal([]uint32{0, 1}, kg0.Keyspaces)
re.Equal([]uint32{2}, kg1.Keyspaces)
re.False(kg0.IsSplitting())
re.False(kg1.IsSplitting())

// Let group 0 and group 1 have different primary node.
kgm.SetPriorityForKeyspaceGroup(0, kg0.Members[0].Address, 100)
kgm.SetPriorityForKeyspaceGroup(1, kg1.Members[1].Address, 100)
testutil.Eventually(re, func() bool {
p0, err := kgm.GetKeyspaceGroupPrimaryByID(0)
re.NoError(err)
p1, err := kgm.GetKeyspaceGroupPrimaryByID(1)
re.NoError(err)
return p0 == kg0.Members[0].Address && p1 == kg1.Members[1].Address
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
_, _, err = cli.GetTS(ctx)
re.NoError(err)
cli.Close()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
}

0 comments on commit dc180ca

Please sign in to comment.