Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: Add GetStatisticsSupport and GetBinlogSupport for MutateContext #54832

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestInsertOnDuplicateKeyWithBinlog(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
failpoint.Enable("github.com/pingcap/tidb/pkg/table/tables/forceWriteBinlog", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/table/tables/forceWriteBinlog")
failpoint.Enable("github.com/pingcap/tidb/pkg/table/contextimpl/forceWriteBinlog", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/table/contextimpl/forceWriteBinlog")
testInsertOnDuplicateKey(t, tk)
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ type RowEncodingConfig struct {
RowEncoder *rowcodec.Encoder
}

// BinlogSupport is used for binlog operations
type BinlogSupport interface {
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
GetBinlogMutation(tblID int64) *binlog.TableMutation
}

// StatisticsSupport is used for statistics update operations.
type StatisticsSupport interface {
// UpdatePhysicalTableDelta updates the physical table delta.
UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols)
}

// MutateContext is used to when mutating a table.
type MutateContext interface {
AllocatorContext
Expand All @@ -47,10 +59,6 @@ type MutateContext interface {
// If the active parameter is true, call this function will wait for the pending txn
// to become valid.
Txn(active bool) (kv.Transaction, error)
// BinlogEnabled returns whether the binlog is enabled.
BinlogEnabled() bool
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
GetBinlogMutation(tblID int64) *binlog.TableMutation
// GetDomainInfoSchema returns the latest information schema in domain
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
// TxnRecordTempTable record the temporary table to the current transaction.
Expand All @@ -71,6 +79,12 @@ type MutateContext interface {
// which is a buffer for table related structures that aims to reuse memory and
// saves allocation.
GetMutateBuffers() *MutateBuffers
// GetBinlogSupport returns a `BinlogSupport` if the context supports it.
// If the context does not support binlog, the second return value will be false.
GetBinlogSupport() (BinlogSupport, bool)
// GetStatisticsSupport returns a `StatisticsSupport` if the context supports it.
// If the context does not support statistics update, the second return value will be false.
GetStatisticsSupport() (StatisticsSupport, bool)
}

// AllocatorContext is used to provide context for method `table.Allocators`.
Expand Down
1 change: 1 addition & 0 deletions pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/tableutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
],
)
Expand Down
46 changes: 36 additions & 10 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package contextimpl

import (
"github.com/pingcap/failpoint"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -75,16 +76,6 @@ func (ctx *TableContextImpl) EnableMutationChecker() bool {
return ctx.vars().EnableMutationChecker
}

// BinlogEnabled returns whether the binlog is enabled.
func (ctx *TableContextImpl) BinlogEnabled() bool {
return ctx.vars().BinlogClient != nil
}

// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
return ctx.Context.StmtGetMutation(tblID)
}

// GetRowEncodingConfig returns the RowEncodingConfig.
func (ctx *TableContextImpl) GetRowEncodingConfig() context.RowEncodingConfig {
vars := ctx.vars()
Expand All @@ -99,6 +90,41 @@ func (ctx *TableContextImpl) GetMutateBuffers() *context.MutateBuffers {
return ctx.mutateBuffers
}

// GetBinlogSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetBinlogSupport() (context.BinlogSupport, bool) {
failpoint.Inject("forceWriteBinlog", func() {
// Just to cover binlog related code in this package, since the `BinlogClient` is
// still nil, mutations won't be written to pump on commit.
failpoint.Return(ctx, true)
})
if ctx.vars().BinlogClient != nil {
return ctx, true
}
return nil, false
}

// GetBinlogMutation implements the BinlogSupport interface.
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
return ctx.Context.StmtGetMutation(tblID)
}

// GetStatisticsSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetStatisticsSupport() (context.StatisticsSupport, bool) {
if ctx.vars().TxnCtx != nil {
return ctx, true
}
return nil, false
}

// UpdatePhysicalTableDelta implements the StatisticsSupport interface.
func (ctx *TableContextImpl) UpdatePhysicalTableDelta(
physicalTableID int64, delta int64, count int64, cols variable.DeltaCols,
) {
if txnCtx := ctx.vars().TxnCtx; txnCtx != nil {
txnCtx.UpdateDeltaForTable(physicalTableID, delta, count, cols)
}
}

func (ctx *TableContextImpl) vars() *variable.SessionVars {
return ctx.Context.GetSessionVars()
}
31 changes: 28 additions & 3 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ func TestMutateContextImplFields(t *testing.T) {
require.True(t, sctx.GetExprCtx() == ctx.GetExprCtx())
// binlog
sctx.GetSessionVars().BinlogClient = nil
require.False(t, ctx.BinlogEnabled())
binlogSupport, ok := ctx.GetBinlogSupport()
require.False(t, ok)
require.Nil(t, binlogSupport)
sctx.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
require.True(t, ctx.BinlogEnabled())
binlogMutation := ctx.GetBinlogMutation(1234)
binlogSupport, ok = ctx.GetBinlogSupport()
require.True(t, ok)
require.NotNil(t, binlogSupport)
binlogMutation := binlogSupport.GetBinlogMutation(1234)
require.NotNil(t, binlogMutation)
require.Same(t, sctx.StmtGetMutation(1234), binlogMutation)
// ConnectionID
Expand Down Expand Up @@ -82,4 +86,25 @@ func TestMutateContextImplFields(t *testing.T) {
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
// mutate buffers
require.NotNil(t, ctx.GetMutateBuffers())
// statistics support
txnCtx := sctx.GetSessionVars().TxnCtx
txnCtx.TableDeltaMap = make(map[int64]variable.TableDelta)
sctx.GetSessionVars().TxnCtx = nil
statisticsSupport, ok := ctx.GetStatisticsSupport()
require.False(t, ok)
require.Nil(t, statisticsSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
statisticsSupport, ok = ctx.GetStatisticsSupport()
require.True(t, ok)
require.NotNil(t, statisticsSupport)
require.Equal(t, 0, len(txnCtx.TableDeltaMap))
statisticsSupport.UpdatePhysicalTableDelta(
12, 1, 2, variable.DeltaColsMap(map[int64]int64{3: 4, 5: 6}),
)
require.Equal(t, 1, len(txnCtx.TableDeltaMap))
deltaMap := txnCtx.TableDeltaMap[12]
require.Equal(t, int64(12), deltaMap.TableID)
require.Equal(t, int64(1), deltaMap.Delta)
require.Equal(t, int64(2), deltaMap.Count)
require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize)
}
Loading