From dacd2546ed0f26e623aa771aea27a2a6cec72081 Mon Sep 17 00:00:00 2001 From: David <8039876+AmoebaProtozoa@users.noreply.github.com> Date: Wed, 20 Jul 2022 17:45:09 +0800 Subject: [PATCH] id: make id allocator general purpose (#5284) close tikv/pd#5294 make id allocator general purpose Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com> Co-authored-by: Ti Chi Robot --- server/id/id.go | 54 ++++++++++++++++++++------ server/id/id_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++ server/id/metrics.go | 2 - server/server.go | 11 +++++- 4 files changed, 144 insertions(+), 14 deletions(-) create mode 100644 server/id/id_test.go diff --git a/server/id/id.go b/server/id/id.go index bca376d70b84..60d55f27cf11 100644 --- a/server/id/id.go +++ b/server/id/id.go @@ -18,6 +18,7 @@ import ( "path" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/syncutil" @@ -37,7 +38,7 @@ type Allocator interface { Rebase() error } -const allocStep = uint64(1000) +const defaultAllocStep = uint64(1000) // allocatorImpl is used to allocate ID. type allocatorImpl struct { @@ -45,14 +46,45 @@ type allocatorImpl struct { base uint64 end uint64 - client *clientv3.Client - rootPath string - member string + client *clientv3.Client + rootPath string + allocPath string + label string + member string + step uint64 + metrics *metrics +} + +// metrics is a collection of idAllocator's metrics. +type metrics struct { + idGauge prometheus.Gauge +} + +// AllocatorParams are parameters needed to create a new ID Allocator. +type AllocatorParams struct { + Client *clientv3.Client + RootPath string + AllocPath string // AllocPath specifies path to the persistent window boundary. + Label string // Label used to label metrics and logs. + Member string // Member value, used to check if current pd leader. + Step uint64 // Step size of each persistent window boundary increment, default 1000. } // NewAllocator creates a new ID Allocator. -func NewAllocator(client *clientv3.Client, rootPath string, member string) Allocator { - return &allocatorImpl{client: client, rootPath: rootPath, member: member} +func NewAllocator(params *AllocatorParams) Allocator { + allocator := &allocatorImpl{ + client: params.Client, + rootPath: params.RootPath, + allocPath: params.AllocPath, + label: params.Label, + member: params.Member, + step: params.Step, + metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)}, + } + if allocator.step == 0 { + allocator.step = defaultAllocStep + } + return allocator } // Alloc returns a new id. @@ -106,7 +138,7 @@ func (alloc *allocatorImpl) rebaseLocked() error { cmp = clientv3.Compare(clientv3.Value(key), "=", string(value)) } - end += allocStep + end += alloc.step value = typeutil.Uint64ToBytes(end) txn := kv.NewSlowLogTxn(alloc.client) leaderPath := path.Join(alloc.rootPath, "leader") @@ -119,13 +151,13 @@ func (alloc *allocatorImpl) rebaseLocked() error { return errs.ErrEtcdTxnConflict.FastGenByArgs() } - log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) - idallocGauge.Set(float64(end)) + log.Info("idAllocator allocates a new id", zap.String("label", alloc.label), zap.Uint64("alloc-id", end)) + alloc.metrics.idGauge.Set(float64(end)) alloc.end = end - alloc.base = end - allocStep + alloc.base = end - alloc.step return nil } func (alloc *allocatorImpl) getAllocIDPath() string { - return path.Join(alloc.rootPath, "alloc_id") + return path.Join(alloc.rootPath, alloc.allocPath) } diff --git a/server/id/id_test.go b/server/id/id_test.go new file mode 100644 index 000000000000..5e6a629c6876 --- /dev/null +++ b/server/id/id_test.go @@ -0,0 +1,91 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package id + +import ( + "context" + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +const ( + rootPath = "/pd" + leaderPath = "/pd/leader" + allocPath = "alloc_id" + label = "idalloc" + memberVal = "member" + step = uint64(500) +) + +// TestMultipleAllocator tests situation where multiple allocators that +// share rootPath and member val update their ids concurrently. +func TestMultipleAllocator(t *testing.T) { + re := require.New(t) + cfg := etcdutil.NewTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + defer func() { + etcd.Close() + }() + re.NoError(err) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + + <-etcd.Server.ReadyNotify() + + // Put memberValue to leaderPath to simulate an election success. + _, err = client.Put(context.Background(), leaderPath, memberVal) + re.NoError(err) + + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + iStr := strconv.Itoa(i) + wg.Add(1) + // All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps. + allocator := NewAllocator(&AllocatorParams{ + Client: client, + RootPath: rootPath, + AllocPath: allocPath + iStr, + Label: label + iStr, + Member: memberVal, + Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively. + }) + go func(re *require.Assertions, allocator Allocator) { + defer wg.Done() + testAllocator(re, allocator) + }(re, allocator) + } + wg.Wait() +} + +// testAllocator sequentially updates given allocator and check if values are expected. +func testAllocator(re *require.Assertions, allocator Allocator) { + startID, err := allocator.Alloc() + re.NoError(err) + for i := startID + 1; i < startID+step*20; i++ { + id, err := allocator.Alloc() + re.NoError(err) + re.Equal(i, id) + } +} diff --git a/server/id/metrics.go b/server/id/metrics.go index 958582d023e7..bd7e1cc32baa 100644 --- a/server/id/metrics.go +++ b/server/id/metrics.go @@ -24,8 +24,6 @@ var ( Name: "id", Help: "Record of id allocator.", }, []string{"type"}) - - idallocGauge = idGauge.WithLabelValues("idalloc") ) func init() { diff --git a/server/server.go b/server/server.go index 9b92b424e884..8172ebaf23e4 100644 --- a/server/server.go +++ b/server/server.go @@ -81,6 +81,9 @@ const ( pdRootPath = "/pd" pdAPIPrefix = "/pd/" pdClusterIDPath = "/pd/cluster_id" + // idAllocPath for idAllocator to save persistent window's end. + idAllocPath = "alloc_id" + idAllocLabel = "idalloc" ) // EtcdStartTimeout the timeout of the startup etcd. @@ -381,7 +384,13 @@ func (s *Server) startServer(ctx context.Context) error { s.member.SetMemberDeployPath(s.member.ID()) s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) - s.idAllocator = id.NewAllocator(s.client, s.rootPath, s.member.MemberValue()) + s.idAllocator = id.NewAllocator(&id.AllocatorParams{ + Client: s.client, + RootPath: s.rootPath, + AllocPath: idAllocPath, + Label: idAllocLabel, + Member: s.member.MemberValue(), + }) s.tsoAllocatorManager = tso.NewAllocatorManager( s.member, s.rootPath, s.cfg, func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })