Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix incorrect _tidb_rowid allocator value after import for table with AUTO_ID_CACHE=1 (#46171) #46183

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type panickingAllocator struct {
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand Down
49 changes: 39 additions & 10 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,22 +1160,38 @@ func (m *singleTaskMetaMgr) Close() {
}

func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
return alloc.Alloc(ctx, uint64(n), 1, 1)
// there might be 2 allocators when tblInfo.SepAutoInc is true, and in this case
// RowIDAllocType will be the last one.
// we return the value of last Alloc as autoIDBase and autoIDMax, i.e. the value
// either comes from RowIDAllocType or AutoRandomType.
for _, alloc := range allocators {
autoIDBase, autoIDMax, err = alloc.Alloc(ctx, uint64(n), 1, 1)
if err != nil {
return 0, 0, err
}
}
return
}

func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return err
}
return alloc.Rebase(ctx, newBase, false)
for _, alloc := range allocators {
err = alloc.Rebase(ctx, newBase, false)
if err != nil {
return err
}
}
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoid.Allocator, error) {
func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
Expand All @@ -1195,16 +1211,29 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
hasAutoRandID := tblInfo.ContainsAutoRandomBits()

// Current TiDB has some limitations for auto ID.
// 1. Auto increment ID and auto row ID are using the same RowID allocator. See https://github.com/pingcap/tidb/issues/982.
// 2. Auto random column must be a clustered primary key. That is to say, there is no implicit row ID for tables with auto random column.
// TiDB version <= 6.4.0 has some limitations for auto ID.
// 1. Auto increment ID and auto row ID are using the same RowID allocator.
// See https://github.com/pingcap/tidb/issues/982.
// 2. Auto random column must be a clustered primary key. That is to say,
// there is no implicit row ID for tables with auto random column.
// 3. There is at most one auto column in a table.
// Therefore, we assume there is only one auto column in a table and use RowID allocator if possible.
//
// Since TiDB 6.5.0, row ID and auto ID are using different allocators when tblInfo.SepAutoInc is true
switch {
case hasRowID || hasAutoIncID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, noCache, tblVer), nil
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, noCache, tblVer), nil
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
}
Expand Down
161 changes: 146 additions & 15 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,45 @@ type metaMgrSuite struct {
checksumMgr *testChecksumMgr
}

func newTableRestore(t *testing.T, kvStore kv.Storage) *TableRestore {
func newTableRestore(t *testing.T,
db, table string,
dbID, tableID int64,
createTableSQL string, kvStore kv.Storage,
) *TableRestore {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", "utf8mb4", "utf8mb4_bin")
node, err := p.ParseOneStmt(createTableSQL, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), int64(1))
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

schema := "test"
tb := "t1"
ti := &checkpoints.TidbTableInfo{
ID: tableInfo.ID,
DB: schema,
Name: tb,
DB: db,
Name: table,
Core: tableInfo,
}
dbInfo := &checkpoints.TidbDBInfo{
ID: 1,
Name: schema,
ID: dbID,
Name: db,
Tables: map[string]*checkpoints.TidbTableInfo{
tb: ti,
table: ti,
},
}

ctx := kv.WithInternalSourceType(context.Background(), "test")
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil {
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbInfo.ID, ti.Core)
})
require.NoError(t, err)

tableName := common.UniqueTable(schema, tb)
tableName := common.UniqueTable(db, table)
logger := log.With(zap.String("table", tableName))

return &TableRestore{
Expand All @@ -92,9 +94,10 @@ func newMetaMgrSuite(t *testing.T) *metaMgrSuite {

var s metaMgrSuite
s.mgr = &dbTableMetaMgr{
session: db,
taskID: 1,
tr: newTableRestore(t, kvStore),
session: db,
taskID: 1,
tr: newTableRestore(t, "test", "t1", 1, 1,
"CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", kvStore),
tableName: common.UniqueTable("test", TableMetaTableName),
needChecksum: true,
}
Expand Down Expand Up @@ -451,3 +454,131 @@ func TestSingleTaskMetaMgr(t *testing.T) {
})
require.NoError(t, err)
}

func newTableInfo2(t *testing.T,
dbID, tableID int64,
createTableSql string, kvStore kv.Storage,
) *model.TableInfo {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt(createTableSql, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

ctx := kv.WithInternalSourceType(context.Background(), "test")
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbID, tableInfo)
})
require.NoError(t, err)
return tableInfo
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})

