From 2ee8c99cff49684b67b992a43646c1c3bc06b1e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 26 Jul 2024 13:19:36 +0800 Subject: [PATCH] table: Add `CachedTableSupport` and `TemporaryTableSupport` for `MutateContext` (#54900) close pingcap/tidb#54397 --- pkg/table/context/BUILD.bazel | 1 + pkg/table/context/table.go | 72 ++++++++++++++++++++++++++--- pkg/table/contextimpl/BUILD.bazel | 4 +- pkg/table/contextimpl/table.go | 60 ++++++++++++++++++++++-- pkg/table/contextimpl/table_test.go | 55 ++++++++++++++++++++++ pkg/table/tables/cache.go | 8 +--- pkg/table/tables/tables.go | 56 ++++++++-------------- 7 files changed, 202 insertions(+), 54 deletions(-) diff --git a/pkg/table/context/BUILD.bazel b/pkg/table/context/BUILD.bazel index 171fecb88de64..c49178bd63055 100644 --- a/pkg/table/context/BUILD.bazel +++ b/pkg/table/context/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/expression/context", "//pkg/infoschema/context", "//pkg/kv", + "//pkg/meta/autoid", "//pkg/parser/model", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", diff --git a/pkg/table/context/table.go b/pkg/table/context/table.go index f286441782ae6..51a513187639f 100644 --- a/pkg/table/context/table.go +++ b/pkg/table/context/table.go @@ -18,6 +18,7 @@ import ( exprctx "github.com/pingcap/tidb/pkg/expression/context" infoschema "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -48,6 +49,61 @@ type StatisticsSupport interface { UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols) } +// CachedTableSupport is used for cached table operations +type CachedTableSupport interface { + // AddCachedTableHandleToTxn adds a cached handle to the current transaction + // to handle cached table when committing txn. + // The handle argument should implement `table.CachedTable` interface, but here is `any` to avoid import cycle. + AddCachedTableHandleToTxn(tableID int64, handle any) +} + +// TemporaryTableHandler is used by `table.Table` to handle temporary table. +type TemporaryTableHandler struct { + tblInTxn tableutil.TempTable + data variable.TemporaryTableData +} + +// NewTemporaryTableHandler creates a new TemporaryTableHandler +func NewTemporaryTableHandler(tbl tableutil.TempTable, data variable.TemporaryTableData) TemporaryTableHandler { + return TemporaryTableHandler{ + tblInTxn: tbl, + data: data, + } +} + +// Meta returns the meta +func (h *TemporaryTableHandler) Meta() *model.TableInfo { + return h.tblInTxn.GetMeta() +} + +// GetDirtySize returns the size of dirty data in txn of the temporary table +func (h *TemporaryTableHandler) GetDirtySize() int64 { + return h.tblInTxn.GetSize() +} + +// GetCommittedSize returns the committed data size of the temporary table +func (h *TemporaryTableHandler) GetCommittedSize() int64 { + if h.data == nil { + return 0 + } + return h.data.GetTableSize(h.tblInTxn.GetMeta().ID) +} + +// UpdateTxnDeltaSize updates the size of dirty data statistics in txn of the temporary table +func (h *TemporaryTableHandler) UpdateTxnDeltaSize(delta int) { + h.tblInTxn.SetSize(h.tblInTxn.GetSize() + int64(delta)) +} + +// TemporaryTableSupport is used for temporary table operations +type TemporaryTableSupport interface { + // GetTemporaryTableSizeLimit returns the size limit of a temporary table. + GetTemporaryTableSizeLimit() int64 + // AddTemporaryTableToTxn adds a temporary table to txn to mark it is modified + // and txn will handle it when committing. + // It returns a `TemporaryTableHandler` object which provides some extra info for the temporary table. + AddTemporaryTableToTxn(tblInfo *model.TableInfo) (TemporaryTableHandler, bool) +} + // MutateContext is used to when mutating a table. type MutateContext interface { AllocatorContext @@ -62,9 +118,6 @@ type MutateContext interface { Txn(active bool) (kv.Transaction, error) // GetDomainInfoSchema returns the latest information schema in domain GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema - // TxnRecordTempTable record the temporary table to the current transaction. - // This method will be called when the temporary table is modified or should allocate id in the transaction. - TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable // ConnectionID returns the id of the current connection. // If the current environment is not in a query from the client, the return value is 0. ConnectionID() uint64 @@ -90,11 +143,18 @@ type MutateContext interface { // GetStatisticsSupport returns a `StatisticsSupport` if the context supports it. // If the context does not support statistics update, the second return value will be false. GetStatisticsSupport() (StatisticsSupport, bool) + // GetCachedTableSupport returns a `CachedTableSupport` if the context supports it. + // If the context does not support cached table, the second return value will be false. + GetCachedTableSupport() (CachedTableSupport, bool) + // GetTemporaryTableSupport returns a `TemporaryTableSupport` if the context supports it. + // If the context does not support temporary table, the second return value will be false. + GetTemporaryTableSupport() (TemporaryTableSupport, bool) } // AllocatorContext is used to provide context for method `table.Allocators`. type AllocatorContext interface { - // TxnRecordTempTable record the temporary table to the current transaction. - // This method will be called when the temporary table is modified or should allocate id in the transaction. - TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable + // AlternativeAllocators returns an alternative `autoid.Allocators` for the table. + // If the second return value is nil, it means there are no alternative allocators in the context. + // Currently, it provides alternative allocators for temporary tables to alloc IDs in session. + AlternativeAllocators(tbl *model.TableInfo) (autoid.Allocators, bool) } diff --git a/pkg/table/contextimpl/BUILD.bazel b/pkg/table/contextimpl/BUILD.bazel index 3ebf393aacf71..ccc6afe99f553 100644 --- a/pkg/table/contextimpl/BUILD.bazel +++ b/pkg/table/contextimpl/BUILD.bazel @@ -7,13 +7,13 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/expression/context", + "//pkg/meta/autoid", "//pkg/parser/model", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/table/context", "//pkg/util/intest", - "//pkg/util/tableutil", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_tipb//go-binlog", ], @@ -26,8 +26,10 @@ go_test( flaky = True, deps = [ ":contextimpl", + "//pkg/parser/model", "//pkg/sessionctx/binloginfo", "//pkg/sessionctx/variable", + "//pkg/table", "//pkg/testkit", "//pkg/util/mock", "@com_github_pingcap_tipb//go-binlog", diff --git a/pkg/table/contextimpl/table.go b/pkg/table/contextimpl/table.go index ed6c6ac7425e1..1f701246ab8a8 100644 --- a/pkg/table/contextimpl/table.go +++ b/pkg/table/contextimpl/table.go @@ -17,13 +17,13 @@ package contextimpl import ( "github.com/pingcap/failpoint" exprctx "github.com/pingcap/tidb/pkg/expression/context" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table/context" "github.com/pingcap/tidb/pkg/util/intest" - "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tipb/go-binlog" ) @@ -47,10 +47,18 @@ func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl { } } -// TxnRecordTempTable record the temporary table to the current transaction. -// This method will be called when the temporary table is modified in the transaction. -func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable { - return ctx.vars().GetTemporaryTable(tbl) +// AlternativeAllocators implements the AllocatorContext interface +func (ctx *TableContextImpl) AlternativeAllocators(tbl *model.TableInfo) (allocators autoid.Allocators, ok bool) { + // Use an independent allocator for global temporary tables. + if tbl.TempTableType == model.TempTableGlobal { + if tempTbl := ctx.vars().GetTemporaryTable(tbl); tempTbl != nil { + if alloc := tempTbl.GetAutoIDAllocator(); alloc != nil { + return autoid.NewAllocators(false, alloc), true + } + } + // If the session is not in a txn, for example, in "show create table", use the original allocator. + } + return } // GetExprCtx returns the ExprContext @@ -145,6 +153,48 @@ func (ctx *TableContextImpl) UpdatePhysicalTableDelta( } } +// GetCachedTableSupport implements the MutateContext interface. +func (ctx *TableContextImpl) GetCachedTableSupport() (context.CachedTableSupport, bool) { + if ctx.vars().TxnCtx != nil { + return ctx, true + } + return nil, false +} + +// AddCachedTableHandleToTxn implements `CachedTableSupport` interface +func (ctx *TableContextImpl) AddCachedTableHandleToTxn(tableID int64, handle any) { + txnCtx := ctx.vars().TxnCtx + if txnCtx.CachedTables == nil { + txnCtx.CachedTables = make(map[int64]any) + } + if _, ok := txnCtx.CachedTables[tableID]; !ok { + txnCtx.CachedTables[tableID] = handle + } +} + +// GetTemporaryTableSupport implements the MutateContext interface. +func (ctx *TableContextImpl) GetTemporaryTableSupport() (context.TemporaryTableSupport, bool) { + if ctx.vars().TxnCtx == nil { + return nil, false + } + return ctx, true +} + +// GetTemporaryTableSizeLimit implements TemporaryTableSupport interface. +func (ctx *TableContextImpl) GetTemporaryTableSizeLimit() int64 { + return ctx.vars().TMPTableSize +} + +// AddTemporaryTableToTxn implements the TemporaryTableSupport interface. +func (ctx *TableContextImpl) AddTemporaryTableToTxn(tblInfo *model.TableInfo) (context.TemporaryTableHandler, bool) { + vars := ctx.vars() + if tbl := vars.GetTemporaryTable(tblInfo); tbl != nil { + tbl.SetModified(true) + return context.NewTemporaryTableHandler(tbl, vars.TemporaryTableData), true + } + return context.TemporaryTableHandler{}, false +} + func (ctx *TableContextImpl) vars() *variable.SessionVars { return ctx.Context.GetSessionVars() } diff --git a/pkg/table/contextimpl/table_test.go b/pkg/table/contextimpl/table_test.go index 7fea3d657fed5..7c5481b192565 100644 --- a/pkg/table/contextimpl/table_test.go +++ b/pkg/table/contextimpl/table_test.go @@ -17,8 +17,10 @@ package contextimpl_test import ( "testing" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/contextimpl" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/mock" @@ -26,6 +28,15 @@ import ( "github.com/stretchr/testify/require" ) +type mockTemporaryData struct { + variable.TemporaryTableData + size int64 +} + +func (m *mockTemporaryData) GetTableSize(tableID int64) int64 { + return tableID*1000000 + m.size +} + func TestMutateContextImplFields(t *testing.T) { sctx := mock.NewContext() sctx.Mutations = make(map[int64]*binlog.TableMutation) @@ -114,4 +125,48 @@ func TestMutateContextImplFields(t *testing.T) { require.Equal(t, int64(1), deltaMap.Delta) require.Equal(t, int64(2), deltaMap.Count) require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize) + // cached table support + sctx.GetSessionVars().TxnCtx = nil + cachedTableSupport, ok := ctx.GetCachedTableSupport() + require.False(t, ok) + require.Nil(t, cachedTableSupport) + sctx.GetSessionVars().TxnCtx = txnCtx + cachedTableSupport, ok = ctx.GetCachedTableSupport() + require.True(t, ok) + type mockCachedTable struct { + table.CachedTable + } + handle := &mockCachedTable{} + require.Nil(t, sctx.GetSessionVars().TxnCtx.CachedTables[123]) + cachedTableSupport.AddCachedTableHandleToTxn(123, handle) + cached := sctx.GetSessionVars().TxnCtx.CachedTables[123] + require.Same(t, handle, cached) + // temporary table support + sctx.GetSessionVars().TxnCtx = nil + tempTableSupport, ok := ctx.GetTemporaryTableSupport() + require.False(t, ok) + require.Nil(t, tempTableSupport) + sctx.GetSessionVars().TxnCtx = txnCtx + mockTempData := &mockTemporaryData{} + sctx.GetSessionVars().TemporaryTableData = mockTempData + tempTableSupport, ok = ctx.GetTemporaryTableSupport() + require.True(t, ok) + require.Nil(t, txnCtx.TemporaryTables[456]) + tmpTblHandler, ok := tempTableSupport.AddTemporaryTableToTxn(&model.TableInfo{ + ID: 456, + TempTableType: model.TempTableGlobal, + }) + require.True(t, ok) + require.NotNil(t, tmpTblHandler) + tmpTblTable := txnCtx.TemporaryTables[456] + require.NotNil(t, tmpTblTable) + require.True(t, tmpTblTable.GetModified()) + require.Equal(t, int64(456000000), tmpTblHandler.GetCommittedSize()) + mockTempData.size = 111 + require.Equal(t, int64(456000111), tmpTblHandler.GetCommittedSize()) + require.Equal(t, int64(0), tmpTblHandler.GetDirtySize()) + tmpTblHandler.UpdateTxnDeltaSize(333) + require.Equal(t, int64(333), tmpTblHandler.GetDirtySize()) + tmpTblHandler.UpdateTxnDeltaSize(-1) + require.Equal(t, int64(332), tmpTblHandler.GetDirtySize()) } diff --git a/pkg/table/tables/cache.go b/pkg/table/tables/cache.go index eb15c0b11f7f6..515a2a781f5bc 100644 --- a/pkg/table/tables/cache.go +++ b/pkg/table/tables/cache.go @@ -248,12 +248,8 @@ func (c *cachedTable) AddRecord(sctx table.MutateContext, r []types.Datum, opts } func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTable) { - txnCtx := sctx.GetSessionVars().TxnCtx - if txnCtx.CachedTables == nil { - txnCtx.CachedTables = make(map[int64]any) - } - if _, ok := txnCtx.CachedTables[tid]; !ok { - txnCtx.CachedTables[tid] = handle + if s, ok := sctx.GetCachedTableSupport(); ok { + s.AddCachedTableHandleToTxn(tid, handle) } } diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index fdb5ae7b413d8..83f58f727c844 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -436,8 +436,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext defer memBuffer.Cleanup(sh) if m := t.Meta(); m.TempTableType != model.TempTableNone { - if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { - if err := checkTempTableSize(sctx, tmpTable, m); err != nil { + if tmpTable, sizeLimit, ok := addTemporaryTable(sctx, m); ok { + if err = checkTempTableSize(tmpTable, sizeLimit); err != nil { return err } defer handleTempTableSize(tmpTable, txn.Size(), txn) @@ -711,32 +711,24 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column { return pkCols } -func addTemporaryTable(sctx table.MutateContext, tblInfo *model.TableInfo) tableutil.TempTable { - tempTable := sctx.TxnRecordTempTable(tblInfo) - tempTable.SetModified(true) - return tempTable +func addTemporaryTable(sctx table.MutateContext, tblInfo *model.TableInfo) (tbctx.TemporaryTableHandler, int64, bool) { + if s, ok := sctx.GetTemporaryTableSupport(); ok { + if h, ok := s.AddTemporaryTableToTxn(tblInfo); ok { + return h, s.GetTemporaryTableSizeLimit(), ok + } + } + return tbctx.TemporaryTableHandler{}, 0, false } // The size of a temporary table is calculated by accumulating the transaction size delta. -func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transaction) { - txnSizeNow := txn.Size() - delta := txnSizeNow - txnSizeBefore - - oldSize := t.GetSize() - newSize := oldSize + int64(delta) - t.SetSize(newSize) +func handleTempTableSize(t tbctx.TemporaryTableHandler, txnSizeBefore int, txn kv.Transaction) { + t.UpdateTxnDeltaSize(txn.Size() - txnSizeBefore) } -func checkTempTableSize(ctx table.MutateContext, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error { - tmpTableSize := tmpTable.GetSize() - if tempTableData := ctx.GetSessionVars().TemporaryTableData; tempTableData != nil { - tmpTableSize += tempTableData.GetTableSize(tblInfo.ID) - } - - if tmpTableSize > ctx.GetSessionVars().TMPTableSize { - return table.ErrTempTableFull.GenWithStackByArgs(tblInfo.Name.O) +func checkTempTableSize(tmpTable tbctx.TemporaryTableHandler, sizeLimit int64) error { + if tmpTable.GetCommittedSize()+tmpTable.GetDirtySize() > sizeLimit { + return table.ErrTempTableFull.GenWithStackByArgs(tmpTable.Meta().Name.O) } - return nil } @@ -754,8 +746,8 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts } if m := t.Meta(); m.TempTableType != model.TempTableNone { - if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { - if err := checkTempTableSize(sctx, tmpTable, m); err != nil { + if tmpTable, sizeLimit, ok := addTemporaryTable(sctx, m); ok { + if err = checkTempTableSize(tmpTable, sizeLimit); err != nil { return nil, err } defer handleTempTableSize(tmpTable, txn.Size(), txn) @@ -1210,8 +1202,8 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ } if m := t.Meta(); m.TempTableType != model.TempTableNone { - if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil { - if err := checkTempTableSize(ctx, tmpTable, m); err != nil { + if tmpTable, sizeLimit, ok := addTemporaryTable(ctx, m); ok { + if err = checkTempTableSize(tmpTable, sizeLimit); err != nil { return err } defer handleTempTableSize(tmpTable, txn.Size(), txn) @@ -1636,16 +1628,8 @@ func (t *TableCommon) Allocators(ctx table.AllocatorContext) autoid.Allocators { if ctx == nil { return t.allocs } - - // Use an independent allocator for global temporary tables. - if t.meta.TempTableType == model.TempTableGlobal { - if tbl := ctx.TxnRecordTempTable(t.meta); tbl != nil { - if alloc := tbl.GetAutoIDAllocator(); alloc != nil { - return autoid.NewAllocators(false, alloc) - } - } - // If the session is not in a txn, for example, in "show create table", use the original allocator. - // Otherwise the would be a nil pointer dereference. + if alloc, ok := ctx.AlternativeAllocators(t.meta); ok { + return alloc } return t.allocs }