Skip to content

Commit

Permalink
*: fix check constraintInfo state change (#44455)
Browse files Browse the repository at this point in the history
ref #41711
  • Loading branch information
fzzf678 authored Jun 16, 2023
1 parent 7a29bec commit e7bcdbc
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 21 deletions.
63 changes: 45 additions & 18 deletions ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/sqlexec"
)
Expand Down Expand Up @@ -86,12 +87,14 @@ func (w *worker) onAddCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (
originalState := constraintInfoInMeta.State
switch constraintInfoInMeta.State {
case model.StateNone:
// none -> write only
job.SchemaState = model.StateWriteOnly
constraintInfoInMeta.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State)
case model.StateWriteOnly:
// write only -> public
job.SchemaState = model.StateWriteReorganization
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State)
case model.StateWriteReorganization:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta, job)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -151,12 +154,10 @@ func onDropCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
originalState := constraintInfo.State
switch constraintInfo.State {
case model.StatePublic:
// public -> write only
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteOnly:
// write only -> None
// write only state constraint will still take effect to check the newly inserted data.
// So the dependent column shouldn't be dropped even in this intermediate state.
constraintInfo.State = model.StateNone
Expand Down Expand Up @@ -205,29 +206,50 @@ func checkDropCheckConstraint(t *meta.Meta, job *model.Job) (*model.TableInfo, *
return tblInfo, constraintInfo, nil
}

func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(t, job)
if err != nil {
return ver, errors.Trace(err)
}

// enforced will fetch table data and check the constraint.
if constraintInfo.Enforced != enforced && enforced {
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job)
if enforced {
originalState := constraintInfo.State
switch constraintInfo.State {
case model.StatePublic:
job.SchemaState = model.StateWriteReorganization
constraintInfo.State = model.StateWriteReorganization
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteReorganization:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job)
if err != nil {
if !table.ErrCheckConstraintViolated.Equal(err) {
return ver, errors.Trace(err)
}
constraintInfo.Enforced = !enforced
}
constraintInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
} else {
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
if err != nil {
// check constraint error will cancel the job, job state has been changed
// to cancelled in addTableCheckConstraint.
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
if err != nil {
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
return ver, err
}

func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *model.TableInfo, *model.ConstraintInfo, bool, error) {
Expand All @@ -250,7 +272,6 @@ func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *mo
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}

// do the double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
Expand Down Expand Up @@ -316,6 +337,12 @@ func findDependentColsInExpr(expr ast.ExprNode) map[string]struct{} {
}

func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tableInfo *model.TableInfo, constr *model.ConstraintInfo, job *model.Job) error {
// Inject a fail-point to skip the remaining records check.
failpoint.Inject("mockVerifyRemainDataSuccess", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil)
}
})
// Get sessionctx from ddl context resource pool in ddl worker.
var sctx sessionctx.Context
sctx, err := w.sessPool.Get()
Expand Down
221 changes: 218 additions & 3 deletions ddl/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"sort"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -893,10 +896,7 @@ func TestAlterConstraintAddDrop(t *testing.T) {
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
// StateNone -> StateWriteOnly -> StatePublic
// Node in StateWriteOnly and StatePublic should check the constraint check.
_, checkErr = tk1.Exec("insert into t (a, b) values(5,6) ")
// Don't do the assert in the callback function.
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
Expand All @@ -908,3 +908,218 @@ func TestAlterConstraintAddDrop(t *testing.T) {
require.Errorf(t, err, "[table:3819]Check constraint 'cc' is violated.")
tk.MustExec("drop table if exists t")
}

func TestAlterAddConstraintStateChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
tk1.MustQuery(fmt.Sprintf("select count(1) from `%s`.`%s` where not %s limit 1", "test", "t", "a > 10")).Check(testkit.Rows("0"))
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
originCons := tableCommon.Constraints
tableCommon.WritableConstraints = []*table.Constraint{}
tableCommon.Constraints = []*table.Constraint{}
// insert data
tk1.MustExec("insert into t values(1)")
// recover
tableCommon.Constraints = originCons
tableCommon.WritableConstraint()
}
}

//StatNone StateWriteReorganization
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess", "return(true)"))
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c0 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12", "1"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c0` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("alter table t drop constraint c0")
tk.MustExec("delete from t where a = 1")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess"))
}

func TestAlterAddConstraintStateChange1(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StatNone -> StateWriteOnly
onJobUpdatedExportedFunc1 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
originCons := tableCommon.Constraints
tableCommon.WritableConstraints = []*table.Constraint{}
tableCommon.Constraints = []*table.Constraint{}
// insert data
tk1.MustExec("insert into t values(1)")
// recover
tableCommon.Constraints = originCons
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc1)
d.SetHook(callback)
tk.MustGetErrMsg("alter table t add constraint c1 check ( a > 10)", "[ddl:3819]Check constraint 'c1' is violated.")
tk.MustQuery("select * from t").Check(testkit.Rows("12", "1"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("delete from t where a = 1")
}

func TestAlterAddConstraintStateChange2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteOnly -> StateWriteReorganization
onJobUpdatedExportedFunc2 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteOnly
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c2' is violated.")
// recover
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c2 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c2` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("alter table t drop constraint c2")
}

func TestAlterAddConstraintStateChange3(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StatePublic {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c3' is violated.")
// recover
tableCommon.Constraints[0].State = model.StatePublic
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c3 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c3` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

func TestAlterEnforcedConstraintStateChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, constraint c1 check (a > 10) not enforced)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteOnly
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c1' is violated.")
// recover
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(callback)
tk.MustExec("alter table t alter constraint c1 enforced")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
}

0 comments on commit e7bcdbc

Please sign in to comment.