diff --git a/server/schedule/checker/replica_strategy.go b/server/schedule/checker/replica_strategy.go index 1adbf383ad6..0f816df28c6 100644 --- a/server/schedule/checker/replica_strategy.go +++ b/server/schedule/checker/replica_strategy.go @@ -93,8 +93,12 @@ func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, o func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 { // trick to avoid creating a slice with `old` removed. s.swapStoreToFirst(coLocationStores, old) + oldStore := s.cluster.GetStore(old) + if oldStore == nil { + return 0 + } filters := []filter.Filter{ - filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, s.cluster.GetStore(old)), + filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, oldStore), } if len(s.locationLabels) > 0 && s.isolationLevel != "" { filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores[1:])) diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 4a6c2216c67..527c35376c1 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -279,6 +279,9 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // Group peers by the engine of their stores for _, peer := range region.GetPeers() { store := r.cluster.GetStore(peer.GetStoreId()) + if store == nil { + return nil + } if ordinaryFilter.Target(r.cluster.GetOpts(), store) { ordinaryPeers[peer.GetId()] = peer } else { @@ -407,6 +410,9 @@ func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[ui leaderCandidateStores := make([]uint64, 0) for storeID := range peers { store := r.cluster.GetStore(storeID) + if store == nil { + return 0 + } engine := store.GetLabelValue(filter.EngineKey) if len(engine) < 1 { leaderCandidateStores = append(leaderCandidateStores, storeID) @@ -431,6 
+437,9 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6 for _, peer := range peers { storeID := peer.GetStoreId() store := r.cluster.GetStore(storeID) + if store == nil { + continue + } if ordinaryFilter.Target(r.cluster.GetOpts(), store) { r.ordinaryEngine.selectedPeer.Put(storeID, group) scatterDistributionCounter.WithLabelValues( diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index 487e3bcaebb..996dc2416a6 100644 --- a/server/schedulers/shuffle_region.go +++ b/server/schedulers/shuffle_region.go @@ -153,7 +153,11 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core. } func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer { - scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster, region, cluster.GetStore(oldPeer.GetStoreId())) + store := cluster.GetStore(oldPeer.GetStoreId()) + if store == nil { + return nil + } + scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster, region, store) excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds()) target := filter.NewCandidates(cluster.GetStores()). 
diff --git a/server/statistics/store.go b/server/statistics/store.go index ab616b00cb0..74573661a52 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -113,7 +113,7 @@ func (s *StoresStats) GetStoresLoads() map[uint64][]float64 { func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool { store := cluster.GetStore(storeID) - return store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed() + return store == nil || store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed() } // FilterUnhealthyStore filter unhealthy store diff --git a/server/statistics/store_test.go b/server/statistics/store_test.go new file mode 100644 index 00000000000..e3247ea1c46 --- /dev/null +++ b/server/statistics/store_test.go @@ -0,0 +1,48 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "time" + + . 
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/server/core" +) + +var _ = Suite(&testStoreSuite{}) + +type testStoreSuite struct{} + +func (s *testStoreSuite) TestFilterUnhealthyStore(c *C) { + stats := NewStoresStats() + cluster := core.NewBasicCluster() + for i := uint64(1); i <= 5; i++ { + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: i}, core.SetLastHeartbeatTS(time.Now()))) + stats.Observe(i, &pdpb.StoreStats{}) + } + c.Assert(stats.GetStoresLoads(), HasLen, 5) + + cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour)))) + cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore())) + cluster.DeleteStore(cluster.GetStore(3)) + + stats.FilterUnhealthyStore(cluster) + loads := stats.GetStoresLoads() + c.Assert(loads, HasLen, 2) + c.Assert(loads[4], NotNil) + c.Assert(loads[5], NotNil) +}