Skip to content

Commit

Permalink
table: provide some binlog related methods for binlog in `MutateConte…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Jul 10, 2024
1 parent 53dcc79 commit f158c65
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
Session: s,
PlanCtxExtendedImpl: planctximpl.NewPlanCtxExtendedImpl(s),
}
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprCtx)
s.tblctx = tbctximpl.NewTableContextImpl(s)
s.txn.kvPairs = &Pairs{}

return s
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestBootstrapWithError(t *testing.T) {
}
se.exprctx = contextsession.NewSessionExprContext(se)
se.pctx = newPlanContextImpl(se)
se.tblctx = tbctximpl.NewTableContextImpl(se, se.exprctx)
se.tblctx = tbctximpl.NewTableContextImpl(se)
globalVarsAccessor := variable.NewMockGlobalAccessor4Tests()
se.GetSessionVars().GlobalVarsAccessor = globalVarsAccessor
se.txn.init()
Expand Down
4 changes: 2 additions & 2 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3701,7 +3701,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
s.sessionVars = variable.NewSessionVars(s)
s.exprctx = contextsession.NewSessionExprContext(s)
s.pctx = newPlanContextImpl(s)
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx)
s.tblctx = tbctximpl.NewTableContextImpl(s)

if opt != nil && opt.PreparedPlanCache != nil {
s.sessionPlanCache = opt.PreparedPlanCache
Expand Down Expand Up @@ -3764,7 +3764,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
}
s.exprctx = contextsession.NewSessionExprContext(s)
s.pctx = newPlanContextImpl(s)
s.tblctx = tbctximpl.NewTableContextImpl(s, s.exprctx)
s.tblctx = tbctximpl.NewTableContextImpl(s)
s.mu.values = make(map[fmt.Stringer]any)
s.lockedTables = make(map[int64]model.TableLockTpInfo)
domain.BindDomain(s, dom)
Expand Down
13 changes: 9 additions & 4 deletions pkg/table/context/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ func (b *EncodeRowBuffer) WriteMemBufferEncoded(
return memBuffer.SetWithFlags(key, encoded, flags...)
}

// GetColDataBuffer returns the buffer for column data.
// TODO: make sure the inner buffer is not used outside directly.
func (b *EncodeRowBuffer) GetColDataBuffer() ([]int64, []types.Datum) {
return b.colIDs, b.row
// EncodeBinlogRowData encodes the row data for binlog and returns the encoded row value.
// The returned slice is not referenced in the buffer, so you can cache and modify them freely.
func (b *EncodeRowBuffer) EncodeBinlogRowData(loc *time.Location, ec errctx.Context) ([]byte, error) {
value, err := tablecodec.EncodeOldRow(loc, b.row, b.colIDs, nil, nil)
err = ec.HandleError(err)
if err != nil {
return nil, err
}
return value, nil
}

// CheckRowBuffer is used to check row constraints
Expand Down
25 changes: 17 additions & 8 deletions pkg/table/context/buffers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package context
import (
"testing"
"time"
"unsafe"

"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -97,6 +98,7 @@ func TestEncodeRow(t *testing.T) {
oldFormat: true,
},
} {
// test encode and write to mem buffer
cfg := RowEncodingConfig{
RowEncoder: &rowcodec.Encoder{Enable: !c.oldFormat},
IsRowLevelChecksumEnabled: c.rowLevelChecksum,
Expand All @@ -115,19 +117,31 @@ func TestEncodeRow(t *testing.T) {

memBuffer := &mockMemBuffer{}
if len(c.flags) == 0 {
memBuffer.On("Set", kv.Key("key1"), expectedVal).Return(nil).Once()
memBuffer.On("Set", kv.Key("key1"), expectedVal).
Return(nil).Once()
} else {
memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags).Return(nil).Once()
memBuffer.On("SetWithFlags", kv.Key("key1"), expectedVal, c.flags).
Return(nil).Once()
}
err = buffer.WriteMemBufferEncoded(
cfg, c.loc, errctx.StrictNoWarningContext,
memBuffer, kv.Key("key1"), c.flags...,
)
require.NoError(t, err)
memBuffer.AssertExpectations(t)

// the encoding result should be cached as a buffer
require.Equal(t, expectedVal, buffer.writeStmtBufs.RowValBuf)

// test encode val for binlog
expectedVal, err =
tablecodec.EncodeOldRow(c.loc, []types.Datum{d1, d2, d3}, []int64{1, 2, 3}, nil, nil)
require.NoError(t, err)
encoded, err := buffer.EncodeBinlogRowData(c.loc, errctx.StrictNoWarningContext)
require.NoError(t, err)
require.Equal(t, expectedVal, encoded)
// the encoded should not be referenced by any inner buffer
require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.RowValBuf))
require.True(t, unsafe.SliceData(encoded) != unsafe.SliceData(buffer.writeStmtBufs.IndexKeyBuf))
}
}

Expand Down Expand Up @@ -158,11 +172,6 @@ func TestEncodeBufferReserve(t *testing.T) {
require.Equal(t, 4, len(buffer.writeStmtBufs.AddRowValues))
addRowValuesCap := cap(buffer.writeStmtBufs.AddRowValues)

// GetColDataBuffer should return the underlying buffer
colIDs, row := buffer.GetColDataBuffer()
require.Equal(t, buffer.colIDs, colIDs)
require.Equal(t, buffer.row, row)

// reset should not shrink the capacity
buffer.Reset(2)
require.Equal(t, 6, cap(buffer.colIDs))
Expand Down
8 changes: 6 additions & 2 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ type MutateContext interface {
// If the active parameter is true, call this function will wait for the pending txn
// to become valid.
Txn(active bool) (kv.Transaction, error)
// StmtGetMutation gets the binlog mutation for current statement.
StmtGetMutation(int64) *binlog.TableMutation
// BinlogEnabled returns whether the binlog is enabled.
BinlogEnabled() bool
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
GetBinlogMutation(tblID int64) *binlog.TableMutation
// GetDomainInfoSchema returns the latest information schema in domain
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified or should allocate id in the transaction.
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
// InRestrictedSQL returns whether the current context is used in restricted SQL.
InRestrictedSQL() bool
// GetRowEncodingConfig returns the RowEncodingConfig.
GetRowEncodingConfig() RowEncodingConfig
// GetMutateBuffers returns the MutateBuffers,
Expand Down
18 changes: 17 additions & 1 deletion pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "contextimpl",
Expand All @@ -12,5 +12,21 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/tableutil",
"@com_github_pingcap_tipb//go-binlog",
],
)

go_test(
name = "contextimpl_test",
timeout = "short",
srcs = ["table_test.go"],
flaky = True,
deps = [
":contextimpl",
"//pkg/sessionctx/binloginfo",
"//pkg/testkit",
"//pkg/util/mock",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
],
)
22 changes: 18 additions & 4 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table/context"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
)

var _ context.MutateContext = &TableContextImpl{}
Expand All @@ -29,18 +30,16 @@ var _ context.AllocatorContext = &TableContextImpl{}
// TableContextImpl is used to provide context for table operations.
type TableContextImpl struct {
sessionctx.Context
exprCtx exprctx.ExprContext
// mutateBuffers is a memory pool for table related memory allocation that aims to reuse memory
// and saves allocation
// The buffers are supposed to be used inside AddRecord/UpdateRecord/RemoveRecord.
mutateBuffers *context.MutateBuffers
}

// NewTableContextImpl creates a new TableContextImpl.
func NewTableContextImpl(sctx sessionctx.Context, exprCtx exprctx.ExprContext) *TableContextImpl {
func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl {
return &TableContextImpl{
Context: sctx,
exprCtx: exprCtx,
mutateBuffers: context.NewMutateBuffers(sctx.GetSessionVars().GetWriteStmtBufs()),
}
}
Expand All @@ -53,7 +52,22 @@ func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil.

// GetExprCtx returns the ExprContext
func (ctx *TableContextImpl) GetExprCtx() exprctx.ExprContext {
return ctx.exprCtx
return ctx.Context.GetExprCtx()
}

// InRestrictedSQL returns whether the current context is used in restricted SQL.
func (ctx *TableContextImpl) InRestrictedSQL() bool {
return ctx.vars().StmtCtx.InRestrictedSQL
}

// BinlogEnabled returns whether the binlog is enabled.
func (ctx *TableContextImpl) BinlogEnabled() bool {
return ctx.vars().BinlogClient != nil
}

// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
return ctx.Context.StmtGetMutation(tblID)
}

// GetRowEncodingConfig returns the RowEncodingConfig.
Expand Down
71 changes: 71 additions & 0 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package contextimpl_test

import (
"testing"

"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/table/contextimpl"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tipb/go-binlog"
"github.com/stretchr/testify/require"
)

func TestMutateContextImplFields(t *testing.T) {
sctx := mock.NewContext()
sctx.Mutations = make(map[int64]*binlog.TableMutation)
ctx := contextimpl.NewTableContextImpl(sctx)
// expression
require.True(t, sctx.GetExprCtx() == ctx.GetExprCtx())
// binlog
sctx.GetSessionVars().BinlogClient = nil
require.False(t, ctx.BinlogEnabled())
sctx.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
require.True(t, ctx.BinlogEnabled())
binlogMutation := ctx.GetBinlogMutation(1234)
require.NotNil(t, binlogMutation)
require.Same(t, sctx.StmtGetMutation(1234), binlogMutation)
// restricted SQL
sctx.GetSessionVars().StmtCtx.InRestrictedSQL = false
require.False(t, ctx.InRestrictedSQL())
sctx.GetSessionVars().StmtCtx.InRestrictedSQL = true
require.True(t, ctx.InRestrictedSQL())
// encoding config
sctx.GetSessionVars().EnableRowLevelChecksum = true
sctx.GetSessionVars().RowEncoder.Enable = true
sctx.GetSessionVars().InRestrictedSQL = false
cfg := ctx.GetRowEncodingConfig()
require.True(t, cfg.IsRowLevelChecksumEnabled)
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
require.True(t, cfg.IsRowLevelChecksumEnabled)
require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder)
sctx.GetSessionVars().RowEncoder.Enable = false
cfg = ctx.GetRowEncodingConfig()
require.False(t, cfg.IsRowLevelChecksumEnabled)
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
require.Same(t, &sctx.GetSessionVars().RowEncoder, cfg.RowEncoder)
require.False(t, cfg.IsRowLevelChecksumEnabled)
sctx.GetSessionVars().RowEncoder.Enable = true
sctx.GetSessionVars().InRestrictedSQL = true
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
require.False(t, cfg.IsRowLevelChecksumEnabled)
sctx.GetSessionVars().InRestrictedSQL = false
sctx.GetSessionVars().EnableRowLevelChecksum = false
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
// mutate buffers
require.NotNil(t, ctx.GetMutateBuffers())
}
1 change: 1 addition & 0 deletions pkg/table/tables/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/table",
"//pkg/table/context",
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util",
Expand Down
Loading

0 comments on commit f158c65

Please sign in to comment.