From f158c654466df83937e98c3da545df6aa11077c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 10 Jul 2024 13:43:33 +0800 Subject: [PATCH] table: provide some binlog related methods for binlog in `MutateContext` (#54433) ref pingcap/tidb#54392, ref pingcap/tidb#54397 --- pkg/lightning/backend/kv/session.go | 2 +- pkg/session/bootstrap_test.go | 2 +- pkg/session/session.go | 4 +- pkg/table/context/buffers.go | 13 ++++-- pkg/table/context/buffers_test.go | 25 ++++++---- pkg/table/context/table.go | 8 +++- pkg/table/contextimpl/BUILD.bazel | 18 +++++++- pkg/table/contextimpl/table.go | 22 +++++++-- pkg/table/contextimpl/table_test.go | 71 +++++++++++++++++++++++++++++ pkg/table/tables/BUILD.bazel | 1 + pkg/table/tables/tables.go | 61 ++++++++++++------------- pkg/util/mock/context.go | 15 ++++-- 12 files changed, 184 insertions(+), 58 deletions(-) create mode 100644 pkg/table/contextimpl/table_test.go diff --git a/pkg/lightning/backend/kv/session.go b/pkg/lightning/backend/kv/session.go index c0e6333c7a5aa..c3b296beb4bcf 100644 --- a/pkg/lightning/backend/kv/session.go +++ b/pkg/lightning/backend/kv/session.go @@ -363,7 +363,7 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session { Session: s, PlanCtxExtendedImpl: planctximpl.NewPlanCtxExtendedImpl(s), } - s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprCtx) + s.tblctx = tbctximpl.NewTableContextImpl(s) s.txn.kvPairs = &Pairs{} return s diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index c617f8b83a1d1..646c3ae9fb390 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -156,7 +156,7 @@ func TestBootstrapWithError(t *testing.T) { } se.exprctx = contextsession.NewSessionExprContext(se) se.pctx = newPlanContextImpl(se) - se.tblctx = tbctximpl.NewTableContextImpl(se, se.exprctx) + se.tblctx = tbctximpl.NewTableContextImpl(se) globalVarsAccessor := variable.NewMockGlobalAccessor4Tests() se.GetSessionVars().GlobalVarsAccessor = globalVarsAccessor se.txn.init() diff --git a/pkg/session/session.go b/pkg/session/session.go index a34a19c35e3ec..143452024aae0 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3701,7 +3701,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { s.sessionVars = variable.NewSessionVars(s) s.exprctx = contextsession.NewSessionExprContext(s) s.pctx = newPlanContextImpl(s) - s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx) + s.tblctx = tbctximpl.NewTableContextImpl(s) if opt != nil && opt.PreparedPlanCache != nil { s.sessionPlanCache = opt.PreparedPlanCache @@ -3764,7 +3764,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er } s.exprctx = contextsession.NewSessionExprContext(s) s.pctx = newPlanContextImpl(s) - s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx) + s.tblctx = tbctximpl.NewTableContextImpl(s) s.mu.values = make(map[fmt.Stringer]any) s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) diff --git a/pkg/table/context/buffers.go b/pkg/table/context/buffers.go index 9a82bfe5a516a..956b950c618a5 100644 --- a/pkg/table/context/buffers.go +++ b/pkg/table/context/buffers.go @@ -80,10 +80,15 @@ func (b *EncodeRowBuffer) WriteMemBufferEncoded( return memBuffer.SetWithFlags(key, encoded, flags...) } -// GetColDataBuffer returns the buffer for column data. -// TODO: make sure the inner buffer is not used outside directly. -func (b *EncodeRowBuffer) GetColDataBuffer() ([]int64, []types.Datum) { - return b.colIDs, b.row +// EncodeBinlogRowData encodes the row data for binlog and returns the encoded row value. +// The returned slice is not referenced in the buffer, so you can cache and modify them freely. +func (b *EncodeRowBuffer) EncodeBinlogRowData(loc *time.Location, ec errctx.Context) ([]byte, error) { + value, err := tablecodec.EncodeOldRow(loc, b.row, b.colIDs, nil, nil) + err = ec.HandleError(err) + if err != nil { + return nil, err + } + return value, nil } // CheckRowBuffer is used to check row constraints diff --git a/pkg/table/context/buffers_test.go b/pkg/table/context/buffers_test.go index ec3e78ec95aad..92d4d7afbeb87 100644 --- a/pkg/table/context/buffers_test.go +++ b/pkg/table/context/buffers_test.go @@ -17,6 +17,7 @@ package context import ( "testing" "time" + "unsafe" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" @@ -97,6 +98,7 @@ func TestEncodeRow(t *testing.T) { oldFormat: true, }, } { + // test encode and write to mem buffer cfg := RowEncodingConfig{ RowEncoder: &rowcodec.Encoder{Enable: !c.oldFormat}, IsRowLevelChecksumEnabled: c.rowLevelChecksum, @@ -115,9 +117,11 @@ func TestEncodeRow(t *testing.T) { memBuffer := &mockMemBuffer{} if len(c.flags) == 0 { - memBuffer.On("Set", kv.Key("key1"), expectedVal).Return(nil).Once() + memBuffer.On("Set", kv.Key("key1"), expectedVal). + Return(nil).Once() } else { - memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags).Return(nil).Once() + memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags). + Return(nil).Once() } err = buffer.WriteMemBufferEncoded( cfg, c.loc, errctx.StrictNoWarningContext, @@ -125,9 +129,19 @@ func TestEncodeRow(t *testing.T) { ) require.NoError(t, err) memBuffer.AssertExpectations(t) - // the encoding result should be cached as a buffer require.Equal(t, expectedVal, buffer.writeStmtBufs.RowValBuf) + + // test encode val for binlog + expectedVal, err = + tablecodec.EncodeOldRow(c.loc, []types.Datum{d1, d2, d3}, []int64{1, 2, 3}, nil, nil) + require.NoError(t, err) + encoded, err := buffer.EncodeBinlogRowData(c.loc, errctx.StrictNoWarningContext) + require.NoError(t, err) + require.Equal(t, expectedVal, encoded) + // the encoded should not be referenced by any inner buffer + require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.RowValBuf)) + require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.IndexKeyBuf)) } } @@ -158,11 +172,6 @@ func TestEncodeBufferReserve(t *testing.T) { require.Equal(t, 4, len(buffer.writeStmtBufs.AddRowValues)) addRowValuesCap := cap(buffer.writeStmtBufs.AddRowValues) - // GetColDataBuffer should return the underlying buffer - colIDs, row := buffer.GetColDataBuffer() - require.Equal(t, buffer.colIDs, colIDs) - require.Equal(t, buffer.row, row) - // reset should not shrink the capacity buffer.Reset(2) require.Equal(t, 6, cap(buffer.colIDs)) diff --git a/pkg/table/context/table.go b/pkg/table/context/table.go index cb32a8fa03d9a..02619a2eb92fb 100644 --- a/pkg/table/context/table.go +++ b/pkg/table/context/table.go @@ -47,13 +47,17 @@ type MutateContext interface { // If the active parameter is true, call this function will wait for the pending txn // to become valid. Txn(active bool) (kv.Transaction, error) - // StmtGetMutation gets the binlog mutation for current statement. - StmtGetMutation(int64) *binlog.TableMutation + // BinlogEnabled returns whether the binlog is enabled. + BinlogEnabled() bool + // GetBinlogMutation returns a `binlog.TableMutation` object for a table. + GetBinlogMutation(tblID int64) *binlog.TableMutation // GetDomainInfoSchema returns the latest information schema in domain GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema // TxnRecordTempTable record the temporary table to the current transaction. // This method will be called when the temporary table is modified or should allocate id in the transaction. TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable + // InRestrictedSQL returns whether the current context is used in restricted SQL. + InRestrictedSQL() bool // GetRowEncodingConfig returns the RowEncodingConfig. GetRowEncodingConfig() RowEncodingConfig // GetMutateBuffers returns the MutateBuffers, diff --git a/pkg/table/contextimpl/BUILD.bazel b/pkg/table/contextimpl/BUILD.bazel index 1a35972606361..7e512b82cf7e2 100644 --- a/pkg/table/contextimpl/BUILD.bazel +++ b/pkg/table/contextimpl/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "contextimpl", @@ -12,5 +12,21 @@ go_library( "//pkg/sessionctx/variable", "//pkg/table/context", "//pkg/util/tableutil", + "@com_github_pingcap_tipb//go-binlog", + ], +) + +go_test( + name = "contextimpl_test", + timeout = "short", + srcs = ["table_test.go"], + flaky = True, + deps = [ + ":contextimpl", + "//pkg/sessionctx/binloginfo", + "//pkg/testkit", + "//pkg/util/mock", + "@com_github_pingcap_tipb//go-binlog", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/table/contextimpl/table.go b/pkg/table/contextimpl/table.go index f064a65549808..3c3c073ddeed4 100644 --- a/pkg/table/contextimpl/table.go +++ b/pkg/table/contextimpl/table.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table/context" "github.com/pingcap/tidb/pkg/util/tableutil" + "github.com/pingcap/tipb/go-binlog" ) var _ context.MutateContext = &TableContextImpl{} @@ -29,7 +30,6 @@ var _ context.AllocatorContext = &TableContextImpl{} // TableContextImpl is used to provide context for table operations. type TableContextImpl struct { sessionctx.Context - exprCtx exprctx.ExprContext // mutateBuffers is a memory pool for table related memory allocation that aims to reuse memory // and saves allocation // The buffers are supposed to be used inside AddRecord/UpdateRecord/RemoveRecord. @@ -37,10 +37,9 @@ type TableContextImpl struct { } // NewTableContextImpl creates a new TableContextImpl. -func NewTableContextImpl(sctx sessionctx.Context, exprCtx exprctx.ExprContext) *TableContextImpl { +func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl { return &TableContextImpl{ Context: sctx, - exprCtx: exprCtx, mutateBuffers: context.NewMutateBuffers(sctx.GetSessionVars().GetWriteStmtBufs()), } } @@ -53,7 +52,22 @@ func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil. // GetExprCtx returns the ExprContext func (ctx *TableContextImpl) GetExprCtx() exprctx.ExprContext { - return ctx.exprCtx + return ctx.Context.GetExprCtx() +} + +// InRestrictedSQL returns whether the current context is used in restricted SQL. +func (ctx *TableContextImpl) InRestrictedSQL() bool { + return ctx.vars().StmtCtx.InRestrictedSQL +} + +// BinlogEnabled returns whether the binlog is enabled. +func (ctx *TableContextImpl) BinlogEnabled() bool { + return ctx.vars().BinlogClient != nil +} + +// GetBinlogMutation returns a `binlog.TableMutation` object for a table. +func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation { + return ctx.Context.StmtGetMutation(tblID) } // GetRowEncodingConfig returns the RowEncodingConfig. diff --git a/pkg/table/contextimpl/table_test.go b/pkg/table/contextimpl/table_test.go new file mode 100644 index 0000000000000..bad7845dbdbf9 --- /dev/null +++ b/pkg/table/contextimpl/table_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package contextimpl_test + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" + "github.com/pingcap/tidb/pkg/table/contextimpl" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/pingcap/tipb/go-binlog" + "github.com/stretchr/testify/require" +) + +func TestMutateContextImplFields(t *testing.T) { + sctx := mock.NewContext() + sctx.Mutations = make(map[int64]*binlog.TableMutation) + ctx := contextimpl.NewTableContextImpl(sctx) + // expression + require.True(t, sctx.GetExprCtx() == ctx.GetExprCtx()) + // binlog + sctx.GetSessionVars().BinlogClient = nil + require.False(t, ctx.BinlogEnabled()) + sctx.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{}) + require.True(t, ctx.BinlogEnabled()) + binlogMutation := ctx.GetBinlogMutation(1234) + require.NotNil(t, binlogMutation) + require.Same(t, sctx.StmtGetMutation(1234), binlogMutation) + // restricted SQL + sctx.GetSessionVars().StmtCtx.InRestrictedSQL = false + require.False(t, ctx.InRestrictedSQL()) + sctx.GetSessionVars().StmtCtx.InRestrictedSQL = true + require.True(t, ctx.InRestrictedSQL()) + // encoding config + sctx.GetSessionVars().EnableRowLevelChecksum = true + sctx.GetSessionVars().RowEncoder.Enable = true + sctx.GetSessionVars().InRestrictedSQL = false + cfg := ctx.GetRowEncodingConfig() + require.True(t, cfg.IsRowLevelChecksumEnabled) + require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled) + require.True(t, cfg.IsRowLevelChecksumEnabled) + require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder) + sctx.GetSessionVars().RowEncoder.Enable = false + cfg = ctx.GetRowEncodingConfig() + require.False(t, cfg.IsRowLevelChecksumEnabled) + require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled) + require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder) + require.False(t, cfg.IsRowLevelChecksumEnabled) + sctx.GetSessionVars().RowEncoder.Enable = true + sctx.GetSessionVars().InRestrictedSQL = true + require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled) + require.False(t, cfg.IsRowLevelChecksumEnabled) + sctx.GetSessionVars().InRestrictedSQL = false + sctx.GetSessionVars().EnableRowLevelChecksum = false + require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled) + // mutate buffers + require.NotNil(t, ctx.GetMutateBuffers()) +} diff --git a/pkg/table/tables/BUILD.bazel b/pkg/table/tables/BUILD.bazel index c0c5c70ebc5fd..61069d42ee6ec 100644 --- a/pkg/table/tables/BUILD.bazel +++ b/pkg/table/tables/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/table", + "//pkg/table/context", "//pkg/tablecodec", "//pkg/types", "//pkg/util", diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 28324551e2f7a..41254ac2292fc 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/table" + tbctx "github.com/pingcap/tidb/pkg/table/context" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" @@ -520,7 +521,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext mutateBuffers := sctx.GetMutateBuffers() encodeRowBuffer := mutateBuffers.GetEncodeRowBufferWithCap(numColsCap) checkRowBuffer := mutateBuffers.GetCheckRowBufferWithCap(numColsCap) - if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { + if shouldWriteBinlog(sctx, t.meta) { binlogColIDs = make([]int64, 0, numColsCap) binlogOldRow = make([]types.Datum, 0, numColsCap) binlogNewRow = make([]types.Datum, 0, numColsCap) @@ -563,7 +564,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext encodeRowBuffer.AddColVal(col.ID, value) } checkRowBuffer.AddColVal(value) - if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) && !t.canSkipUpdateBinlog(col, value) { + if shouldWriteBinlog(sctx, t.meta) && !t.canSkipUpdateBinlog(col, value) { binlogColIDs = append(binlogColIDs, col.ID) binlogOldRow = append(binlogOldRow, oldData[col.Offset]) binlogNewRow = append(binlogNewRow, value) @@ -631,7 +632,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext } memBuffer.Release(sh) - if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { + if shouldWriteBinlog(sctx, t.meta) { if !t.meta.PKIsHandle && !t.meta.IsCommonHandle { binlogColIDs = append(binlogColIDs, model.ExtraHandleID) binlogOldRow = append(binlogOldRow, types.NewIntDatum(h.IntValue())) @@ -899,10 +900,6 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts } } - var ( - binlogColIDs []int64 - binlogRow []types.Datum - ) // a reusable buffer to save malloc // Note: The buffer should not be referenced or modified outside this function. // It can only act as a temporary buffer for the current function call. @@ -1056,10 +1053,9 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts memBuffer.Release(sh) - if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { + if shouldWriteBinlog(sctx, t.meta) { // For insert, TiDB and Binlog can use same row and schema. - binlogColIDs, binlogRow = encodeRowBuffer.GetColDataBuffer() - err = t.addInsertBinlog(sctx, recordID, binlogRow, binlogColIDs) + err = t.addInsertBinlog(sctx, recordID, encodeRowBuffer) if err != nil { return nil, err } @@ -1320,7 +1316,7 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ } memBuffer.Release(sh) - if shouldWriteBinlog(ctx.GetSessionVars(), t.meta) { + if shouldWriteBinlog(ctx, t.meta) { cols := t.DeletableCols() colIDs := make([]int64, 0, len(cols)+1) for _, col := range cols { @@ -1362,53 +1358,58 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ return err } -func (t *TableCommon) addInsertBinlog(ctx table.MutateContext, h kv.Handle, row []types.Datum, colIDs []int64) error { - mutation := t.getMutation(ctx) +func (t *TableCommon) addInsertBinlog(ctx table.MutateContext, h kv.Handle, encodeRowBuffer *tbctx.EncodeRowBuffer) error { + evalCtx := ctx.GetExprCtx().GetEvalCtx() + loc, ec := evalCtx.Location(), evalCtx.ErrCtx() handleData, err := h.Data() if err != nil { return err } - pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx.TimeZone(), nil, handleData...) - err = ctx.GetSessionVars().StmtCtx.HandleError(err) + pk, err := codec.EncodeValue(loc, nil, handleData...) + err = ec.HandleError(err) if err != nil { return err } - value, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), row, colIDs, nil, nil) - err = ctx.GetSessionVars().StmtCtx.HandleError(err) + value, err := encodeRowBuffer.EncodeBinlogRowData(loc, ec) if err != nil { return err } bin := append(pk, value...) + mutation := ctx.GetBinlogMutation(t.tableID) mutation.InsertedRows = append(mutation.InsertedRows, bin) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert) return nil } func (t *TableCommon) addUpdateBinlog(ctx table.MutateContext, oldRow, newRow []types.Datum, colIDs []int64) error { - old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), oldRow, colIDs, nil, nil) - err = ctx.GetSessionVars().StmtCtx.HandleError(err) + evalCtx := ctx.GetExprCtx().GetEvalCtx() + loc, ec := evalCtx.Location(), evalCtx.ErrCtx() + old, err := tablecodec.EncodeOldRow(loc, oldRow, colIDs, nil, nil) + err = ec.HandleError(err) if err != nil { return err } - newVal, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), newRow, colIDs, nil, nil) - err = ctx.GetSessionVars().StmtCtx.HandleError(err) + newVal, err := tablecodec.EncodeOldRow(loc, newRow, colIDs, nil, nil) + err = ec.HandleError(err) if err != nil { return err } bin := append(old, newVal...) - mutation := t.getMutation(ctx) + mutation := ctx.GetBinlogMutation(t.tableID) mutation.UpdatedRows = append(mutation.UpdatedRows, bin) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Update) return nil } func (t *TableCommon) addDeleteBinlog(ctx table.MutateContext, r []types.Datum, colIDs []int64) error { - data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), r, colIDs, nil, nil) - err = ctx.GetSessionVars().StmtCtx.HandleError(err) + evalCtx := ctx.GetExprCtx().GetEvalCtx() + loc, ec := evalCtx.Location(), evalCtx.ErrCtx() + data, err := tablecodec.EncodeOldRow(loc, r, colIDs, nil, nil) + err = ec.HandleError(err) if err != nil { return err } - mutation := t.getMutation(ctx) + mutation := ctx.GetBinlogMutation(t.tableID) mutation.DeletedRows = append(mutation.DeletedRows, data) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_DeleteRow) return nil @@ -1723,23 +1724,19 @@ func (t *TableCommon) Type() table.Type { return table.NormalTable } -func shouldWriteBinlog(vars *variable.SessionVars, tblInfo *model.TableInfo) bool { +func shouldWriteBinlog(ctx table.MutateContext, tblInfo *model.TableInfo) bool { failpoint.Inject("forceWriteBinlog", func() { // Just to cover binlog related code in this package, since the `BinlogClient` is // still nil, mutations won't be written to pump on commit. failpoint.Return(true) }) - if vars.BinlogClient == nil { + if !ctx.BinlogEnabled() { return false } if tblInfo.TempTableType != model.TempTableNone { return false } - return !vars.InRestrictedSQL -} - -func (t *TableCommon) getMutation(ctx table.MutateContext) *binlog.TableMutation { - return ctx.StmtGetMutation(t.tableID) + return !ctx.InRestrictedSQL() } func (t *TableCommon) canSkip(col *table.Column, value *types.Datum) bool { diff --git a/pkg/util/mock/context.go b/pkg/util/mock/context.go index 5da3a4ca67efd..c1dfe7aaf1e9a 100644 --- a/pkg/util/mock/context.go +++ b/pkg/util/mock/context.go @@ -68,6 +68,7 @@ type Context struct { sm util.SessionManager is infoschema.MetaOnlyInfoSchema values map[fmt.Stringer]any + Mutations map[int64]*binlog.TableMutation sessionVars *variable.SessionVars tblctx *tbctximpl.TableContextImpl cancel context.CancelFunc @@ -480,8 +481,16 @@ func (*Context) StmtCommit(context.Context) {} func (*Context) StmtRollback(context.Context, bool) {} // StmtGetMutation implements the sessionctx.Context interface. -func (*Context) StmtGetMutation(_ int64) *binlog.TableMutation { - return nil +func (c *Context) StmtGetMutation(tblID int64) *binlog.TableMutation { + if c.Mutations == nil { + return nil + } + m, ok := c.Mutations[tblID] + if !ok { + m = &binlog.TableMutation{} + c.Mutations[tblID] = m + } + return m } // AddTableLock implements the sessionctx.Context interface. @@ -624,7 +633,7 @@ func NewContext() *Context { vars := variable.NewSessionVars(sctx) sctx.sessionVars = vars sctx.SessionExprContext = exprctximpl.NewSessionExprContext(sctx) - sctx.tblctx = tbctximpl.NewTableContextImpl(sctx, sctx) + sctx.tblctx = tbctximpl.NewTableContextImpl(sctx) vars.InitChunkSize = 2 vars.MaxChunkSize = 32 vars.TimeZone = time.UTC