Skip to content

Commit

Permalink
mcs: fix sync store label (tikv#7396)
Browse files Browse the repository at this point in the history
close tikv#7391, close tikv#7393, close tikv#7394

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Dec 1, 2023
1 parent 0604d49 commit 5b3af51
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 4 deletions.
20 changes: 20 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetStoreMeta sets the meta for the store.
func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.Version = newMeta.GetVersion()
meta.GitHash = newMeta.GetGitHash()
meta.Address = newMeta.GetAddress()
meta.StatusAddress = newMeta.GetStatusAddress()
meta.PeerAddress = newMeta.GetPeerAddress()
meta.StartTimestamp = newMeta.GetStartTimestamp()
meta.DeployPath = newMeta.GetDeployPath()
meta.LastHeartbeat = newMeta.GetLastHeartbeat()
meta.State = newMeta.GetState()
meta.Labels = newMeta.GetLabels()
meta.NodeState = newMeta.GetNodeState()
meta.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed()
store.meta = meta
}
}
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
newStore := store.Clone(core.SetStoreStats(stats))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
11 changes: 11 additions & 0 deletions tests/integrations/mcs/scheduling/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,15 @@ func (suite *metaTestSuite) TestStoreWatch() {
testutil.Eventually(re, func() bool {
return cluster.GetStore(2) == nil
})

// test synchronized store labels
suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore(
&metapb.Store{Id: 5, Address: "mock-5", State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}},
)
testutil.Eventually(re, func() bool {
if len(cluster.GetStore(5).GetLabels()) == 0 {
return false
}
return cluster.GetStore(5).GetLabels()[0].GetValue() == "z1"
})
}
3 changes: 2 additions & 1 deletion tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestServerTestSuite(t *testing.T) {
func (suite *serverTestSuite) SetupSuite() {
var err error
re := suite.Require()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3)
re.NoError(err)
Expand All @@ -76,6 +76,7 @@ func (suite *serverTestSuite) SetupSuite() {
func (suite *serverTestSuite) TearDownSuite() {
suite.cluster.Destroy()
suite.cancel()
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

func (suite *serverTestSuite) TestAllocID() {
Expand Down
1 change: 1 addition & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
err = rc.PutStore(store)
re.NoError(err)
re.NotNil(tc)
rc.Stop()

// let the job run at small interval
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
Expand Down

0 comments on commit 5b3af51

Please sign in to comment.