Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: separate auto_increment ID from row ID #28448

Closed
wants to merge 8 commits into from
Closed
85 changes: 55 additions & 30 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/redact"
Expand Down Expand Up @@ -289,39 +290,10 @@ func BuildBackupRangeAndSchema(
zap.String("db", dbInfo.Name.O),
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID()
}
err := setTableInfoAutoIDs(storage, dbInfo, tableInfo, logger)
if err != nil {
return nil, nil, errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
if err != nil {
return nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
logger.Debug("change table AutoRandID",
zap.Int64("AutoRandID", globalAutoRandID))
}
logger.Debug("change table AutoIncID",
zap.Int64("AutoIncID", globalAutoID))

// remove all non-public indices
n := 0
Expand Down Expand Up @@ -355,6 +327,59 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

func setTableInfoAutoIDs(storage kv.Storage, dbInfo *model.DBInfo, tableInfo *model.TableInfo, logger *zap.Logger) error {
if tableInfo.IsView() {
return nil
}
if tableInfo.IsSequence() {
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType)
nextSeqID, err := seqAlloc.NextGlobalAutoID()
if err != nil {
return errors.Trace(err)
}
tableInfo.AutoIncID = nextSeqID
logger.Debug("change sequence AutoIncID",
zap.Int64("sequence nextID", nextSeqID))
return nil
}

var autoIDs meta.AutoIDGroup
err := kv.RunInNewTxn(context.Background(), storage, true, func(ctx context.Context, txn kv.Transaction) error {
var err1 error
acc := meta.NewMeta(txn).GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)
autoIDs, err1 = acc.Get()
if err1 != nil {
return errors.Trace(err1)
}
return nil
})
if err != nil {
return errors.Trace(err)
}
if !model.AutoIncrementIDIsSeparated(tableInfo.Version) {
tableInfo.AutoIncID = autoIDs.RowID + 1
logger.Debug("change table AutoIncID",
zap.Int64("AutoIncID", tableInfo.AutoIncID))
} else {
if common.TableHasAutoRowID(tableInfo) {
tableInfo.AutoRowID = autoIDs.RowID + 1
logger.Debug("change table AutoRowID",
zap.Int64("AutoRowID", tableInfo.AutoRowID))
}
if tableInfo.GetAutoIncrementColInfo() != nil {
tableInfo.AutoIncID = autoIDs.IncrementID + 1
logger.Debug("change table AutoIncID",
zap.Int64("AutoIncID", tableInfo.AutoIncID))
}
}
if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
tableInfo.AutoRandID = autoIDs.RandomID + 1
logger.Debug("change table AutoRandID",
zap.Int64("AutoRandID", tableInfo.AutoRandID))
}
return nil
}

// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/cdclog/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (t *TableBuffer) ReloadMeta(tbl table.Table, allocator autoid.Allocators) {
if kv.TableHasAutoRowID(tbl.Meta()) {
colPerm = append(colPerm, -1)
}
if t.allocator == nil {
if t.allocator.Items == nil {
t.allocator = allocator
}
t.tableInfo = tbl
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (kvcodec *tableKVEncoder) AddRecord(
_ = alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false)
}
if isAutoIncCol {
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoIncrementType)
_ = alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false)
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type panickingAllocator struct {
}

// NewPanickingAllocator creates a PanickingAllocator shared by all allocation types.
func NewPanickingAllocators(base int64) autoid.Allocators {
func NewPanickingAllocators(base int64, tblVer uint16) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
return autoid.NewAllocators(tblVer,
&panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType},
Expand Down
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *kvSuite) TestEncode(c *C) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *kvSuite) TestDecode(c *C) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)
decoder, err := NewTableKVDecoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *kvSuite) TestDecodeIndex(c *C) {
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -281,7 +281,7 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -309,7 +309,7 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {

func (s *kvSuite) TestEncodeDoubleAutoIncrement(c *C) {
tblInfo := mockTableInfo(c, "create table t (id double not null auto_increment, unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -357,7 +357,7 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
tblInfo := mockTableInfo(c, "create table t (id bigint unsigned NOT NULL auto_random primary key clustered, a varchar(100));")
// seems parser can't parse auto_random properly.
tblInfo.AutoRandomBits = 5
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s *kvSuite) TestDefaultAutoRandoms(c *C) {

func (s *kvSuite) TestShardRowId(c *C) {
tblInfo := mockTableInfo(c, "create table t (s varchar(16)) shard_row_id_bits = 3;")
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -559,7 +559,7 @@ func (s *benchSQL2KVSuite) SetUpTest(c *C) {
tableInfo.State = model.StatePublic

// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tableInfo)
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0, 0), tableInfo)
c.Assert(err, IsNil)
s.encoder, err = NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}})
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *mysqlSuite) SetUpTest(c *C) {
cols = append(cols, col)
}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

s.dbHandle = db
Expand Down Expand Up @@ -222,7 +222,7 @@ func (s *mysqlSuite) testStrictMode(c *C) {
ft.Charset = charset.CharsetASCII
col1 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("s1"), State: model.StatePublic, Offset: 1, FieldType: ft}
tblInfo := &model.TableInfo{ID: 1, Columns: []*model.ColumnInfo{col0, col1}, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0, 0), tblInfo)
c.Assert(err, IsNil)

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig()))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (rc *Controller) sampleDataFromTable(ctx context.Context, dbName string, ta
if err != nil {
return errors.Trace(err)
}
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(0, tableInfo.Version)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)

