Skip to content

Commit 956e83d

Browse files
authored
executor: sync deletable columns to binlog when remove record (#53617) (#53700)
close #53133
1 parent 236b1b7 commit 956e83d

File tree

3 files changed

+63
-1
lines changed

3 files changed

+63
-1
lines changed

ddl/column.go

+12
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
169169
case model.StateWriteReorganization:
170170
// reorganization -> public
171171
// Adjust table column offset.
172+
failpoint.Inject("onAddColumnStateWriteReorg", func() {
173+
OnAddColumnStateWriteReorgForTest()
174+
})
172175
offset, err := LocateOffsetToMove(columnInfo.Offset, pos, tblInfo)
173176
if err != nil {
174177
return ver, errors.Trace(err)
@@ -190,6 +193,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
190193
return ver, errors.Trace(err)
191194
}
192195

196+
// OnAddColumnStateWriteReorgForTest is only used for test.
197+
var OnAddColumnStateWriteReorgForTest func()
198+
193199
// CheckAfterPositionExists makes sure the column specified in AFTER clause is exists.
194200
// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1.
195201
func CheckAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
@@ -263,6 +269,9 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
263269
}
264270
case model.StateWriteOnly:
265271
// write only -> delete only
272+
failpoint.Inject("onDropColumnStateWriteOnly", func() {
273+
OnDropColumnStateWriteOnlyForTest()
274+
})
266275
colInfo.State = model.StateDeleteOnly
267276
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
268277
if len(idxInfos) > 0 {
@@ -313,6 +322,9 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
313322
return ver, errors.Trace(err)
314323
}
315324

325+
// OnDropColumnStateWriteOnlyForTest is only used for test.
326+
var OnDropColumnStateWriteOnlyForTest func()
327+
316328
func checkDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
317329
schemaID := job.SchemaID
318330
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)

executor/executor_txn_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ import (
1919
"fmt"
2020
"strconv"
2121
"strings"
22+
"sync"
2223
"testing"
2324
"time"
2425

26+
"github.com/pingcap/failpoint"
27+
"github.com/pingcap/tidb/ddl"
28+
"github.com/pingcap/tidb/errno"
2529
"github.com/pingcap/tidb/executor"
2630
"github.com/pingcap/tidb/sessionctx/binloginfo"
2731
"github.com/pingcap/tidb/testkit"
@@ -798,3 +802,48 @@ func (m mockPumpClient) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogR
798802
func (m mockPumpClient) PullBinlogs(ctx context.Context, in *binlog.PullBinlogReq, opts ...grpc.CallOption) (binlog.Pump_PullBinlogsClient, error) {
799803
return nil, nil
800804
}
805+
806+
func TestColumnNotMatchError(t *testing.T) {
807+
store := testkit.CreateMockStore(t)
808+
tk := testkit.NewTestKit(t, store)
809+
tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&mockPumpClient{})
810+
tk.MustExec("set @@global.tidb_enable_metadata_lock=0")
811+
tk.MustExec("use test")
812+
tk2 := testkit.NewTestKit(t, store)
813+
tk2.MustExec("use test")
814+
tk.MustExec("create table t(id int primary key, a int)")
815+
tk.MustExec("insert into t values(1, 2)")
816+
817+
ddl.OnAddColumnStateWriteReorgForTest = func() {
818+
tk.MustExec("begin;")
819+
}
820+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/onAddColumnStateWriteReorg", "return"))
821+
defer func() {
822+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/onAddColumnStateWriteReorg"))
823+
}()
824+
var wg sync.WaitGroup
825+
wg.Add(1)
826+
go func() {
827+
tk2.MustExec("alter table t add column wait_notify int")
828+
wg.Done()
829+
}()
830+
wg.Wait()
831+
tk.MustExec("delete from t where id=1")
832+
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)
833+
834+
ddl.OnDropColumnStateWriteOnlyForTest = func() {
835+
tk.MustExec("begin;")
836+
}
837+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/onDropColumnStateWriteOnly", "return"))
838+
defer func() {
839+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/onDropColumnStateWriteOnly"))
840+
}()
841+
wg.Add(1)
842+
go func() {
843+
tk2.MustExec("alter table t drop column wait_notify")
844+
wg.Done()
845+
}()
846+
wg.Wait()
847+
tk.MustExec("delete from t where id=1")
848+
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)
849+
}

table/tables/tables.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
11751175
memBuffer.Release(sh)
11761176

11771177
if shouldWriteBinlog(ctx, t.meta) {
1178-
cols := t.Cols()
1178+
cols := t.DeletableCols()
1179+
11791180
colIDs := make([]int64, 0, len(cols)+1)
11801181
for _, col := range cols {
11811182
colIDs = append(colIDs, col.ID)

0 commit comments

Comments
 (0)