Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support the statement of "rename database" #3

Merged
merged 2 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,28 @@ func (s *testDBSuite3) TestTruncateTable(c *C) {

}

func (s *testDBSuite3) TestRenameDatabase(c *C) {
s.tk = testkit.NewTestKitWithInit(c, s.store)
s.tk.MustExec("create database db charset utf8 collate utf8_general_ci;")
s.tk.MustExec("use db")
s.tk.MustExec("create table t(c1 int, c2 int)")
ctx := s.tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
dbInfo, ok := is.SchemaByName(model.NewCIStr("db"))
c.Assert(ok, IsTrue)
oldDatabaseID := dbInfo.ID
s.tk.MustExec("rename database db to newDB")
is = domain.GetDomain(ctx).InfoSchema()
newDBInfo, ok := is.SchemaByName(model.NewCIStr("newDB"))
c.Assert(ok, IsTrue)
c.Assert(oldDatabaseID, Equals, newDBInfo.ID)

// for failure cases
s.tk.MustGetErrCode("rename database db_not_exit to db", errno.ErrBadDB)
s.tk.MustGetErrCode("rename database newDB to test", errno.ErrDBCreateExists)
s.tk.MustGetErrCode("rename database newDB to dbxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", errno.ErrTooLongIdent)
}

func (s *testDBSuite4) TestRenameTable(c *C) {
isAlterTable := false
s.testRenameTable(c, "rename table %s to %s", isAlterTable)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
RenameSchema(ctx sessionctx.Context, oldDB, newDB model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
Expand Down
27 changes: 27 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (
return errors.Trace(err)
}

func (d *ddl) RenameSchema(ctx sessionctx.Context, oldDB, newDB model.CIStr) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
oldSchema, ok := is.SchemaByName(oldDB)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
_, ok = is.SchemaByName(newDB)
if ok {
return errors.Trace(infoschema.ErrDatabaseExists)
}
if err := checkTooLongSchema(newDB); err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: oldSchema.ID,
SchemaName: oldSchema.Name.L,
Type: model.ActionRenameDatabase,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newDB},
}

err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
old, ok := is.SchemaByName(schema)
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onRebaseAutoRandomType(d.store, t, job)
case model.ActionRenameTable:
ver, err = onRenameTable(d, t, job)
case model.ActionRenameDatabase:
ver, err = onRenameSchema(t, job)
case model.ActionShardRowID:
ver, err = w.onShardRowID(d, t, job)
case model.ActionModifyTableComment:
Expand Down
8 changes: 8 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddPrimaryKey, jobIDs: []int64{firstID + 35}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 35)}, cancelState: model.StatePublic},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 36}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 37}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 37)}, cancelState: model.StateDeleteOnly},

{act: model.ActionRenameDatabase, jobIDs: []int64{firstID + 38}, cancelRetErrs: noErrs, cancelState: model.StateNone},
}

return tests
Expand Down Expand Up @@ -851,6 +853,12 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, idxOrigName)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for rename database
updateTest(&tests[33])
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo1.ID, 0, model.ActionRenameDatabase, []interface{}{"newDB"}, &cancelState)
c.Check(checkErr, IsNil)
testCheckSchemaState(c, d, dbInfo, model.StatePublic)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
51 changes: 39 additions & 12 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
dbInfo.ID = schemaID
dbInfo.State = model.StateNone

err := checkSchemaNotExists(d, t, schemaID, dbInfo)
err := checkSchemaNotExists(d, t, schemaID, dbInfo.Name)
if err != nil {
if infoschema.ErrDatabaseExists.Equal(err) {
// The database already exists, can't create it, we should cancel this job now.
Expand Down Expand Up @@ -63,10 +63,10 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
}
}

func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbName model.CIStr) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil {
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
return checkSchemaNotExistsFromStore(t, schemaID, dbName)
}
// Try to use memory schema info to check first.
currVer, err := t.GetSchemaVersion()
Expand All @@ -75,35 +75,34 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model
}
is := d.infoHandle.Get()
if is.SchemaMetaVersion() == currVer {
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo)
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbName)
}
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
return checkSchemaNotExistsFromStore(t, schemaID, dbName)
}

func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbName model.CIStr) error {
// Check database exists by name.
if is.SchemaExists(dbInfo.Name) {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name)
if is.SchemaExists(dbName) {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbName)
}
// Check database exists by ID.
if _, ok := is.SchemaByID(schemaID); ok {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name)
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbName)
}
return nil
}

