From d433428939adfe499a6cf548602e69f9e836b7b0 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 6 Jun 2023 12:32:40 +0800 Subject: [PATCH] replace GetAllocator with AllocID Signed-off-by: Ryan Leung --- pkg/mock/mockcluster/mockcluster.go | 11 +++++------ pkg/replication/replication_mode.go | 8 ++++---- pkg/schedule/core/cluster_informer.go | 3 +-- pkg/schedule/operator/builder.go | 2 +- pkg/syncer/client.go | 4 ++++ pkg/unsaferecovery/unsafe_recovery_controller.go | 7 +++---- pkg/utils/grpcutil/grpcutil.go | 1 + server/cluster/cluster.go | 6 +++--- server/cluster/cluster_test.go | 2 +- tests/server/cluster/cluster_test.go | 10 +++++----- 10 files changed, 28 insertions(+), 26 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index efb13797819..051856abf81 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" @@ -99,9 +98,9 @@ func (mc *Cluster) GetStorage() storage.Storage { return mc.Storage } -// GetAllocator returns the ID allocator. -func (mc *Cluster) GetAllocator() id.Allocator { - return mc.IDAllocator +// AllocID returns a new unique ID. +func (mc *Cluster) AllocID() (uint64, error) { + return mc.IDAllocator.Alloc() } // GetPersistOptions returns the persist options. @@ -185,7 +184,7 @@ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics // AllocPeer allocs a new peer on a store. func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { - peerID, err := mc.GetAllocator().Alloc() + peerID, err := mc.AllocID() if err != nil { log.Error("failed to alloc peer", errs.ZapError(err)) return nil, err @@ -358,7 +357,7 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea } mc.AddRegionStore(storeID, regionCount) for i := 0; i < leaderCount; i++ { - id, _ := mc.GetAllocator().Alloc() + id, _ := mc.AllocID() mc.AddLeaderRegion(id, storeID) } } diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 1aa01b6a243..1a94abb09fe 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -234,7 +234,7 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error { m.Lock() defer m.Unlock() - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to async wait state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -257,7 +257,7 @@ func (m *ModeManager) drSwitchToAsync(availableStores []uint64) error { } func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -280,7 +280,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error { } func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -301,7 +301,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { func (m *ModeManager) drSwitchToSync() error { m.Lock() defer m.Unlock() - id, err := m.cluster.GetAllocator().Alloc() + id, err := m.cluster.AllocID() if err != nil { log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 658a476766b..c7eb0d7581c 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -16,7 +16,6 @@ package core import ( "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/id" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -49,7 +48,7 @@ type ScheduleCluster interface { GetRegionLabeler() *labeler.RegionLabeler GetBasicCluster() *core.BasicCluster GetStoreConfig() sc.StoreConfig - GetAllocator() id.Allocator + AllocID() (uint64, error) } // BasicCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index 2cc5a6e7102..06e8628cc6a 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -483,7 +483,7 @@ func (b *Builder) prepareBuild() (string, error) { if o == nil || (!b.useJointConsensus && !core.IsLearner(o) && core.IsLearner(n)) { if n.GetId() == 0 { // Allocate peer ID if need. - id, err := b.GetAllocator().Alloc() + id, err := b.AllocID() if err != nil { return "", err } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index cdea9cef6ba..2eeb467c57d 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -92,6 +92,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } // establish client. conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig) + // it means the context is canceled. + if conn == nil { + return + } defer conn.Close() // Start syncing data. diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index 0c7d087aa72..5db1168c4ce 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -31,7 +31,6 @@ import ( "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server/config" @@ -108,7 +107,7 @@ type cluster interface { core.StoreSetInformer DropCacheAllRegion() - GetAllocator() id.Allocator + AllocID() (uint64, error) BuryStore(storeID uint64, forceBury bool) error GetPersistOptions() *config.PersistOptions } @@ -1135,11 +1134,11 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree, hasPlan := false createRegion := func(startKey, endKey []byte, storeID uint64) (*metapb.Region, error) { - regionID, err := u.cluster.GetAllocator().Alloc() + regionID, err := u.cluster.AllocID() if err != nil { return nil, err } - peerID, err := u.cluster.GetAllocator().Alloc() + peerID, err := u.cluster.AllocID() if err != nil { return nil, err } diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 001a8540db8..2deff8a5a5c 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -204,6 +204,7 @@ func establish(ctx context.Context, addr string, tlsConfig *TLSConfig) (*grpc.Cl return cc, nil } +// CreateClientConn creates a client connection to the given target. func CreateClientConn(ctx context.Context, addr string, tlsConfig *TLSConfig) *grpc.ClientConn { var ( conn *grpc.ClientConn diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e5426f2828c..68d9328eb10 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -707,9 +707,9 @@ func (c *RaftCluster) PauseOrResumeChecker(name string, t int64) error { return c.coordinator.PauseOrResumeChecker(name, t) } -// GetAllocator returns cluster's id allocator. -func (c *RaftCluster) GetAllocator() id.Allocator { - return c.id +// AllocID returns a global unique ID. +func (c *RaftCluster) AllocID() (uint64, error) { + return c.id.Alloc() } // GetRegionSyncer returns the region syncer. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 4ca04add818..a722fed9bda 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2157,7 +2157,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind oper } func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { - id, err := c.GetAllocator().Alloc() + id, err := c.AllocID() if err != nil { return nil, err } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 4b9f608d47f..cba7584efd9 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -294,15 +294,15 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) - rc.GetAllocator().Alloc() - id, err := rc.GetAllocator().Alloc() + rc.AllocID() + id, err := rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is up will fail. resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) re.NoError(err) re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is offline will fail. resetStoreState(re, rc, store.GetId(), metapb.StoreState_Offline) @@ -310,7 +310,7 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) // Put new store with a duplicated address when old store is tombstone is OK. resetStoreState(re, rc, store.GetId(), metapb.StoreState_Tombstone) @@ -319,7 +319,7 @@ func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftClus re.NoError(err) re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) - id, err = rc.GetAllocator().Alloc() + id, err = rc.AllocID() re.NoError(err) deployPath := getTestDeployPath(id) // Put a new store.