Skip to content

Commit

Permalink
*: share etcd client from domain for autoid allocator (#46647)
Browse files Browse the repository at this point in the history
close #46324
  • Loading branch information
tiancaiamao authored Nov 6, 2023
1 parent b93b2f5 commit 4979e99
Show file tree
Hide file tree
Showing 35 changed files with 245 additions and 168 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/utils",
"//pkg/errno",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down Expand Up @@ -136,6 +135,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
Expand Down
23 changes: 11 additions & 12 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
)
Expand Down Expand Up @@ -50,9 +49,9 @@ var DefaultImportVariablesTiDB = map[string]string{
}

// AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
func AllocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
Expand All @@ -70,9 +69,9 @@ func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
}

// RebaseGlobalAutoID rebase the autoID base to newBase.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
func RebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -90,9 +89,9 @@ func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
// `bases` param, else it will be skipped.
// base is the max id that have been used by the table, the next usable id will
// be base + 1, see Allocator.Alloc.
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, store kv.Storage, dbID int64,
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -111,8 +110,8 @@ func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]i

// GetGlobalAutoIDAlloc returns the autoID allocators for a table.
// export it for testing.
func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
Expand Down Expand Up @@ -144,15 +143,15 @@ func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
Expand Down
27 changes: 20 additions & 7 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/store/mockstore"
tmock "github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)

func newTableInfo(t *testing.T,
Expand Down Expand Up @@ -134,11 +135,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
Expand All @@ -160,6 +161,18 @@ func TestAllocGlobalAutoID(t *testing.T) {
}
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
return nil
}

func TestRebaseTableAllocators(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
Expand All @@ -169,7 +182,7 @@ func TestRebaseTableAllocators(t *testing.T) {
})
ti := newTableInfo(t, 1, 42,
"create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
require.NoError(t, err)
require.Len(t, allocators, 2)
for _, alloc := range allocators {
Expand All @@ -186,7 +199,7 @@ func TestRebaseTableAllocators(t *testing.T) {
}
require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes)
// this call does nothing
require.NoError(t, common.RebaseTableAllocators(ctx, nil, kvStore, 1, ti))
require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti))
for _, alloc := range allocators {
nextID, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
Expand All @@ -195,7 +208,7 @@ func TestRebaseTableAllocators(t *testing.T) {
// this call rebase AutoIncrementType allocator to 223
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 223,
}, kvStore, 1, ti))
}, mockRequirement{kvStore}, 1, ti))
next, err := allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(224), next)
Expand All @@ -206,7 +219,7 @@ func TestRebaseTableAllocators(t *testing.T) {
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 323,
autoid.RowIDAllocType: 423,
}, kvStore, 1, ti))
}, mockRequirement{kvStore}, 1, ti))
next, err = allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(324), next)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//pkg/util/collate",
"//pkg/util/dbterror",
"//pkg/util/engine",
"//pkg/util/etcd",
"//pkg/util/extsort",
"//pkg/util/mock",
"//pkg/util/pdapi",
Expand Down
21 changes: 20 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/br/pkg/version/build"
tidbconfig "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/keyspace"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser"
Expand All @@ -60,12 +61,14 @@ import (
"github.com/pingcap/tidb/pkg/store/driver"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/etcd"
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
"github.com/pingcap/tidb/pkg/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -1554,6 +1557,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
Expand Down Expand Up @@ -1607,6 +1611,16 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{rc.cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
TLS: rc.tls.TLSConfig(),
})
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))

manager, err := NewChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1667,6 +1681,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
Expand Down Expand Up @@ -1716,7 +1735,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
Expand All @@ -89,7 +89,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
Expand Down
19 changes: 18 additions & 1 deletion br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/extsort"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand All @@ -69,6 +70,7 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client

// dupIgnoreRows tracks the rowIDs of rows that are duplicated and should be ignored.
dupIgnoreRows extsort.ExternalSorter
Expand All @@ -85,6 +87,7 @@ func NewTableImporter(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
Expand All @@ -101,6 +104,7 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
Expand Down Expand Up @@ -316,6 +320,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

// AutoIDRequirement implements autoid.Requirement.
var _ autoid.Requirement = &TableImporter{}

// Store implements the autoid.Requirement interface.
func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// GetEtcdClient implements the autoid.Requirement interface.
func (tr *TableImporter) GetEtcdClient() *clientv3.Client {
return tr.etcdCli
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
Expand Down Expand Up @@ -944,7 +961,7 @@ func (tr *TableImporter) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
func (s *tableRestoreSuiteBase) setupTest(t *testing.T) {
// Collect into the test TableImporter structure
var err error
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(t, err)

s.cfg = config.NewConfig()
Expand Down Expand Up @@ -516,7 +516,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp))

Expand Down Expand Up @@ -767,7 +767,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() {
require.NoError(s.T(), err)
core.State = model.StatePublic
tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
ccp := &checkpoints.ChunkCheckpoint{}

Expand Down
Loading

0 comments on commit 4979e99

Please sign in to comment.