Skip to content

Commit

Permalink
lightning: fix incorrect _tidb_rowid allocator value after import for…
Browse files Browse the repository at this point in the history
… table with AUTO_ID_CACHE=1 (#46171)

close #46100
  • Loading branch information
D3Hunter authored Aug 17, 2023
1 parent 7d259bd commit 740f7f5
Show file tree
Hide file tree
Showing 32 changed files with 370 additions and 36 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type panickingAllocator struct {
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDat
}
}
if IsAutoIncCol(col.ToInfo()) {
// same as RowIDAllocType, since SepAutoInc is always false when initializing allocators of Table.
alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoIncrementType)
if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil {
return value, errors.Trace(err)
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_test(
name = "common_test",
timeout = "short",
srcs = [
"common_test.go",
"errors_test.go",
"main_test.go",
"once_error_test.go",
Expand All @@ -104,15 +105,23 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
"//ddl",
"//errno",
"//kv",
"//meta",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//store/driver/error",
"//store/mockstore",
"//testkit/testsetup",
"//util/dbutil",
"//util/mock",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
47 changes: 37 additions & 10 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,42 @@ var DefaultImportVariablesTiDB = map[string]string{
// AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
return alloc.Alloc(ctx, uint64(n), 1, 1)
// there might be 2 allocators when tblInfo.SepAutoInc is true, and in this case
// RowIDAllocType will be the last one.
// we return the value of last Alloc as autoIDBase and autoIDMax, i.e. the value
// either comes from RowIDAllocType or AutoRandomType.
for _, alloc := range allocators {
autoIDBase, autoIDMax, err = alloc.Alloc(ctx, uint64(n), 1, 1)
if err != nil {
return 0, 0, err
}
}
return
}

// RebaseGlobalAutoID rebase the autoID base to newBase.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
tblInfo *model.TableInfo) error {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return err
}
return alloc.Rebase(ctx, newBase, false)
for _, alloc := range allocators {
err = alloc.Rebase(ctx, newBase, false)
if err != nil {
return err
}
}
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoid.Allocator, error) {
// GetGlobalAutoIDAlloc returns the autoID allocators for a table.
// export it for testing.
func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
Expand All @@ -89,20 +107,29 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
hasAutoRandID := tblInfo.ContainsAutoRandomBits()

// Current TiDB has some limitations for auto ID.
// TiDB version <= 6.4.0 has some limitations for auto ID.
// 1. Auto increment ID and auto row ID are using the same RowID allocator.
// See https://github.com/pingcap/tidb/issues/982.
// 2. Auto random column must be a clustered primary key. That is to say,
// there is no implicit row ID for tables with auto random column.
// 3. There is at most one auto column in a table.
// Therefore, we assume there is only one auto column in a table and use RowID allocator if possible.
//
// Since TiDB 6.5.0, row ID and auto ID are using different allocators when tblInfo.SepAutoInc is true
switch {
case hasRowID || hasAutoIncID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer), nil
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer), nil
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
}
Expand Down
161 changes: 161 additions & 0 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common_test

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/mockstore"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
)

func newTableInfo(t *testing.T,
dbID, tableID int64,
createTableSql string, kvStore kv.Storage,
) *model.TableInfo {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt(createTableSql, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbID, tableInfo)
})
require.NoError(t, err)
return tableInfo
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})

cases := []struct {
tableID int64
createTableSQL string
expectErrStr string
expectAllocatorTypes []autoid.AllocatorType
}{
// autoID, autoIncrID = false, false
{
tableID: 11,
createTableSQL: "create table t11 (a int primary key clustered)",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
{
tableID: 12,
createTableSQL: "create table t12 (a int primary key clustered) AUTO_ID_CACHE 1",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
// autoID, autoIncrID = true, false
{
tableID: 21,
createTableSQL: "create table t21 (a int)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 22,
createTableSQL: "create table t22 (a int) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
// autoID, autoIncrID = false, true
{
tableID: 31,
createTableSQL: "create table t31 (a int primary key clustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 32,
createTableSQL: "create table t32 (a int primary key clustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoID, autoIncrID = true, true
{
tableID: 41,
createTableSQL: "create table t41 (a int primary key nonclustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 42,
createTableSQL: "create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoRandomID
{
tableID: 51,
createTableSQL: "create table t51 (a bigint primary key auto_random)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoRandomType},
},
}
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
// all allocators are rebased and allocated
for _, alloc := range allocators {
base2, max2, err := alloc.Alloc(ctx, 100, 1, 1)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(223), base2, c.tableID)
require.Equal(t, int64(323), max2, c.tableID)
}
} else {
require.ErrorContains(t, err, c.expectErrStr, c.tableID)
}
var allocatorTypes []autoid.AllocatorType
for _, alloc := range allocators {
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}
33 changes: 18 additions & 15 deletions br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,45 @@ type metaMgrSuite struct {
checksumMgr *testChecksumMgr
}

func newTableRestore(t *testing.T, kvStore kv.Storage) *TableImporter {
func newTableRestore(t *testing.T,
db, table string,
dbID, tableID int64,
createTableSQL string, kvStore kv.Storage,
) *TableImporter {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", "utf8mb4", "utf8mb4_bin")
node, err := p.ParseOneStmt(createTableSQL, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), int64(1))
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

schema := "test"
tb := "t1"
ti := &checkpoints.TidbTableInfo{
ID: tableInfo.ID,
DB: schema,
Name: tb,
DB: db,
Name: table,
Core: tableInfo,
}
dbInfo := &checkpoints.TidbDBInfo{
ID: 1,
Name: schema,
ID: dbID,
Name: db,
Tables: map[string]*checkpoints.TidbTableInfo{
tb: ti,
table: ti,
},
}

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil {
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbInfo.ID, ti.Core)
})
require.NoError(t, err)

tableName := common.UniqueTable(schema, tb)
tableName := common.UniqueTable(db, table)
logger := log.With(zap.String("table", tableName))

return &TableImporter{
Expand All @@ -93,9 +95,10 @@ func newMetaMgrSuite(t *testing.T) *metaMgrSuite {

var s metaMgrSuite
s.mgr = &dbTableMetaMgr{
session: db,
taskID: 1,
tr: newTableRestore(t, kvStore),
session: db,
taskID: 1,
tr: newTableRestore(t, "test", "t1", 1, 1,
"CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", kvStore),
tableName: common.UniqueTable("test", TableMetaTableName),
needChecksum: true,
}
Expand Down
17 changes: 15 additions & 2 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,21 @@ func (tr *TableImporter) postProcess(
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.RowIDAllocType).Base())+1)
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)

if err == nil && isLocalBackend(rc.cfg) {
// for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid,
// especially when a table has auto_increment non-clustered PK, it will use both allocators.
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
Loading

0 comments on commit 740f7f5

Please sign in to comment.