Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: separate auto_increment ID allocator from _tidb_rowid allocator #20708

Closed
wants to merge 12 commits into from
Closed
12 changes: 7 additions & 5 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2321,13 +2321,13 @@ func (s *testIntegrationSuite3) TestCreateTableWithAutoIdCache(c *C) {
c.Assert(err, IsNil)

tk.MustExec("insert into t(b) values(NULL)")
tk.MustQuery("select b, _tidb_rowid from t").Check(testkit.Rows("1 2"))
tk.MustQuery("select b, _tidb_rowid from t").Check(testkit.Rows("1 1"))
tk.MustExec("delete from t")

// Invalid the allocator cache, insert will trigger a new cache.
tk.MustExec("rename table t to t1;")
tk.MustExec("insert into t1(b) values(NULL)")
tk.MustQuery("select b, _tidb_rowid from t1").Check(testkit.Rows("101 102"))
tk.MustQuery("select b, _tidb_rowid from t1").Check(testkit.Rows("101 101"))
tk.MustExec("delete from t1")

// Test alter auto_id_cache.
Expand All @@ -2337,13 +2337,13 @@ func (s *testIntegrationSuite3) TestCreateTableWithAutoIdCache(c *C) {
c.Assert(tblInfo.Meta().AutoIdCache, Equals, int64(200))

tk.MustExec("insert into t1(b) values(NULL)")
tk.MustQuery("select b, _tidb_rowid from t1").Check(testkit.Rows("201 202"))
tk.MustQuery("select b, _tidb_rowid from t1").Check(testkit.Rows("201 201"))
tk.MustExec("delete from t1")

// Invalid the allocator cache, insert will trigger a new cache.
tk.MustExec("rename table t1 to t;")
tk.MustExec("insert into t(b) values(NULL)")
tk.MustQuery("select b, _tidb_rowid from t").Check(testkit.Rows("401 402"))
tk.MustQuery("select b, _tidb_rowid from t").Check(testkit.Rows("401 401"))
tk.MustExec("delete from t")

tk.MustExec("drop table if exists t;")
Expand All @@ -2354,7 +2354,9 @@ func (s *testIntegrationSuite3) TestCreateTableWithAutoIdCache(c *C) {
c.Assert(tblInfo.Meta().AutoIdCache, Equals, int64(3))

// Test insert batch size(4 here) greater than the customized autoid step(3 here).
tk.MustExec("insert into t(a) values(NULL),(NULL),(NULL),(NULL)")
// We split them into 2 statements to trigger the cache for the second time.
tk.MustExec("insert into t(a) values(NULL),(NULL),(NULL);")
tk.MustExec("insert into t(a) values(NULL);")
tk.MustQuery("select a from t").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustExec("delete from t")

Expand Down
10 changes: 4 additions & 6 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5122,10 +5122,9 @@ func (s *testSerialDBSuite) TestAlterShardRowIDBits(c *C) {
tk.MustExec(fmt.Sprintf("alter table t1 auto_increment = %d;", 1<<56))
tk.MustExec("insert into t1 set a=1;")

// Test increase shard_row_id_bits failed by overflow global auto ID.
// Test rebase auto_increment does not affect shard_row_id_bits.
_, err := tk.Exec("alter table t1 SHARD_ROW_ID_BITS = 10;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:1467]shard_row_id_bits 10 will cause next global auto ID 72057594037932936 overflow")
c.Assert(err, IsNil)

// Test reduce shard_row_id_bits will be ok.
tk.MustExec("alter table t1 SHARD_ROW_ID_BITS = 3;")
Expand All @@ -5134,7 +5133,7 @@ func (s *testSerialDBSuite) TestAlterShardRowIDBits(c *C) {
c.Assert(tbl.Meta().MaxShardRowIDBits == maxShardRowIDBits, IsTrue)
c.Assert(tbl.Meta().ShardRowIDBits == shardRowIDBits, IsTrue)
}
checkShardRowID(5, 3)
checkShardRowID(10, 3)

// Test reduce shard_row_id_bits but calculate overflow should use the max record shard_row_id_bits.
tk.MustExec("drop table if exists t1")
Expand All @@ -5143,8 +5142,7 @@ func (s *testSerialDBSuite) TestAlterShardRowIDBits(c *C) {
checkShardRowID(10, 5)
tk.MustExec(fmt.Sprintf("alter table t1 auto_increment = %d;", 1<<56))
_, err = tk.Exec("insert into t1 set a=1;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine")
c.Assert(err, IsNil)
}

// port from mysql
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,5 @@ type RecoverInfo struct {
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
CurRowID int64
}
20 changes: 13 additions & 7 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,7 @@ func (d *ddl) CreateTableWithInfo(
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
newEnd := tbInfo.AutoIncID - 1
if err = d.handleAutoIncID(tbInfo, schema.ID, newEnd, autoid.RowIDAllocType); err != nil {
if err = d.handleAutoIncID(tbInfo, schema.ID, newEnd, autoid.AutoIncrementType); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -1860,8 +1860,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (er
SchemaName: schema.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID},
Args: []interface{}{tbInfo, recoverInfo.CurRowID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID, recoverInfo.CurAutoIncID},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down Expand Up @@ -2434,7 +2434,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
}
err = d.ShardRowID(ctx, ident, opt.UintValue)
case ast.TableOptionAutoIncrement:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.RowIDAllocType)
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.AutoIncrementType)
case ast.TableOptionAutoIdCache:
if opt.UintValue > uint64(math.MaxInt64) {
// TODO: Refine this error.
Expand Down Expand Up @@ -2517,7 +2517,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
return errors.Trace(ErrInvalidAutoRandom.GenWithStackByArgs(errMsg))
}
actionType = model.ActionRebaseAutoRandomBase
case autoid.RowIDAllocType:
case autoid.AutoIncrementType:
actionType = model.ActionRebaseAutoID
}

Expand Down Expand Up @@ -2552,15 +2552,21 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
if err != nil {
return errors.Trace(err)
}
if uVal == t.Meta().ShardRowIDBits {
tblInfo := t.Meta()
if uVal == tblInfo.ShardRowIDBits {
// Nothing need to do.
return nil
}
if uVal > 0 && t.Meta().PKIsHandle {
noRowID := tblInfo.PKIsHandle || tblInfo.IsCommonHandle
if uVal > 0 && noRowID {
return errUnsupportedShardRowIDBits
}
err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
if err != nil {
// Ignore the 'shard_row_id_bits' option if there is no _tidb_rowid allocator.
if errors.ErrorEqual(err, autoid.ErrAutoIDAllocatorNotFound) {
return nil
}
return err
}
job := &model.Job{
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionTruncateTable:
ver, err = onTruncateTable(d, t, job)
case model.ActionRebaseAutoID:
ver, err = onRebaseRowIDType(d.store, t, job)
ver, err = onRebaseAutoIncrementIDType(d.store, t, job)
case model.ActionRebaseAutoRandomBase:
ver, err = onRebaseAutoRandomType(d.store, t, job)
case model.ActionRenameTable:
Expand Down
50 changes: 17 additions & 33 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var rowID, autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
const checkFlagIndexInJobArgs = 4 // The index of `recoverTableCheckFlag` in job arg list.
if err = job.DecodeArgs(tblInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID); err != nil {
if err = job.DecodeArgs(tblInfo, &rowID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID, &autoIncID); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -310,7 +310,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, rowID, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -504,8 +504,8 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return ver, nil
}

func onRebaseRowIDType(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64, _ error) {
return onRebaseAutoID(store, t, job, autoid.RowIDAllocType)
func onRebaseAutoIncrementIDType(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64, _ error) {
return onRebaseAutoID(store, t, job, autoid.AutoIncrementType)
}

func onRebaseAutoRandomType(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand All @@ -526,7 +526,7 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.Al
return ver, errors.Trace(err)
}
// No need to check `newBase` again, because `RebaseAutoID` will do this check.
if tp == autoid.RowIDAllocType {
if tp == autoid.AutoIncrementType {
tblInfo.AutoIncID = newBase
} else {
tblInfo.AutoRandID = newBase
Expand Down Expand Up @@ -619,7 +619,11 @@ func verifyNoOverflowShardBits(s *sessionPool, tbl table.Table, shardRowIDBits u
defer s.put(ctx)

// Check next global max auto ID first.
autoIncID, err := tbl.Allocators(ctx).Get(autoid.RowIDAllocType).NextGlobalAutoID(tbl.Meta().ID)
var alloc autoid.Allocator
if alloc = tbl.Allocators(ctx).Get(autoid.RowIDAllocType); alloc == nil {
return autoid.ErrAutoIDAllocatorNotFound.GenWithStackByArgs("_tidb_rowid")
}
autoIncID, err := alloc.NextGlobalAutoID(tbl.Meta().ID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -694,17 +698,10 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID int64, newSc
return ver, tblInfo, errors.Trace(err)
}

var autoTableID int64
var autoRandID int64
shouldDelAutoID := false
if newSchemaID != oldSchemaID {
shouldDelAutoID = true
autoTableID, err = t.GetAutoTableID(tblInfo.GetDBID(oldSchemaID), tblInfo.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
}
autoRandID, err = t.GetAutoRandomID(tblInfo.GetDBID(oldSchemaID), tblInfo.ID)
var rowID, incID, randID int64
shouldRecreateAutoID := newSchemaID != oldSchemaID
if shouldRecreateAutoID {
rowID, incID, randID, err = t.GetAllAutoIDs(tblInfo.GetDBID(oldSchemaID), tblInfo.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
Expand All @@ -714,7 +711,7 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID int64, newSc
tblInfo.OldSchemaID = 0
}

err = t.DropTableOrView(oldSchemaID, tblInfo.ID, shouldDelAutoID)
err = t.DropTableOrView(oldSchemaID, tblInfo.ID, shouldRecreateAutoID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
Expand All @@ -730,24 +727,11 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID int64, newSc
})

tblInfo.Name = *tableName
err = t.CreateTableOrView(newSchemaID, tblInfo)
err = t.CreateTableAndSetAutoID(newSchemaID, tblInfo, rowID, incID, randID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
}
// Update the table's auto-increment ID.
if newSchemaID != oldSchemaID {
_, err = t.GenAutoTableID(newSchemaID, tblInfo.ID, autoTableID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
}
_, err = t.GenAutoRandomID(newSchemaID, tblInfo.ID, autoRandID)
if err != nil {
job.State = model.JobStateCancelled
return ver, tblInfo, errors.Trace(err)
}
}

return ver, tblInfo, nil
}
Expand Down
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,7 @@ const (
ErrTableOptionInsertMethodUnsupported = 8233
ErrInvalidPlacementSpec = 8234
ErrDDLReorgElementNotExist = 8235
ErrAutoIDAllocatorNotFound = 8236

// TiKV/PD errors.
ErrPDServerTimeout = 9001
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrUnknownAllocatorType: mysql.Message("Invalid allocator type", nil),
ErrAutoRandReadFailed: mysql.Message("Failed to read auto-random value from storage engine", nil),
ErrInvalidIncrementAndOffset: mysql.Message("Invalid auto_increment settings: auto_increment_increment: %d, auto_increment_offset: %d, both of them must be in range [1..65535]", nil),
ErrAutoIDAllocatorNotFound: mysql.Message("ID allocator for %s not found", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ error = '''
Failed to read auto-random value from storage engine
'''

["autoid:8236"]
error = '''
ID allocator for %s not found
'''

["ddl:1025"]
error = '''
Error on rename of '%-.210s' to '%-.210s' (errno: %d - %s)
Expand Down
21 changes: 10 additions & 11 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been recover to '%-.192s', can't be recover repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
}

autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
rowID, autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
Expand All @@ -411,28 +411,26 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
CurRowID: rowID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (rowID, autoIncID, autoRandID int64, err error) {
// Get table original autoIDs before table drop.
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
rowID, autoIncID, autoRandID, err = m.GetAllAutoIDs(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
err = errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
return
}
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
return
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
Expand Down Expand Up @@ -575,7 +573,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been flashback to '%-.192s', can't be flashback repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
}

autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
rowID, autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
Expand All @@ -586,6 +584,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
CurRowID: rowID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
Expand Down
12 changes: 7 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,15 @@ func (e *ShowNextRowIDExec) Next(ctx context.Context, req *chunk.Chunk) error {

var colName, idType string
switch alloc.GetType() {
case autoid.RowIDAllocType, autoid.AutoIncrementType:
case autoid.RowIDAllocType:
colName = model.ExtraHandleName.O
tangenta marked this conversation as resolved.
Show resolved Hide resolved
case autoid.AutoIncrementType:
idType = "AUTO_INCREMENT"
if col := tblMeta.GetAutoIncrementColInfo(); col != nil {
colName = col.Name.O
} else {
colName = model.ExtraHandleName.O
col := tblMeta.GetAutoIncrementColInfo()
if col == nil {
return errors.Errorf("auto_increment column not found")
}
colName = col.Name.O
case autoid.AutoRandomType:
idType = "AUTO_RANDOM"
colName = tblMeta.GetPkName().O
Expand Down
5 changes: 1 addition & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3516,15 +3516,12 @@ func (s *testSuite) TestCheckIndex(c *C) {
c.Assert(err, IsNil)
is := s.domain.InfoSchema()
db := model.NewCIStr("test_admin")
dbInfo, ok := is.SchemaByName(db)
c.Assert(ok, IsTrue)
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(db, tblName)
c.Assert(err, IsNil)
tbInfo := tbl.Meta()

alloc := autoid.NewAllocator(s.store, dbInfo.ID, false, autoid.RowIDAllocType)
tb, err := tables.TableFromMeta(autoid.NewAllocators(alloc), tbInfo)
tb, err := tables.TableFromMeta(nil, tbInfo)
c.Assert(err, IsNil)

_, err = se.Execute(context.Background(), "admin check index t c")
Expand Down
Loading