Skip to content

*: share etcd client from domain for autoid allocator #46647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Nov 6, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cleanup
  • Loading branch information
tiancaiamao committed Sep 5, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 2be9421da13b0ad715c489108b6b3da2fbd43b1a
1 change: 0 additions & 1 deletion br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/errors"
// "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
)
21 changes: 20 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/br/pkg/version/build"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
@@ -60,13 +61,15 @@ import (
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/mathutil"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
@@ -1553,6 +1556,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
@@ -1601,6 +1605,16 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{rc.cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
TLS: rc.tls.TLSConfig(),
})
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))

manager, err := NewChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
@@ -1661,6 +1675,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
@@ -1710,7 +1729,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
@@ -89,7 +89,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
@@ -252,10 +252,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.AutoIDRequirement(), m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.AutoIDRequirement(), m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
@@ -325,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.AutoIDRequirement(), s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
21 changes: 20 additions & 1 deletion br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/extsort"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
@@ -70,6 +71,7 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client

// dupIgnoreRows tracks the rowIDs of rows that are duplicated and should be ignored.
dupIgnoreRows extsort.ExternalSorter
@@ -86,6 +88,7 @@ func NewTableImporter(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
@@ -102,6 +105,7 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
@@ -317,6 +321,21 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

type asAutoIDRequirement TableImporter

func (r *asAutoIDRequirement) Store() tidbkv.Storage {
return r.kvStore
}

func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
}

// Allocators returns an autoid.Requirement.
func (tr *TableImporter) AutoIDRequirement() autoid.Requirement {
return (*asAutoIDRequirement)(tr)
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
@@ -945,7 +964,7 @@ func (tr *TableImporter) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.AutoIDRequirement(), tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
@@ -210,7 +210,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
func (s *tableRestoreSuiteBase) setupTest(t *testing.T) {
// Collect into the test TableImporter structure
var err error
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(t, err)

s.cfg = config.NewConfig()
@@ -515,7 +515,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp))

@@ -766,7 +766,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() {
require.NoError(s.T(), err)
core.State = model.StatePublic
tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
ccp := &checkpoints.ChunkCheckpoint{}

35 changes: 5 additions & 30 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
@@ -26,15 +26,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/autoid"
// "github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
// "github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
@@ -570,41 +568,18 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 {
var MockForTest func(kv.Storage) autoid.AutoIDAllocClient

func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *singlePointAlloc {
// ebd, ok := store.(kv.EtcdBackend)
// if !ok {
// // newSinglePointAlloc fail because not etcd background
// // This could happen in the server package unit test
// return nil
// }

// addrs, err := ebd.EtcdAddrs()
// if err != nil {
// panic(err)
// }

keyspaceID := uint32(r.Store().GetCodec().GetKeyspaceID())
spa := &singlePointAlloc{
dbID: dbID,
tblID: tblID,
isUnsigned: isUnsigned,
keyspaceID: keyspaceID,
}
// if len(addrs) > 0 {
// etcdCli, err := clientv3.New(clientv3.Config{
// Endpoints: addrs,
// AutoSyncInterval: 30 * time.Second,
// TLS: ebd.TLSConfig(),
// })
// etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(store.GetCodec()))
// if err != nil {
// logutil.BgLogger().Error("fail to connect etcd, fallback to default", zap.String("category", "autoid client"), zap.Error(err))
// return nil
// }
// spa.clientDiscover = clientDiscover{etcdCli: etcdCli}
// } else {
// spa.clientDiscover = clientDiscover{}
// spa.mu.AutoIDAllocClient = MockForTest(store)
// }
if r.GetEtcdClient() == nil {
// Only for test in mockstore
spa.clientDiscover = clientDiscover{}
spa.mu.AutoIDAllocClient = MockForTest(r.Store())
}
spa.clientDiscover = clientDiscover{etcdCli: r.GetEtcdClient()}

// mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
Loading