Skip to content

Commit

Permalink
fix table lock bug in drop table/schema, truncate table bug
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed May 7, 2019
1 parent fee3c99 commit 5b1a0ca
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 9 deletions.
25 changes: 25 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,31 @@ func (s *testDBSuite2) TestLockTables(c *C) {
tk2.MustExec("lock tables t1 write")
tk2.MustExec("lock tables t1 write, t2 read")

// Test lock tables and drop tables
tk.MustExec("unlock tables")
tk2.MustExec("unlock tables")
tk.MustExec("lock tables t1 write, t2 write")
tk.MustExec("drop table t1")
tk.MustExec("create table t1 (a int)")
tk.MustExec("lock tables t1 write, t2 read")

// Test lock tables and drop database.
tk.MustExec("unlock tables")
tk.MustExec("create database test_lock")
tk.MustExec("create table test_lock.t3 (a int)")
tk.MustExec("lock tables t1 write, test_lock.t3 write")
tk.MustExec("drop database test_lock")
tk.MustExec("create table t3 (a int)")
tk.MustExec("lock tables t1 write, t3 read")

// Test lock tables and truncate tables.
tk.MustExec("unlock tables")
tk.MustExec("lock tables t1 write, t2 read")
tk.MustExec("truncate table t1")
tk.MustExec("insert into t1 set a=1")
_, err = tk2.Exec("insert into t1 set a=1")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)

// Test for lock unsupported schema tables.
_, err = tk2.Exec("lock tables performance_schema.global_status write")
c.Assert(terror.ErrorEqual(err, table.ErrUnsupportedOp), IsTrue)
Expand Down
28 changes: 28 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,24 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
tbs := is.SchemaTables(schema)
job := &model.Job{
SchemaID: old.ID,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}

err = d.doDDLJob(ctx, job)
if err == nil {
// clear table locks.
tableLocks := make([]model.TableLockTpInfo, 0)
for _, tb := range tbs {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
tableLocks = append(tableLocks, model.TableLockTpInfo{SchemaID: old.ID, TableID: tb.Meta().ID})
}
}
ctx.ReleaseTableLock(tableLocks)
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -2759,6 +2770,12 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
}

err = d.doDDLJob(ctx, job)
if err == nil {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
ctx.ReleaseTableLock([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: tb.Meta().ID}})
}
}

err = d.callHookOnChanged(err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -2802,7 +2819,18 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newTableID},
}
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
// AddTableLock here to avoid this ddl job was execute successful but the session was been kill before return.
ctx.AddTableLock(([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}}))
}
err = d.doDDLJob(ctx, job)
if err == nil {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
ctx.ReleaseTableLock([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: tb.Meta().ID}})
}
} else {
ctx.ReleaseTableLock([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID}})
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
Expand Down
13 changes: 10 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,12 +443,20 @@ func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64)
}

func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
tableID := job.TableID
tblInfo, err := getTableInfo(t, job.TableID, schemaID)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
job.State = model.JobStateCancelled
}
}
return tblInfo, err
}

func getTableInfo(t *meta.Meta, tableID, schemaID int64) (*model.TableInfo, error) {
// Check this table's database.
tblInfo, err := t.GetTable(schemaID, tableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
job.State = model.JobStateCancelled
return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", schemaID),
))
Expand All @@ -458,7 +466,6 @@ func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID

// Check the table.
if tblInfo == nil {
job.State = model.JobStateCancelled
return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", schemaID),
fmt.Sprintf("(Table ID %d)", tableID),
Expand Down
14 changes: 12 additions & 2 deletions ddl/table_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,20 @@ func unlockTables(t *meta.Meta, job *model.Job, arg *lockTablesArg) (ver int64,
}
job.SchemaID = arg.UnlockTables[arg.IndexOfUnlock].SchemaID
job.TableID = arg.UnlockTables[arg.IndexOfUnlock].TableID
tbInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
tbInfo, err := getTableInfo(t, job.TableID, job.SchemaID)
if err != nil {
return ver, err
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
err = nil
// The table maybe has been dropped. just ignore this err and go on.
arg.IndexOfUnlock++
job.Args = []interface{}{arg}
return ver, nil
} else {
job.State = model.JobStateCancelled
return ver, err
}
}

err = unlockTable(tbInfo, arg)
if err != nil {
job.State = model.JobStateCancelled
Expand Down
8 changes: 4 additions & 4 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ var (
// ErrTooManyKeyParts returns for too many key parts.
ErrTooManyKeyParts = terror.ClassSchema.New(codeTooManyKeyParts, "Too many key parts specified; max %d parts allowed")
// ErrTableNotLockedForWrite returns for write tables when only hold the table read lock.
ErrTableNotLockedForWrite = terror.ClassOptimizer.New(codeErrTableNotLockedForWrite, mysql.MySQLErrName[mysql.ErrTableNotLockedForWrite])
ErrTableNotLockedForWrite = terror.ClassSchema.New(codeErrTableNotLockedForWrite, mysql.MySQLErrName[mysql.ErrTableNotLockedForWrite])
// ErrTableNotLocked returns when session has explicitly lock tables, then visit unlocked table will return this error.
ErrTableNotLocked = terror.ClassOptimizer.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrTableNotLocked])
ErrTableNotLocked = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrTableNotLocked])
// ErrNonuniqTable returns when none unique tables errors.
ErrNonuniqTable = terror.ClassOptimizer.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrNonuniqTable])
ErrNonuniqTable = terror.ClassSchema.New(codeErrTableNotLocked, mysql.MySQLErrName[mysql.ErrNonuniqTable])
// ErrTableLocked returns when the table was locked by other session.
ErrTableLocked = terror.ClassOptimizer.New(codeTableLocked, "Table '%s' was locked in %s by %v")
ErrTableLocked = terror.ClassSchema.New(codeTableLocked, "Table '%s' was locked in %s by %v")
)

// InfoSchema is the interface used to retrieve the schema information.
Expand Down

0 comments on commit 5b1a0ca

Please sign in to comment.