kvEncoder, err := rc.backend.NewEncoder(tbl, &kv.SessionOptions{
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,11 @@ func (tr *TableRestore) restoreTable(
return false, err
}
} else {
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
maxRowID := tr.tableInfo.Core.AutoIncID
if model.AutoIncrementIDIsSeparated(tr.tableInfo.Core.Version) {
maxRowID = tr.tableInfo.Core.AutoRowID
}
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, maxRowID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1932,7 +1932,7 @@ func (s *tableRestoreSuite) TestEstimate(c *C) {
controller := gomock.NewController(c)
defer controller.Finish()
mockBackend := mock.NewMockBackend(controller)
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(0, s.tableInfo.Core.Version)
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
c.Assert(err, IsNil)

Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewTableRestore(
cp *checkpoints.TableCheckpoint,
ignoreColumns []string,
) (*TableRestore, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
idAlloc := kv.NewPanickingAllocators(cp.AllocBase, tableInfo.Core.Version)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
Expand Down Expand Up @@ -678,12 +678,24 @@ func (tr *TableRestore) postProcess(
if cp.Status < checkpoints.CheckpointStatusAlteredAutoInc {
rc.alterTableLock.Lock()
tblInfo := tr.tableInfo.Core
hasRowID := common.TableHasAutoRowID(tblInfo)
hasIncID := tblInfo.GetAutoIncrementColInfo() != nil
var err error
if tblInfo.PKIsHandle && tblInfo.ContainsAutoRandomBits() {
err = AlterAutoRandom(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, tr.alloc.Get(autoid.AutoRandomType).Base()+1)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle
err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, tr.alloc.Get(autoid.RowIDAllocType).Base()+1)
}
if model.AutoIncrementIDIsSeparated(tblInfo.Version) {
if hasRowID && err == nil {
err = AlterRowID(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, tr.alloc.Get(autoid.RowIDAllocType).Base()+1)
}
if hasIncID && err == nil {
err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, tr.alloc.Get(autoid.AutoIncrementType).Base()+1)
}
} else {
if (hasRowID || hasIncID) && err == nil {
// only alter auto increment id iff table contains auto-increment column or generated handle
err = AlterAutoIncrement(ctx, rc.tidbGlue.GetSQLExecutor(), tr.tableName, tr.alloc.Get(autoid.RowIDAllocType).Base()+1)
}
}
rc.alterTableLock.Unlock()
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,21 @@ func AlterAutoIncrement(ctx context.Context, g glue.SQLExecutor, tableName strin
return errors.Annotatef(err, "%s", query)
}

func AlterRowID(ctx context.Context, g glue.SQLExecutor, tableName string, rowIDBase int64) error {
logger := log.With(zap.String("table", tableName), zap.Int64("row_id", rowIDBase))
query := fmt.Sprintf("ALTER TABLE %s ROW_ID=%d", tableName, rowIDBase)
task := logger.Begin(zap.InfoLevel, "alter table row_id")
err := g.ExecuteWithLog(ctx, query, "alter table row_id", logger)
task.End(zap.ErrorLevel, err)
if err != nil {
task.Error(
"alter table row_id failed, please perform the query manually",
zap.String("query", query),
)
}
return errors.Annotatef(err, "%s", query)
}

func AlterAutoRandom(ctx context.Context, g glue.SQLExecutor, tableName string, randomBase int64) error {
logger := log.With(zap.String("table", tableName), zap.Int64("auto_random", randomBase))
query := fmt.Sprintf("ALTER TABLE %s AUTO_RANDOM_BASE=%d", tableName, randomBase)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (l *LogClient) reloadTableMeta(dom *domain.Domain, tableID int64, item *cdc
zap.Int64("restore table id", newTableID),
zap.String("restore table name", item.Table),
zap.String("restore schema name", item.Schema),
zap.Any("allocator", len(allocs)),
zap.Any("allocator", len(allocs.Items)),
zap.Any("auto", newTableInfo.Meta().GetAutoIncrementColInfo()),
)
return nil
Expand Down
7 changes: 0 additions & 7 deletions br/pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ import (
// temporaryDBNamePrefix is the prefix name of system db, e.g. mysql system db will be rename to __TiDB_BR_Temporary_mysql
const temporaryDBNamePrefix = "__TiDB_BR_Temporary_"

// NeedAutoID checks whether the table needs backing up with an autoid.
func NeedAutoID(tblInfo *model.TableInfo) bool {
hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
return hasRowID || hasAutoIncID
}

// Database wraps the schema and tables of a database.
type Database struct {
Info *model.DBInfo
Expand Down
8 changes: 8 additions & 0 deletions cmd/explaintest/r/separate_auto_id.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
drop table if exists t;
create table t (c bigint auto_increment, unique key(c));
insert into t values();
insert into t values();
select *, _tidb_rowid from t;
c _tidb_rowid
1 1
2 2
6 changes: 6 additions & 0 deletions cmd/explaintest/t/separate_auto_id.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- See https://github.com/pingcap/tidb/issues/982 for the original issue.
drop table if exists t;
create table t (c bigint auto_increment, unique key(c));
insert into t values();
insert into t values();
select *, _tidb_rowid from t;
Loading