diff --git a/ddl/ddl.go b/ddl/ddl.go index c1df201bd9315..2e49ee75b4e2f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -280,9 +280,8 @@ type DDL interface { // ddl is used to handle the statements that define the structure or schema of the database. type ddl struct { - m sync.RWMutex - infoHandle *infoschema.Handle - quitCh chan struct{} + m sync.RWMutex + quitCh chan struct{} *ddlCtx workers map[workerType]*worker @@ -300,6 +299,7 @@ type ddlCtx struct { ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog. + infoHandle *infoschema.Handle // hook may be modified. mu struct { @@ -380,12 +380,12 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, ownerManager: manager, schemaSyncer: syncer, binlogCli: binloginfo.GetPumpsClient(), + infoHandle: infoHandle, } ddlCtx.mu.hook = hook ddlCtx.mu.interceptor = &BaseInterceptor{} d := &ddl{ - infoHandle: infoHandle, - ddlCtx: ddlCtx, + ddlCtx: ddlCtx, } d.start(ctx, ctxPool) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 62ed299cc45b1..6a9ed39f5a01e 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -487,7 +487,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, switch job.Type { case model.ActionCreateSchema: - ver, err = onCreateSchema(t, job) + ver, err = onCreateSchema(d, t, job) case model.ActionDropSchema: ver, err = onDropSchema(t, job) case model.ActionCreateTable: @@ -523,7 +523,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, case model.ActionRebaseAutoID: ver, err = onRebaseAutoID(d.store, t, job) case model.ActionRenameTable: - ver, err = onRenameTable(t, job) + ver, err = onRenameTable(d, t, job) case model.ActionShardRowID: ver, err = w.onShardRowID(d, t, job) case model.ActionModifyTableComment: diff --git a/ddl/schema.go b/ddl/schema.go index ac875e158059b..51df4ef1f4b7b 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tidb/meta" ) -func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { +func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID dbInfo := &model.DBInfo{} if err := job.DecodeArgs(dbInfo); err != nil { @@ -32,20 +32,13 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { dbInfo.ID = schemaID dbInfo.State = model.StateNone - dbs, err := t.ListDatabases() + err := checkSchemaNotExists(d, t, schemaID, dbInfo) if err != nil { - return ver, errors.Trace(err) - } - - for _, db := range dbs { - if db.Name.L == dbInfo.Name.L { - if db.ID != schemaID { - // The database already exists, can't create it, we should cancel this job now. - job.State = model.JobStateCancelled - return ver, infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name) - } - dbInfo = db + if infoschema.ErrDatabaseExists.Equal(err) { + // The database already exists, can't create it, we should cancel this job now. + job.State = model.JobStateCancelled } + return ver, errors.Trace(err) } ver, err = updateSchemaVersion(t, job) @@ -70,6 +63,52 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { } } +func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { + // d.infoHandle maybe nil in some test. + if d.infoHandle == nil { + return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) + } + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err + } + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) + } + return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) +} + +func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbInfo *model.DBInfo) error { + // Check database exists by name. + if is.SchemaExists(dbInfo.Name) { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name) + } + // Check database exists by ID. + if _, ok := is.SchemaByID(schemaID); ok { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name) + } + return nil +} + +func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { + dbs, err := t.ListDatabases() + if err != nil { + return errors.Trace(err) + } + + for _, db := range dbs { + if db.Name.L == dbInfo.Name.L { + if db.ID != schemaID { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name) + } + dbInfo = db + } + } + return nil +} + func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) if err != nil { diff --git a/ddl/table.go b/ddl/table.go index 179b11235b26b..e0f5764d65d9d 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -52,9 +52,11 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } tbInfo.State = model.StateNone - err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) if err != nil { - job.State = model.JobStateCancelled + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -96,11 +98,18 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) if err != nil { - if infoschema.ErrDatabaseNotExists.Equal(err) || !orReplace { + if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled return ver, errors.Trace(err) + } else if infoschema.ErrTableExists.Equal(err) { + if !orReplace { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } else { + return ver, errors.Trace(err) } } ver, err = updateSchemaVersion(t, job) @@ -193,8 +202,11 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - err = checkTableNotExists(t, job, schemaID, tblInfo.Name.L) + err = checkTableNotExists(d, t, schemaID, tblInfo.Name.L) if err != nil { + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -529,7 +541,7 @@ func verifyNoOverflowShardBits(s *sessionPool, tbl table.Table, shardRowIDBits u return nil } -func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { +func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { var oldSchemaID int64 var tableName model.CIStr if err := job.DecodeArgs(&oldSchemaID, &tableName); err != nil { @@ -543,9 +555,11 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(err) } newSchemaID := job.SchemaID - err = checkTableNotExists(t, job, newSchemaID, tableName.L) + err = checkTableNotExists(d, t, newSchemaID, tableName.L) if err != nil { - job.State = model.JobStateCancelled + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -665,7 +679,37 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ return ver, nil } -func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { +func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { + // d.infoHandle maybe nil in some test. + if d.infoHandle == nil { + return checkTableNotExistsFromStore(t, schemaID, tableName) + } + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err + } + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) + } + + return checkTableNotExistsFromStore(t, schemaID, tableName) +} + +func checkTableNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) error { + // Check this table's database. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + if is.TableExists(schema.Name, model.NewCIStr(tableName)) { + return infoschema.ErrTableExists.GenWithStackByArgs(tableName) + } + return nil +} + +func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string) error { // Check this table's database. tables, err := t.ListTables(schemaID) if err != nil {