Skip to content

Commit

Permalink
executor: sync deletable columns to binlog when remove record (#53617) (
Browse files Browse the repository at this point in the history
#56887)

close #53133
  • Loading branch information
ti-chi-bot authored Nov 12, 2024
1 parent dbe64fa commit d36af8f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
12 changes: 12 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offset.
failpoint.Inject("onAddColumnStateWriteReorg", func() {
OnAddColumnStateWriteReorgForTest()
})
offset, err := LocateOffsetToMove(columnInfo.Offset, pos, tblInfo)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -192,6 +195,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

// OnAddColumnStateWriteReorgForTest is only used for test.
var OnAddColumnStateWriteReorgForTest func()

// CheckAfterPositionExists makes sure the column specified in AFTER clause is exists.
// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1.
func CheckAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
Expand Down Expand Up @@ -265,6 +271,9 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}
case model.StateWriteOnly:
// write only -> delete only
failpoint.Inject("onDropColumnStateWriteOnly", func() {
OnDropColumnStateWriteOnlyForTest()
})
colInfo.State = model.StateDeleteOnly
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
if len(idxInfos) > 0 {
Expand Down Expand Up @@ -315,6 +324,9 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

// OnDropColumnStateWriteOnlyForTest is only used for test.
var OnDropColumnStateWriteOnlyForTest func()

func checkDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
Expand Down
48 changes: 48 additions & 0 deletions executor/executor_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -800,6 +803,51 @@ func (m mockPumpClient) PullBinlogs(ctx context.Context, in *binlog.PullBinlogRe
return nil, nil
}

func TestColumnNotMatchError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&mockPumpClient{})
tk.MustExec("set @@global.tidb_enable_metadata_lock=0")
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("create table t(id int primary key, a int)")
tk.MustExec("insert into t values(1, 2)")

ddl.OnAddColumnStateWriteReorgForTest = func() {
tk.MustExec("begin;")
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/onAddColumnStateWriteReorg", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/onAddColumnStateWriteReorg"))
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
tk2.MustExec("alter table t add column wait_notify int")
wg.Done()
}()
wg.Wait()
tk.MustExec("delete from t where id=1")
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)

ddl.OnDropColumnStateWriteOnlyForTest = func() {
tk.MustExec("begin;")
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/onDropColumnStateWriteOnly", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/onDropColumnStateWriteOnly"))
}()
wg.Add(1)
go func() {
tk2.MustExec("alter table t drop column wait_notify")
wg.Done()
}()
wg.Wait()
tk.MustExec("delete from t where id=1")
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)
}

func TestInnodbLockWaitTimeout(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
2 changes: 1 addition & 1 deletion table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
memBuffer.Release(sh)

if shouldWriteBinlog(ctx, t.meta) {
cols := t.Cols()
cols := t.DeletableCols()
colIDs := make([]int64, 0, len(cols)+1)
for _, col := range cols {
colIDs = append(colIDs, col.ID)
Expand Down

0 comments on commit d36af8f

Please sign in to comment.