cases := []struct {
tableID int64
createTableSQL string
expectErrStr string
expectAllocatorTypes []autoid.AllocatorType
}{
// autoID, autoIncrID = false, false
{
tableID: 11,
createTableSQL: "create table t11 (a int primary key clustered)",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
{
tableID: 12,
createTableSQL: "create table t12 (a int primary key clustered) AUTO_ID_CACHE 1",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
// autoID, autoIncrID = true, false
{
tableID: 21,
createTableSQL: "create table t21 (a int)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 22,
createTableSQL: "create table t22 (a int) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
// autoID, autoIncrID = false, true
{
tableID: 31,
createTableSQL: "create table t31 (a int primary key clustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 32,
createTableSQL: "create table t32 (a int primary key clustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoID, autoIncrID = true, true
{
tableID: 41,
createTableSQL: "create table t41 (a int primary key nonclustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 42,
createTableSQL: "create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoRandomID
{
tableID: 51,
createTableSQL: "create table t51 (a bigint primary key auto_random)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoRandomType},
},
}
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo2(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := getGlobalAutoIDAlloc(kvStore, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
// all allocators are rebased and allocated
for _, alloc := range allocators {
base2, max2, err := alloc.Alloc(ctx, 100, 1, 1)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(223), base2, c.tableID)
require.Equal(t, int64(323), max2, c.tableID)
}
} else {
require.ErrorContains(t, err, c.expectErrStr, c.tableID)
}
var allocatorTypes []autoid.AllocatorType
for _, alloc := range allocators {
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}
17 changes: 15 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,21 @@ func (tr *TableRestore) postProcess(
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle
err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, uint64(tr.alloc.Get(autoid.RowIDAllocType).Base())+1)
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, newBase)

if err == nil && isLocalBackend(rc.cfg) {
// for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid,
// especially when a table has auto_increment non-clustered PK, it will use both allocators.
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
20 changes: 14 additions & 6 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,18 @@ func ObtainNewCollationEnabled(ctx context.Context, g glue.SQLExecutor) (bool, e
// AlterAutoIncrement rebase the table auto increment id
//
// NOTE: since tidb can make sure the auto id is always be rebase even if the `incr` value is smaller
// the the auto incremanet base in tidb side, we needn't fetch currently auto increment value here.
// than the auto increment base in tidb side, we needn't fetch currently auto increment value here.
// See: https://github.com/pingcap/tidb/blob/64698ef9a3358bfd0fdc323996bb7928a56cadca/ddl/ddl_api.go#L2528-L2533
func AlterAutoIncrement(ctx context.Context, g glue.SQLExecutor, tableName string, incr uint64) error {
var query string
logger := log.FromContext(ctx).With(zap.String("table", tableName), zap.Uint64("auto_increment", incr))
base := adjustIDBase(incr)
var forceStr string
if incr > math.MaxInt64 {
// automatically set max value
logger.Warn("auto_increment out of the maximum value TiDB supports, automatically set to the max", zap.Uint64("auto_increment", incr))
incr = math.MaxInt64
query = fmt.Sprintf("ALTER TABLE %s FORCE AUTO_INCREMENT=%d", tableName, incr)
} else {
query = fmt.Sprintf("ALTER TABLE %s AUTO_INCREMENT=%d", tableName, incr)
forceStr = "FORCE"
}
query := fmt.Sprintf("ALTER TABLE %s %s AUTO_INCREMENT=%d", tableName, forceStr, base)
task := logger.Begin(zap.InfoLevel, "alter table auto_increment")
err := g.ExecuteWithLog(ctx, query, "alter table auto_increment", logger)
task.End(zap.ErrorLevel, err)
Expand All @@ -397,6 +396,14 @@ func AlterAutoIncrement(ctx context.Context, g glue.SQLExecutor, tableName strin
return errors.Annotatef(err, "%s", query)
}

func adjustIDBase(incr uint64) int64 {
if incr > math.MaxInt64 {
return math.MaxInt64
}
return int64(incr)
}

// AlterAutoRandom rebase the table auto random id.
func AlterAutoRandom(ctx context.Context, g glue.SQLExecutor, tableName string, randomBase uint64, maxAutoRandom uint64) error {
logger := log.FromContext(ctx).With(zap.String("table", tableName), zap.Uint64("auto_random", randomBase))
if randomBase == maxAutoRandom+1 {
Expand All @@ -407,6 +414,7 @@ func AlterAutoRandom(ctx context.Context, g glue.SQLExecutor, tableName string,
logger.Warn("auto_random out of the maximum value TiDB supports")
return nil
}
// if new base is smaller than current, this query will success with a warning
query := fmt.Sprintf("ALTER TABLE %s AUTO_RANDOM_BASE=%d", tableName, randomBase)
task := logger.Begin(zap.InfoLevel, "alter table auto_random")
err := g.ExecuteWithLog(ctx, query, "alter table auto_random_base", logger)
Expand Down
1 change: 1 addition & 0 deletions br/tests/lightning_csv/data/auto_incr_id-schema-create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE DATABASE `auto_incr_id`;
5 changes: 5 additions & 0 deletions br/tests/lightning_csv/data/auto_incr_id.clustered-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE `clustered` (
`id` int(11) NOT NULL AUTO_INCREMENT,
v int,
PRIMARY KEY(`id`) CLUSTERED
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
4 changes: 4 additions & 0 deletions br/tests/lightning_csv/data/auto_incr_id.clustered.0.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
4,1
5,2
6,3

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE `clustered_cache1` (
`id` int(11) NOT NULL AUTO_INCREMENT,
v int,
PRIMARY KEY(`id`) CLUSTERED
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_ID_CACHE 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
4,1
5,2
6,3

Loading