From fb678650e0e51f7fd2475576ad3123e8a1eb766f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 5 Aug 2024 11:45:38 +0800 Subject: [PATCH] executor: sync deletable columns to binlog when remove record (#53617) (#54942) close pingcap/tidb#53133 --- pkg/ddl/column.go | 2 ++ pkg/executor/BUILD.bazel | 1 + pkg/executor/executor_txn_test.go | 40 ++++++++++++++++++++++++ pkg/table/tables/tables.go | 2 +- pkg/testkit/testfailpoint/BUILD.bazel | 12 +++++++ pkg/testkit/testfailpoint/failpoint.go | 43 ++++++++++++++++++++++++++ 6 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 pkg/testkit/testfailpoint/BUILD.bazel create mode 100644 pkg/testkit/testfailpoint/failpoint.go diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index a7022300c66c5..6871ea0b398fa 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -171,6 +171,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) case model.StateWriteReorganization: // reorganization -> public // Adjust table column offset. + failpoint.InjectCall("onAddColumnStateWriteReorg") offset, err := LocateOffsetToMove(columnInfo.Offset, pos, tblInfo) if err != nil { return ver, errors.Trace(err) @@ -270,6 +271,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } case model.StateWriteOnly: // write only -> delete only + failpoint.InjectCall("onDropColumnStateWriteOnly") colInfo.State = model.StateDeleteOnly tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1) if len(idxInfos) > 0 { diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index db95e1ac3b8e3..c757b84db0b6d 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -433,6 +433,7 @@ go_test( "//pkg/testkit", "//pkg/testkit/external", "//pkg/testkit/testdata", + "//pkg/testkit/testfailpoint", "//pkg/testkit/testmain", "//pkg/testkit/testsetup", "//pkg/types", diff --git a/pkg/executor/executor_txn_test.go b/pkg/executor/executor_txn_test.go index bf63b8d072f8c..76f5773ceca7e 100644 --- a/pkg/executor/executor_txn_test.go +++ b/pkg/executor/executor_txn_test.go @@ -18,12 +18,15 @@ import ( "fmt" "strconv" "strings" + "sync" "testing" "time" + "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/stretchr/testify/require" ) @@ -694,3 +697,40 @@ func TestSavepointWithBinlog(t *testing.T) { tk.MustExec("commit") tk.MustQuery("select * from t").Check(testkit.Rows("1 1")) } + +func TestColumnNotMatchError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{}) + tk.MustExec("set @@global.tidb_enable_metadata_lock=0") + tk.MustExec("use test") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int)") + tk.MustExec("insert into t values(1, 2)") + + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onAddColumnStateWriteReorg", func() { + tk.MustExec("begin;") + }) + var wg sync.WaitGroup + wg.Add(1) + go func() { + tk2.MustExec("alter table t add column wait_notify int") + wg.Done() + }() + wg.Wait() + tk.MustExec("delete from t where id=1") + tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged) + + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onDropColumnStateWriteOnly", func() { + tk.MustExec("begin;") + }) + wg.Add(1) + go func() { + tk2.MustExec("alter table t drop column wait_notify") + wg.Done() + }() + wg.Wait() + tk.MustExec("delete from t where id=1") + tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged) +} diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 38b564b0397a4..f7761787f453e 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -1368,7 +1368,7 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ memBuffer.Release(sh) if shouldWriteBinlog(ctx.GetSessionVars(), t.meta) { - cols := t.Cols() + cols := t.DeletableCols() colIDs := make([]int64, 0, len(cols)+1) for _, col := range cols { colIDs = append(colIDs, col.ID) diff --git a/pkg/testkit/testfailpoint/BUILD.bazel b/pkg/testkit/testfailpoint/BUILD.bazel new file mode 100644 index 0000000000000..16c1fd1d485bc --- /dev/null +++ b/pkg/testkit/testfailpoint/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "testfailpoint", + srcs = ["failpoint.go"], + importpath = "github.com/pingcap/tidb/pkg/testkit/testfailpoint", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/testkit/testfailpoint/failpoint.go b/pkg/testkit/testfailpoint/failpoint.go new file mode 100644 index 0000000000000..381b53c129c23 --- /dev/null +++ b/pkg/testkit/testfailpoint/failpoint.go @@ -0,0 +1,43 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testfailpoint + +import ( + "testing" + + "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" +) + +// Enable enables fail-point, and disable it when test finished. +func Enable(t testing.TB, name, expr string) { + require.NoError(t, failpoint.Enable(name, expr)) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable(name)) + }) +} + +// EnableCall enables fail-point, and disable it when test finished. +func EnableCall(t testing.TB, name string, fn any) { + require.NoError(t, failpoint.EnableCall(name, fn)) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable(name)) + }) +} + +// Disable disables fail-point. +func Disable(t testing.TB, name string) { + require.NoError(t, failpoint.Disable(name)) +}