Skip to content

Commit

Permalink
id: make id allocator general purpose (tikv#5284)
Browse files Browse the repository at this point in the history
close tikv#5294

make id allocator general purpose

Signed-off-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
2 people authored and HuSharp committed Jul 21, 2022
1 parent d132614 commit dacd254
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 14 deletions.
54 changes: 43 additions & 11 deletions server/id/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"path"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/syncutil"
Expand All @@ -37,22 +38,53 @@ type Allocator interface {
Rebase() error
}

const allocStep = uint64(1000)
const defaultAllocStep = uint64(1000)

// allocatorImpl is used to allocate ID.
type allocatorImpl struct {
mu syncutil.Mutex
base uint64
end uint64

client *clientv3.Client
rootPath string
member string
client *clientv3.Client
rootPath string
allocPath string
label string
member string
step uint64
metrics *metrics
}

// metrics is a collection of idAllocator's metrics.
type metrics struct {
idGauge prometheus.Gauge
}

// AllocatorParams are parameters needed to create a new ID Allocator.
type AllocatorParams struct {
Client *clientv3.Client
RootPath string
AllocPath string // AllocPath specifies path to the persistent window boundary.
Label string // Label used to label metrics and logs.
Member string // Member value, used to check if current pd leader.
Step uint64 // Step size of each persistent window boundary increment, default 1000.
}

// NewAllocator creates a new ID Allocator.
func NewAllocator(client *clientv3.Client, rootPath string, member string) Allocator {
return &allocatorImpl{client: client, rootPath: rootPath, member: member}
func NewAllocator(params *AllocatorParams) Allocator {
allocator := &allocatorImpl{
client: params.Client,
rootPath: params.RootPath,
allocPath: params.AllocPath,
label: params.Label,
member: params.Member,
step: params.Step,
metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)},
}
if allocator.step == 0 {
allocator.step = defaultAllocStep
}
return allocator
}

// Alloc returns a new id.
Expand Down Expand Up @@ -106,7 +138,7 @@ func (alloc *allocatorImpl) rebaseLocked() error {
cmp = clientv3.Compare(clientv3.Value(key), "=", string(value))
}

end += allocStep
end += alloc.step
value = typeutil.Uint64ToBytes(end)
txn := kv.NewSlowLogTxn(alloc.client)
leaderPath := path.Join(alloc.rootPath, "leader")
Expand All @@ -119,13 +151,13 @@ func (alloc *allocatorImpl) rebaseLocked() error {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}

log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
idallocGauge.Set(float64(end))
log.Info("idAllocator allocates a new id", zap.String("label", alloc.label), zap.Uint64("alloc-id", end))
alloc.metrics.idGauge.Set(float64(end))
alloc.end = end
alloc.base = end - allocStep
alloc.base = end - alloc.step
return nil
}

func (alloc *allocatorImpl) getAllocIDPath() string {
return path.Join(alloc.rootPath, "alloc_id")
return path.Join(alloc.rootPath, alloc.allocPath)
}
91 changes: 91 additions & 0 deletions server/id/id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package id

import (
"context"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

const (
rootPath = "/pd"
leaderPath = "/pd/leader"
allocPath = "alloc_id"
label = "idalloc"
memberVal = "member"
step = uint64(500)
)

// TestMultipleAllocator tests situation where multiple allocators that
// share rootPath and member val update their ids concurrently.
func TestMultipleAllocator(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()

// Put memberValue to leaderPath to simulate an election success.
_, err = client.Put(context.Background(), leaderPath, memberVal)
re.NoError(err)

wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
iStr := strconv.Itoa(i)
wg.Add(1)
// All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps.
allocator := NewAllocator(&AllocatorParams{
Client: client,
RootPath: rootPath,
AllocPath: allocPath + iStr,
Label: label + iStr,
Member: memberVal,
Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively.
})
go func(re *require.Assertions, allocator Allocator) {
defer wg.Done()
testAllocator(re, allocator)
}(re, allocator)
}
wg.Wait()
}

// testAllocator sequentially updates given allocator and check if values are expected.
func testAllocator(re *require.Assertions, allocator Allocator) {
startID, err := allocator.Alloc()
re.NoError(err)
for i := startID + 1; i < startID+step*20; i++ {
id, err := allocator.Alloc()
re.NoError(err)
re.Equal(i, id)
}
}
2 changes: 0 additions & 2 deletions server/id/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ var (
Name: "id",
Help: "Record of id allocator.",
}, []string{"type"})

idallocGauge = idGauge.WithLabelValues("idalloc")
)

func init() {
Expand Down
11 changes: 10 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ const (
pdRootPath = "/pd"
pdAPIPrefix = "/pd/"
pdClusterIDPath = "/pd/cluster_id"
// idAllocPath for idAllocator to save persistent window's end.
idAllocPath = "alloc_id"
idAllocLabel = "idalloc"
)

// EtcdStartTimeout the timeout of the startup etcd.
Expand Down Expand Up @@ -381,7 +384,13 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberDeployPath(s.member.ID())
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocator(s.client, s.rootPath, s.member.MemberValue())
s.idAllocator = id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
AllocPath: idAllocPath,
Label: idAllocLabel,
Member: s.member.MemberValue(),
})
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })
Expand Down

0 comments on commit dacd254

Please sign in to comment.