Skip to content

Commit

Permalink
table: Add GetStatisticsSupport and GetBinlogSupport for `MutateC…
Browse files Browse the repository at this point in the history
…ontext` (#54832)

ref #54397
  • Loading branch information
lcwangchao authored Jul 24, 2024
1 parent 04e5ff9 commit d67421d
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 85 deletions.
4 changes: 2 additions & 2 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestInsertOnDuplicateKeyWithBinlog(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
failpoint.Enable("github.com/pingcap/tidb/pkg/table/tables/forceWriteBinlog", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/table/tables/forceWriteBinlog")
failpoint.Enable("github.com/pingcap/tidb/pkg/table/contextimpl/forceWriteBinlog", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/table/contextimpl/forceWriteBinlog")
testInsertOnDuplicateKey(t, tk)
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ type RowEncodingConfig struct {
RowEncoder *rowcodec.Encoder
}

// BinlogSupport is used for binlog operations
type BinlogSupport interface {
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
GetBinlogMutation(tblID int64) *binlog.TableMutation
}

// StatisticsSupport is used for statistics update operations.
type StatisticsSupport interface {
// UpdatePhysicalTableDelta updates the physical table delta.
UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols)
}

// MutateContext is used to when mutating a table.
type MutateContext interface {
AllocatorContext
Expand All @@ -47,10 +59,6 @@ type MutateContext interface {
// If the active parameter is true, call this function will wait for the pending txn
// to become valid.
Txn(active bool) (kv.Transaction, error)
// BinlogEnabled returns whether the binlog is enabled.
BinlogEnabled() bool
// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
GetBinlogMutation(tblID int64) *binlog.TableMutation
// GetDomainInfoSchema returns the latest information schema in domain
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
// TxnRecordTempTable record the temporary table to the current transaction.
Expand All @@ -71,6 +79,12 @@ type MutateContext interface {
// which is a buffer for table related structures that aims to reuse memory and
// saves allocation.
GetMutateBuffers() *MutateBuffers
// GetBinlogSupport returns a `BinlogSupport` if the context supports it.
// If the context does not support binlog, the second return value will be false.
GetBinlogSupport() (BinlogSupport, bool)
// GetStatisticsSupport returns a `StatisticsSupport` if the context supports it.
// If the context does not support statistics update, the second return value will be false.
GetStatisticsSupport() (StatisticsSupport, bool)
}

// AllocatorContext is used to provide context for method `table.Allocators`.
Expand Down
1 change: 1 addition & 0 deletions pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/tableutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
],
)
Expand Down
46 changes: 36 additions & 10 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package contextimpl

import (
"github.com/pingcap/failpoint"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -75,16 +76,6 @@ func (ctx *TableContextImpl) EnableMutationChecker() bool {
return ctx.vars().EnableMutationChecker
}

// BinlogEnabled returns whether the binlog is enabled.
func (ctx *TableContextImpl) BinlogEnabled() bool {
return ctx.vars().BinlogClient != nil
}

// GetBinlogMutation returns a `binlog.TableMutation` object for a table.
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
return ctx.Context.StmtGetMutation(tblID)
}

// GetRowEncodingConfig returns the RowEncodingConfig.
func (ctx *TableContextImpl) GetRowEncodingConfig() context.RowEncodingConfig {
vars := ctx.vars()
Expand All @@ -99,6 +90,41 @@ func (ctx *TableContextImpl) GetMutateBuffers() *context.MutateBuffers {
return ctx.mutateBuffers
}

// GetBinlogSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetBinlogSupport() (context.BinlogSupport, bool) {
failpoint.Inject("forceWriteBinlog", func() {
// Just to cover binlog related code in this package, since the `BinlogClient` is
// still nil, mutations won't be written to pump on commit.
failpoint.Return(ctx, true)
})
if ctx.vars().BinlogClient != nil {
return ctx, true
}
return nil, false
}

// GetBinlogMutation implements the BinlogSupport interface.
func (ctx *TableContextImpl) GetBinlogMutation(tblID int64) *binlog.TableMutation {
return ctx.Context.StmtGetMutation(tblID)
}

// GetStatisticsSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetStatisticsSupport() (context.StatisticsSupport, bool) {
if ctx.vars().TxnCtx != nil {
return ctx, true
}
return nil, false
}

// UpdatePhysicalTableDelta implements the StatisticsSupport interface.
func (ctx *TableContextImpl) UpdatePhysicalTableDelta(
physicalTableID int64, delta int64, count int64, cols variable.DeltaCols,
) {
if txnCtx := ctx.vars().TxnCtx; txnCtx != nil {
txnCtx.UpdateDeltaForTable(physicalTableID, delta, count, cols)
}
}

func (ctx *TableContextImpl) vars() *variable.SessionVars {
return ctx.Context.GetSessionVars()
}
31 changes: 28 additions & 3 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ func TestMutateContextImplFields(t *testing.T) {
require.True(t, sctx.GetExprCtx() == ctx.GetExprCtx())
// binlog
sctx.GetSessionVars().BinlogClient = nil
require.False(t, ctx.BinlogEnabled())
binlogSupport, ok := ctx.GetBinlogSupport()
require.False(t, ok)
require.Nil(t, binlogSupport)
sctx.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
require.True(t, ctx.BinlogEnabled())
binlogMutation := ctx.GetBinlogMutation(1234)
binlogSupport, ok = ctx.GetBinlogSupport()
require.True(t, ok)
require.NotNil(t, binlogSupport)
binlogMutation := binlogSupport.GetBinlogMutation(1234)
require.NotNil(t, binlogMutation)
require.Same(t, sctx.StmtGetMutation(1234), binlogMutation)
// ConnectionID
Expand Down Expand Up @@ -82,4 +86,25 @@ func TestMutateContextImplFields(t *testing.T) {
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
// mutate buffers
require.NotNil(t, ctx.GetMutateBuffers())
// statistics support
txnCtx := sctx.GetSessionVars().TxnCtx
txnCtx.TableDeltaMap = make(map[int64]variable.TableDelta)
sctx.GetSessionVars().TxnCtx = nil
statisticsSupport, ok := ctx.GetStatisticsSupport()
require.False(t, ok)
require.Nil(t, statisticsSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
statisticsSupport, ok = ctx.GetStatisticsSupport()
require.True(t, ok)
require.NotNil(t, statisticsSupport)
require.Equal(t, 0, len(txnCtx.TableDeltaMap))
statisticsSupport.UpdatePhysicalTableDelta(
12, 1, 2, variable.DeltaColsMap(map[int64]int64{3: 4, 5: 6}),
)
require.Equal(t, 1, len(txnCtx.TableDeltaMap))
deltaMap := txnCtx.TableDeltaMap[12]
require.Equal(t, int64(12), deltaMap.TableID)
require.Equal(t, int64(1), deltaMap.Delta)
require.Equal(t, int64(2), deltaMap.Count)
require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize)
}
Loading

0 comments on commit d67421d

Please sign in to comment.