func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbName model.CIStr) error {
dbs, err := t.ListDatabases()
if err != nil {
return errors.Trace(err)
}

for _, db := range dbs {
if db.Name.L == dbInfo.Name.L {
if db.Name.L == dbName.L {
if db.ID != schemaID {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name)
}
dbInfo = db
}
}
return nil
Expand Down Expand Up @@ -140,6 +139,34 @@ func onModifySchemaCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, nil
}

func onRenameSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var newDBName model.CIStr
if err := job.DecodeArgs(&newDBName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if err = checkSchemaNotExistsFromStore(t, job.SchemaID, newDBName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo.Name = newDBName
if err = t.UpdateDatabase(dbInfo); err != nil {
return ver, errors.Trace(err)
}
if ver, err = updateSchemaVersion(t, job); err != nil {
return ver, errors.Trace(err)
}
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo)
return ver, nil
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeFlashbackTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.RenameDatabaseStmt:
err = e.executeRenameDatabase(x)
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
case *ast.LockTablesStmt:
Expand Down Expand Up @@ -152,6 +154,19 @@ func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error {
return err
}

func (e *DDLExec) executeRenameDatabase(s *ast.RenameDatabaseStmt) error {
oldDB := model.NewCIStr(s.OldDB)
newDB := model.NewCIStr(s.NewDB)

// Protect important system table from been dropped by a mistake.
// I can hardly find a case that a user really need to do this.
if oldDB.L == "mysql" {
return errors.New("Rename 'mysql' database is forbidden")
}

return domain.GetDomain(e.ctx).DDL().RenameSchema(e.ctx, oldDB, newDB)
}

func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error {
if len(s.TableToTables) != 1 {
// Now we only allow one schema changing at the same time.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,5 @@ require (
)

go 1.13

replace github.com/pingcap/parser => github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d
bb7133 marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/appleboy/gin-jwt/v2 v2.6.3/go.mod h1:MfPYA4ogzvOcVkRwAxT7quHOtQmVKDpTwxyUrC2DNw0=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d h1:cSjNiwPNKJfxrRP+2w6hG9mnWW9eWFYFrCcifRm1D6g=
github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -388,6 +390,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zimulala/parser v0.0.0-20200428092446-11e0c7903208 h1:+UobXq2Tc+lu10/7YZOFdSXThYlxLc3RkH7jShkmZig=
github.com/zimulala/parser v0.0.0-20200428092446-11e0c7903208/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
Expand Down
21 changes: 21 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
} else if diff.Type == model.ActionModifySchemaCharsetAndCollate {
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
}
if diff.Type == model.ActionRenameDatabase {
return nil, b.applyRenameSchema(m, diff)
}
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
Expand Down Expand Up @@ -188,6 +191,24 @@ func (b *Builder) applyModifySchemaCharsetAndCollate(m *meta.Meta, diff *model.S
return nil
}

func (b *Builder) applyRenameSchema(m *meta.Meta, diff *model.SchemaDiff) error {
di, err := m.GetDatabase(diff.SchemaID)
if err != nil {
return errors.Trace(err)
}
oldDB, ok := b.is.SchemaByID(diff.SchemaID)
if di == nil || ok {
// This should never happen.
return ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}

newDbInfo := b.copySchemaTables(oldDB.Name.O)
newDbInfo.Name = di.Name
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di, ok := b.is.SchemaByID(schemaID)
if !ok {
Expand Down
13 changes: 13 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,19 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.Name.Schema.L,
v.Name.Name.L, "", authErr)
case *ast.RenameDatabaseStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
b.ctx.GetSessionVars().User.Hostname, v.OldDB)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.OldDB,
"", "", authErr)
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
b.ctx.GetSessionVars().User.Hostname, v.NewDB)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.NewDB,
"", "", authErr)
case *ast.DropDatabaseStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
Expand Down
12 changes: 12 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
p.checkCreateDatabaseGrammar(node)
case *ast.AlterDatabaseStmt:
p.checkAlterDatabaseGrammar(node)
case *ast.RenameDatabaseStmt:
p.checkRenameDatabaseGrammar(node)
case *ast.DropDatabaseStmt:
p.checkDropDatabaseGrammar(node)
case *ast.ShowStmt:
Expand Down Expand Up @@ -391,6 +393,16 @@ func (p *preprocessor) checkAlterDatabaseGrammar(stmt *ast.AlterDatabaseStmt) {
}
}

func (p *preprocessor) checkRenameDatabaseGrammar(stmt *ast.RenameDatabaseStmt) {
if isIncorrectName(stmt.OldDB) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.OldDB)
return
}
if isIncorrectName(stmt.NewDB) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.NewDB)
}
}

func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) {
if isIncorrectName(stmt.Name) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
Expand Down