From 2409c83381b1693f95727d07f697b4cea9f52dad Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 11 Nov 2021 18:23:05 +0800 Subject: [PATCH 1/5] *: fix staled table cache usage inside a transaction (#29443) --- executor/builder.go | 1 + executor/distsql.go | 23 ++++----- executor/mem_reader.go | 46 +++++++++--------- executor/table_reader.go | 6 +-- executor/union_scan.go | 7 +++ planner/core/exhaust_physical_plans.go | 1 + planner/core/logical_plan_builder.go | 10 ++-- planner/core/logical_plans.go | 4 ++ planner/core/physical_plans.go | 2 + planner/core/stats.go | 4 +- sessionctx/stmtctx/stmtctx.go | 37 ++------------- table/table.go | 11 +---- table/tables/cache.go | 47 ++++++++++++------- table/tables/cache_test.go | 65 ++++++++++++++++++++++++-- 14 files changed, 152 insertions(+), 112 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index e10a2b6b11970..29c8d1d0e029c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1041,6 +1041,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco return x } us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)} + us.cacheTable = v.CacheTable // Get the handle column index of the below Plan. us.belowHandleCols = v.HandleCols us.mutableRow = chunk.MutRowFromTypes(retTypes(us)) diff --git a/executor/distsql.go b/executor/distsql.go index 391dab353c647..899d2e822f176 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -198,7 +198,7 @@ type IndexReaderExecutor struct { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() error { - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } @@ -210,7 +210,7 @@ func (e *IndexReaderExecutor) Close() error { // Next implements the Executor Next interface. 
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { req.Reset() return nil } @@ -262,13 +262,7 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { return e.open(ctx, kvRanges) } -func isReadFromCache(tbl table.Table, stmt *stmtctx.StatementContext) bool { - if tbl != nil && tbl.Meta() != nil && tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable { - cond, _ := stmt.GetCacheTable(tbl.Meta().ID) - return cond - } - return false -} + func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { @@ -285,9 +279,8 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.kvRanges = kvRanges // Treat temporary table as dummy table, avoid sending distsql request to TiKV. // In a test case IndexReaderExecutor is mocked and e.table is nil. - // Cache table is similar with temporary table, if it satisfies the read condition. // Avoid sending distsql request to TIKV. - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } @@ -411,8 +404,8 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return err } - // Treat temporary table as dummy table, avoid sending distsql request to TiKV. Cache table is similar with temporary table. - if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. 
+ if e.table.Meta().TempTableType != model.TempTableNone { return nil } @@ -675,7 +668,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { - if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table.Meta().TempTableType != model.TempTableNone { return nil } @@ -699,7 +692,7 @@ func (e *IndexLookUpExecutor) Close() error { // Next implements Exec Next interface. func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table.Meta().TempTableType != model.TempTableNone { req.Reset() return nil } diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 83901d35e58b2..08509832c0ca2 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -46,6 +46,7 @@ type memIndexReader struct { outputOffset []int // belowHandleCols is the handle's position of the below scan plan. 
belowHandleCols plannercore.HandleCols + cacheTable kv.MemBuffer } func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader { @@ -64,6 +65,7 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem retFieldTypes: retTypes(us), outputOffset: outputOffset, belowHandleCols: us.belowHandleCols, + cacheTable: us.cacheTable, } } @@ -92,7 +94,7 @@ func (m *memIndexReader) getMemRows() ([][]types.Datum, error) { } mutableRow := chunk.MutRowFromTypes(m.retFieldTypes) - err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error { + err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error { data, err := m.decodeIndexKeyValue(key, value, tps) if err != nil { return err @@ -152,6 +154,7 @@ type memTableReader struct { colIDs map[int64]int buffer allocBuf pkColIDs []int64 + cacheTable kv.MemBuffer } type allocBuf struct { @@ -194,14 +197,15 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem handleBytes: make([]byte, 0, 16), rd: rd, }, - pkColIDs: pkColIDs, + pkColIDs: pkColIDs, + cacheTable: us.cacheTable, } } // TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row. 
func (m *memTableReader) getMemRows() ([][]types.Datum, error) { mutableRow := chunk.MutRowFromTypes(m.retFieldTypes) - err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error { + err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error { row, err := m.decodeRecordKeyValue(key, value) if err != nil { return err @@ -319,7 +323,7 @@ func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool { type processKVFunc func(key, value []byte) error -func iterTxnMemBuffer(tblID int64, ctx sessionctx.Context, kvRanges []kv.KeyRange, fn processKVFunc) error { +func iterTxnMemBuffer(ctx sessionctx.Context, cacheTable kv.MemBuffer, kvRanges []kv.KeyRange, fn processKVFunc) error { txn, err := ctx.Txn(true) if err != nil { return err @@ -327,10 +331,16 @@ func iterTxnMemBuffer(tblID int64, ctx sessionctx.Context, kvRanges []kv.KeyRang for _, rg := range kvRanges { iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey) - iter, err = getMemIter(tblID, ctx, iter, rg) + snapCacheIter, err := getSnapIter(ctx, cacheTable, rg) if err != nil { return err } + if snapCacheIter != nil { + iter, err = transaction.NewUnionIter(iter, snapCacheIter, false) + if err != nil { + return err + } + } for ; iter.Valid(); err = iter.Next() { if err != nil { return err @@ -348,7 +358,7 @@ func iterTxnMemBuffer(tblID int64, ctx sessionctx.Context, kvRanges []kv.KeyRang return nil } -func getMemIter(tblID int64, ctx sessionctx.Context, iter kv.Iterator, rg kv.KeyRange) (kv.Iterator, error) { +func getSnapIter(ctx sessionctx.Context, cacheTable kv.MemBuffer, rg kv.KeyRange) (kv.Iterator, error) { var snapCacheIter kv.Iterator tempTableData := ctx.GetSessionVars().TemporaryTableData if tempTableData != nil { @@ -357,25 +367,14 @@ func getMemIter(tblID int64, ctx sessionctx.Context, iter kv.Iterator, rg kv.Key return nil, err } snapCacheIter = snapIter - } - cond, buffer := ctx.GetSessionVars().StmtCtx.GetCacheTable(tblID) - if cond { 
- cacheIter, err := buffer.(kv.MemBuffer).Iter(rg.StartKey, rg.EndKey) + } else if cacheTable != nil { + cacheIter, err := cacheTable.Iter(rg.StartKey, rg.EndKey) if err != nil { return nil, errors.Trace(err) } snapCacheIter = cacheIter } - - if snapCacheIter == nil { - return iter, nil - } - - newIter, err := transaction.NewUnionIter(iter, snapCacheIter, false) - if err != nil { - return nil, err - } - return newIter, nil + return snapCacheIter, nil } func reverseDatumSlice(rows [][]types.Datum) { @@ -386,7 +385,7 @@ func reverseDatumSlice(rows [][]types.Datum) { func (m *memIndexReader) getMemRowsHandle() ([]kv.Handle, error) { handles := make([]kv.Handle, 0, m.addedRowsLen) - err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error { + err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error { handle, err := tablecodec.DecodeIndexHandle(key, value, len(m.index.Columns)) if err != nil { return err @@ -421,6 +420,8 @@ type memIndexLookUpReader struct { partitionMode bool // if it is accessing a partition table partitionTables []table.PhysicalTable // partition tables to access partitionKVRanges [][]kv.KeyRange // kv ranges for these partition tables + + cacheTable kv.MemBuffer } func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader { @@ -435,6 +436,7 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx retFieldTypes: retTypes(us), outputOffset: outputOffset, belowHandleCols: us.belowHandleCols, + cacheTable: us.cacheTable, } return &memIndexLookUpReader{ @@ -450,6 +452,7 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx partitionMode: idxLookUpReader.partitionTableMode, partitionKVRanges: idxLookUpReader.partitionKVRanges, partitionTables: idxLookUpReader.prunedPartitions, + cacheTable: us.cacheTable, } } @@ -517,6 +520,7 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, 
error) { handleBytes: make([]byte, 0, 16), rd: rd, }, + cacheTable: m.cacheTable, } return memTblReader.getMemRows() diff --git a/executor/table_reader.go b/executor/table_reader.go index 88b7c1751ccdd..958b8cc442061 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -180,7 +180,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. // Calculate the kv ranges here, UnionScan rely on this kv ranges. // cached table and temporary table are similar - if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { kvReq, err := e.buildKVReq(ctx, firstPartRanges) if err != nil { return err @@ -218,7 +218,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. req.Reset() return nil @@ -263,7 +263,7 @@ func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID in // Close implements the Executor Close interface. 
func (e *TableReaderExecutor) Close() error { - if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } diff --git a/executor/union_scan.go b/executor/union_scan.go index e08e90249375c..c796d36bb6d31 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -56,6 +56,9 @@ type UnionScanExec struct { // virtualColumnIndex records all the indices of virtual columns and sort them in definition // to make sure we can compute the virtual column in right order. virtualColumnIndex []int + + // cacheTable not nil means it's reading from cached table. + cacheTable kv.MemBuffer } // Open implements the Executor Open interface. @@ -202,6 +205,10 @@ func (us *UnionScanExec) getOneRow(ctx context.Context) ([]types.Datum, error) { } func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, error) { + if us.cacheTable != nil { + // From cache table, so the snapshot is nil + return nil, nil + } if us.cursor4SnapshotRows < len(us.snapshotRows) { return us.snapshotRows[us.cursor4SnapshotRows], nil } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 575a177697325..d09f5a00b76d5 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -51,6 +51,7 @@ func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) us := PhysicalUnionScan{ Conditions: p.conditions, HandleCols: p.handleCols, + CacheTable: p.cacheTable, }.Init(p.ctx, p.stats, p.blockOffset, childProp) return []PhysicalPlan{us}, true, nil } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 9fcd85a6800d3..7e8bd7ba6688c 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4150,11 +4150,11 @@ func (b *PlanBuilder) 
buildDataSource(ctx context.Context, tn *ast.TableName, as if err != nil { return nil, err } - // Use the txn of the transaction to determine whether the cache can be read. - buffer, cond := cachedTable.TryGetMemcache(txn.StartTS()) - if cond { - b.ctx.GetSessionVars().StmtCtx.StoreCacheTable(tbl.Meta().ID, buffer) - us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset()) + // Use the TS of the transaction to determine whether the cache can be used. + cacheData := cachedTable.TryReadFromCache(txn.StartTS()) + if cacheData != nil { + sessionVars.StmtCtx.ReadFromTableCache = true + us := LogicalUnionScan{handleCols: handleCols, cacheTable: cacheData}.Init(b.ctx, b.getSelectOffset()) us.SetChildren(ds) result = us } else { diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 173de775558bb..0401ae967283d 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" @@ -514,6 +515,9 @@ type LogicalUnionScan struct { conditions []expression.Expression handleCols HandleCols + + // cacheTable not nil means it's reading from cached table. + cacheTable kv.MemBuffer } // DataSource represents a tableScan without condition push down. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 52dd75268fec0..6505760e38b86 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -1179,6 +1179,8 @@ type PhysicalUnionScan struct { Conditions []expression.Expression HandleCols HandleCols + + CacheTable kv.MemBuffer } // ExtractCorrelatedCols implements PhysicalPlan interface. 
diff --git a/planner/core/stats.go b/planner/core/stats.go index ef2d8c258feae..7f0d62da1dc49 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -431,8 +431,8 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * } } } - cond, _ := ds.ctx.GetSessionVars().StmtCtx.GetCacheTable(ds.tableInfo.ID) - if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !cond { + readFromTableCache := ds.ctx.GetSessionVars().StmtCtx.ReadFromTableCache + if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !readFromTableCache { err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) if err != nil { return nil, err diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 72bf0a5c27ed1..df91df21ff3bb 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -175,11 +175,9 @@ type StatementContext struct { // Map to store all CTE storages of current SQL. // Will clean up at the end of the execution. CTEStorageMap interface{} - // cachedTables is used to store cache table id and a pointer to cache data when it satisfies the cache read condition - cachedTables []struct { - id int64 - memBuffer interface{} // is a point to cache.MemBuffer. in order to avoid import cycle - } + + // If the statement read from table cache, this flag is set. + ReadFromTableCache bool // cache is used to reduce object allocation. cache struct { @@ -331,35 +329,6 @@ func (sc *StatementContext) SetPlanHint(hint string) { sc.planHint = hint } -// StoreCacheTable stores the read condition and a point to cache data of the given key. 
-func (sc *StatementContext) StoreCacheTable(tblID int64, buffer interface{}) { - for _, data := range sc.cachedTables { - if data.id == tblID { - data.memBuffer = buffer - } - return - } - sc.cachedTables = append(sc.cachedTables, struct { - id int64 - memBuffer interface{} - }{id: tblID, memBuffer: buffer}) -} - -// GetCacheTable gets the read condition and a point to cache data of the given key if it exists -func (sc *StatementContext) GetCacheTable(tblID int64) (bool, interface{}) { - for _, data := range sc.cachedTables { - if data.id == tblID { - return true, data.memBuffer - } - } - return false, nil -} - -// CacheTableUsed is used by test to check whether the last query use table cache. -func (sc *StatementContext) CacheTableUsed() bool { - return len(sc.cachedTables) > 0 -} - // TableEntry presents table in db. type TableEntry struct { DB string diff --git a/table/table.go b/table/table.go index c8717686a9428..f39e9b2d9fa8d 100644 --- a/table/table.go +++ b/table/table.go @@ -252,17 +252,10 @@ var MockTableFromMeta func(tableInfo *model.TableInfo) Table type CachedTable interface { Table - // TryGetMemcache Check if the cache table is readable, if it is readable, - // Return the pointer to the MemBuffer and true otherwise return nil and false - TryGetMemcache(ts uint64) (kv.MemBuffer, bool) + // TryReadFromCache checks if the cache table is readable. 
+ TryReadFromCache(ts uint64) kv.MemBuffer // UpdateLockForRead If you cannot meet the conditions of the read buffer, // you need to update the lock information and read the data from the original table UpdateLockForRead(store kv.Storage, ts uint64) error } - -// CacheData pack the cache data and lease -type CacheData struct { - Lease uint64 - kv.MemBuffer -} diff --git a/table/tables/cache.go b/table/tables/cache.go index 0d84b892259e9..ba8786a65bae3 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -40,6 +40,13 @@ type cachedTable struct { handle StateRemote } +// cacheData pack the cache data and lease. +type cacheData struct { + Start uint64 + Lease uint64 + kv.MemBuffer +} + func leaseFromTS(ts uint64) uint64 { // TODO make this configurable in the following PRs const defaultLeaseDuration time.Duration = 3 * time.Second @@ -58,16 +65,16 @@ func newMemBuffer(store kv.Storage) (kv.MemBuffer, error) { return buffTxn.GetMemBuffer(), nil } -func (c *cachedTable) TryGetMemcache(ts uint64) (kv.MemBuffer, bool) { +func (c *cachedTable) TryReadFromCache(ts uint64) kv.MemBuffer { tmp := c.cacheData.Load() if tmp == nil { - return nil, false + return nil } - data := tmp.(*table.CacheData) - if data.Lease > ts { - return data.MemBuffer, true + data := tmp.(*cacheData) + if ts >= data.Start && ts < data.Lease { + return data } - return nil, false + return nil } var mockStateRemote = struct { @@ -90,42 +97,45 @@ func NewCachedTable(tbl *TableCommon) (table.Table, error) { return ret, nil } -func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, error) { +func (c *cachedTable) loadDataFromOriginalTable(store kv.Storage, lease uint64) (kv.MemBuffer, uint64, error) { buffer, err := newMemBuffer(store) if err != nil { - return nil, err + return nil, 0, err } - err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { + var startTS uint64 + err = 
kv.RunInNewTxn(context.Background(), store, true, func(ctx context.Context, txn kv.Transaction) error { prefix := tablecodec.GenTablePrefix(c.tableID) if err != nil { - return err + return errors.Trace(err) } - if txn.StartTS() >= lease { + startTS = txn.StartTS() + if startTS >= lease { return errors.New("the loaded data is outdated for caching") } it, err := txn.Iter(prefix, prefix.PrefixNext()) if err != nil { - return err + return errors.Trace(err) } defer it.Close() + for it.Valid() && it.Key().HasPrefix(prefix) { value := it.Value() err = buffer.Set(it.Key(), value) if err != nil { - return err + return errors.Trace(err) } err = it.Next() if err != nil { - return err + return errors.Trace(err) } } return nil }) if err != nil { - return nil, err + return nil, 0, err } - return buffer, nil + return buffer, startTS, nil } func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { @@ -137,12 +147,13 @@ func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error { return errors.Trace(err) } if succ { - mb, err := c.loadDataFromOriginalTable(store, lease) + mb, startTS, err := c.loadDataFromOriginalTable(store, lease) if err != nil { return errors.Trace(err) } - c.cacheData.Store(&table.CacheData{ + c.cacheData.Store(&cacheData{ + Start: startTS, Lease: lease, MemBuffer: mb, }) diff --git a/table/tables/cache_test.go b/table/tables/cache_test.go index e92c6689aa6ce..4cbd1a3578569 100644 --- a/table/tables/cache_test.go +++ b/table/tables/cache_test.go @@ -53,12 +53,15 @@ func TestCacheTableBasicScan(t *testing.T) { tk.MustQuery("select *from join_t2").Check(testkit.Rows("2")) tk.MustExec("create table join_t3 (id int)") tk.MustExec("insert into join_t3 values(3)") + planUsed := false for i := 0; i < 10; i++ { tk.MustQuery("select *from join_t1 join join_t2").Check(testkit.Rows("1 2")) - if tk.Session().GetSessionVars().StmtCtx.CacheTableUsed() { + if tk.HasPlan("select *from join_t1 join join_t2", "UnionScan") { + planUsed = true 
break } } + require.True(t, planUsed) result := tk.MustQuery("explain format = 'brief' select *from join_t1 join join_t2") result.Check(testkit.Rows( "HashJoin 100000000.00 root CARTESIAN inner join", @@ -71,10 +74,12 @@ func TestCacheTableBasicScan(t *testing.T) { // Test for join a cache table and a normal table for i := 0; i < 10; i++ { tk.MustQuery("select *from join_t1 join join_t3").Check(testkit.Rows("1 3")) - if tk.Session().GetSessionVars().StmtCtx.CacheTableUsed() { + if tk.HasPlan("select *from join_t1 join join_t3", "UnionScan") { + planUsed = true break } } + require.True(t, planUsed) result = tk.MustQuery("explain format = 'brief' select *from join_t1 join join_t3") result.Check(testkit.Rows( "Projection 100000000.00 root test.join_t1.id, test.join_t3.id", @@ -86,15 +91,17 @@ func TestCacheTableBasicScan(t *testing.T) { " └─TableFullScan 10000.00 cop[tikv] table:join_t3 keep order:false, stats:pseudo")) // Second read will from cache table - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { tk.MustQuery("select * from tmp1 where id>4 order by id").Check(testkit.Rows( "5 105 1005", "7 117 1007", "9 109 1009", "10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018", )) - if tk.Session().GetSessionVars().StmtCtx.CacheTableUsed() { + if tk.HasPlan("select * from tmp1 where id>4 order by id", "UnionScan") { + planUsed = true break } } + require.True(t, planUsed) result = tk.MustQuery("explain format = 'brief' select * from tmp1 where id>4 order by id") result.Check(testkit.Rows("UnionScan 3333.33 root gt(test.tmp1.id, 4)", "└─TableReader 3333.33 root data:TableRangeScan", @@ -105,10 +112,12 @@ func TestCacheTableBasicScan(t *testing.T) { "5 105 1005", "9 109 1009", "10 110 1010", "12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018", )) - if tk.Session().GetSessionVars().StmtCtx.CacheTableUsed() { + if tk.HasPlan("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u", "UnionScan") { + 
planUsed = true break } } + require.True(t, planUsed) result = tk.MustQuery("explain format = 'brief' select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u") result.Check(testkit.Rows("UnionScan 3333.33 root gt(test.tmp1.u, 101)", "└─IndexLookUp 3333.33 root ", @@ -234,3 +243,49 @@ func TestCacheTableComplexRead(t *testing.T) { tk1.HasPlan("select *from complex_cache where id > 7", "UnionScan") tk1.MustExec("commit") } + +func TestBeginSleepABA(t *testing.T) { + // During the change "cache1 -> no cache -> cache2", + // cache1 and cache2 may be not the same anymore + // A transaction should not only check the cache exists, but also check the cache unchanged. + + store, clean := testkit.CreateMockStore(t) + defer clean() + tk1 := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk2.MustExec("use test") + tk1.MustExec("drop table if exists aba") + tk1.MustExec("create table aba (id int, v int)") + tk1.MustExec("insert into aba values (1, 1)") + tk1.MustExec("alter table aba cache") + tk1.MustQuery("select * from aba").Check(testkit.Rows("1 1")) + + // Begin, read from cache. + tk1.MustExec("begin") + cacheUsed := false + for i := 0; i < 100; i++ { + if tk1.HasPlan("select * from aba", "UnionScan") { + cacheUsed = true + break + } + } + require.True(t, cacheUsed) + + // Another session change the data and make the cache unavailable. + tk2.MustExec("update aba set v = 2") + + // And then make the cache available again. + for i := 0; i < 50; i++ { + tk2.MustQuery("select * from aba").Check(testkit.Rows("1 2")) + if tk2.HasPlan("select * from aba", "UnionScan") { + cacheUsed = true + break + } + time.Sleep(100 * time.Millisecond) + } + require.True(t, cacheUsed) + + // tk1 should not use the staled cache, because the data is changed. 
+ require.False(t, tk1.HasPlan("select * from aba", "UnionScan")) +} From 77f01c389e09d303e07276c119e97b0d51790a82 Mon Sep 17 00:00:00 2001 From: fengou1 <85682690+fengou1@users.noreply.github.com> Date: Thu, 11 Nov 2021 18:45:06 +0800 Subject: [PATCH 2/5] BR: Git action improvement (#29623) --- .github/workflows/compile_br.yaml | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/.github/workflows/compile_br.yaml b/.github/workflows/compile_br.yaml index da5ef43198ffc..acfbc2d27bad5 100644 --- a/.github/workflows/compile_br.yaml +++ b/.github/workflows/compile_br.yaml @@ -13,7 +13,10 @@ on: - '!br/docs/**' - '!br/tests/**' - '!br/docker/**' +#change trigger policy pull_request: + types: + - labeled # <-- branches: - master - 'release-[0-9].[0-9]*' @@ -35,8 +38,25 @@ concurrency: cancel-in-progress: true jobs: + compile-windows: + if: github.event_name == 'push' || github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build' + name: Compile for Windows job + runs-on: windows-latest + steps: + - uses: actions/checkout@v2.1.0 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.16 + + - name: Run build + run: make build_tools + compile: + if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build' name: Compile for ${{ matrix.os }} / ${{ matrix.target}} + runs-on: ${{ matrix.os }} strategy: matrix: @@ -47,8 +67,6 @@ jobs: - os: ubuntu-latest target: aarch64-unknown-linux-gnu - - os: windows-latest - target: x86_64-pc-windows-msvc steps: - uses: actions/checkout@v2.1.0 @@ -61,6 +79,7 @@ jobs: run: make build_tools compile-freebsd: + if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build' name: Compile for FreeBSD job runs-on: ubuntu-latest steps: From 6dcbafbde8bbfb50cf338d9632baa86105e6b6bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Thu, 11 Nov 
2021 18:59:00 +0800 Subject: [PATCH 3/5] *: remove old placement code for exchange partition (#29649) * *: remove old placement code for exchange partition * modify some name * fix bug * add some comments Co-authored-by: Arenatlx <314806019@qq.com> Co-authored-by: Ti Chi Robot --- ddl/partition.go | 65 ++++++++++---- ddl/placement_policy_test.go | 158 +++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+), 16 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index a8924f5cd5e51..784e69eb4d9a5 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1397,23 +1397,14 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID)) - ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID)) - ntOK = ntOK && !ntBundle.IsEmpty() - if ptOK && ntOK { - bundles = append(bundles, ptBundle.Clone().Reset(placement.RuleIndexPartition, []int64{nt.ID})) - bundles = append(bundles, ntBundle.Clone().Reset(placement.RuleIndexPartition, []int64{partDef.ID})) - } else if ptOK { - bundles = append(bundles, placement.NewBundle(partDef.ID)) - bundles = append(bundles, ptBundle.Clone().Reset(placement.RuleIndexPartition, []int64{nt.ID})) - } else if ntOK { - bundles = append(bundles, placement.NewBundle(nt.ID)) - bundles = append(bundles, ntBundle.Clone().Reset(placement.RuleIndexPartition, []int64{partDef.ID})) - } - err = infosync.PutRuleBundles(context.TODO(), bundles) + + bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil { job.State 
= model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } @@ -1463,6 +1454,48 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, nil } +func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { + bundles := make([]*placement.Bundle, 0, 3) + + ptBundle, err := newBundleFromTblInfo(t, job, pt) + if err != nil { + return nil, errors.Trace(err) + } + if ptBundle != nil { + bundles = append(bundles, ptBundle) + } + + parBundle, err := newBundleFromPartitionDef(t, job, *newPar) + if err != nil { + return nil, errors.Trace(err) + } + if parBundle != nil { + bundles = append(bundles, parBundle) + } + + ntBundle, err := newBundleFromTblInfo(t, job, nt) + if err != nil { + return nil, errors.Trace(err) + } + if ntBundle != nil { + bundles = append(bundles, ntBundle) + } + + if parBundle == nil && ntBundle != nil { + // newPar.ID is the ID of old table to exchange, so ntBundle != nil means it has some old placement settings. + // We should remove it in this situation + bundles = append(bundles, placement.NewBundle(newPar.ID)) + } + + if parBundle != nil && ntBundle == nil { + // nt.ID is the ID of old partition to exchange, so parBundle != nil means it has some old placement settings. 
+ // We should remove it in this situation + bundles = append(bundles, placement.NewBundle(nt.ID)) + } + + return bundles, nil +} + func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, index int, schemaName, tableName model.CIStr) error { var sql string var paramList []interface{} diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index 35e28bf0a2b71..65b5e7994f713 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -1386,3 +1386,161 @@ func (s *testDBSuite6) TestTruncateTablePartitionWithPlacement(c *C) { " PARTITION `p3` VALUES LESS THAN (100000) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2\" */\n" + ")")) } + +func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_enable_exchange_partition=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2, tp") + tk.MustExec("drop placement policy if exists p1") + tk.MustExec("drop placement policy if exists p2") + + tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") + defer tk.MustExec("drop placement policy p1") + + tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") + defer tk.MustExec("drop placement policy p2") + + policy1, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("p1")) + c.Assert(ok, IsTrue) + + policy2, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("p2")) + c.Assert(ok, IsTrue) + + tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) + defer tk.MustExec("drop table t1") + + tk.MustExec(`CREATE TABLE t2 (id INT)`) + defer tk.MustExec("drop table t2") + + t1, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + t1ID := t1.Meta().ID + + t2, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), 
model.NewCIStr("t2")) + c.Assert(err, IsNil) + t2ID := t2.Meta().ID + + tk.MustExec(`CREATE TABLE tp (id INT) primary_region="r1" regions="r1" PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (1000) placement policy p2, + PARTITION p2 VALUES LESS THAN (10000) primary_region="r1" regions="r1,r2" + );`) + defer tk.MustExec("drop table tp") + + tp, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + c.Assert(err, IsNil) + tpID := tp.Meta().ID + par0ID := tp.Meta().Partition.Definitions[0].ID + par1ID := tp.Meta().Partition.Definitions[1].ID + par2ID := tp.Meta().Partition.Definitions[2].ID + + // exchange par0, t1 + tk.MustExec("alter table tp exchange partition p0 with table t1") + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + tk.MustQuery("show create table tp").Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" + + "PARTITION BY RANGE ( `id` ) (\n" + + " PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + ")")) + tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + c.Assert(err, IsNil) + c.Assert(tp.Meta().ID, Equals, tpID) + c.Assert(tp.Meta().Partition.Definitions[0].ID, Equals, t1ID) + c.Assert(tp.Meta().Partition.Definitions[0].DirectPlacementOpts, IsNil) + c.Assert(tp.Meta().Partition.Definitions[0].PlacementPolicyRef, IsNil) + t1, err = 
tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + c.Assert(t1.Meta().ID, Equals, par0ID) + c.Assert(t1.Meta().DirectPlacementOpts, IsNil) + c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID) + + // exchange par0, t2 + tk.MustExec("alter table tp exchange partition p0 with table t2") + tk.MustQuery("show create table t2").Check(testkit.Rows("" + + "t2 CREATE TABLE `t2` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustQuery("show create table tp").Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" + + "PARTITION BY RANGE ( `id` ) (\n" + + " PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + ")")) + tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + c.Assert(err, IsNil) + c.Assert(tp.Meta().ID, Equals, tpID) + c.Assert(tp.Meta().Partition.Definitions[0].ID, Equals, t2ID) + c.Assert(tp.Meta().Partition.Definitions[0].DirectPlacementOpts, IsNil) + c.Assert(tp.Meta().Partition.Definitions[0].PlacementPolicyRef, IsNil) + t2, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + c.Assert(err, IsNil) + c.Assert(t2.Meta().ID, Equals, t1ID) + c.Assert(t2.Meta().DirectPlacementOpts, IsNil) + c.Assert(t2.Meta().PlacementPolicyRef, IsNil) + + // exchange par1, t1 + tk.MustExec("alter table tp exchange partition p1 with table t1") + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + 
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + tk.MustQuery("show create table tp").Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" + + "PARTITION BY RANGE ( `id` ) (\n" + + " PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + ")")) + tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + c.Assert(err, IsNil) + c.Assert(tp.Meta().ID, Equals, tpID) + c.Assert(tp.Meta().Partition.Definitions[1].ID, Equals, par0ID) + c.Assert(tp.Meta().Partition.Definitions[1].DirectPlacementOpts, IsNil) + c.Assert(tp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID, Equals, policy2.ID) + t1, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + c.Assert(t1.Meta().ID, Equals, par1ID) + c.Assert(t1.Meta().DirectPlacementOpts, IsNil) + c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID) + + // exchange par2, t2 + tk.MustExec("alter table tp exchange partition p2 with table t2") + tk.MustQuery("show create table t2").Check(testkit.Rows("" + + "t2 CREATE TABLE `t2` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustQuery("show create table tp").Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" + + "PARTITION BY RANGE ( `id` ) (\n" + + " PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` 
VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + ")")) + tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) + c.Assert(err, IsNil) + c.Assert(tp.Meta().ID, Equals, tpID) + c.Assert(tp.Meta().Partition.Definitions[2].ID, Equals, t1ID) + c.Assert(tp.Meta().Partition.Definitions[2].DirectPlacementOpts.PrimaryRegion, Equals, "r1") + c.Assert(tp.Meta().Partition.Definitions[2].DirectPlacementOpts.Regions, Equals, "r1,r2") + c.Assert(tp.Meta().Partition.Definitions[2].PlacementPolicyRef, IsNil) + t2, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + c.Assert(err, IsNil) + c.Assert(t2.Meta().ID, Equals, par2ID) + c.Assert(t2.Meta().DirectPlacementOpts, IsNil) + c.Assert(t2.Meta().PlacementPolicyRef, IsNil) +} From 6512fe5a497299f3b856988a744067496742f58c Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 11 Nov 2021 19:15:05 +0800 Subject: [PATCH 4/5] lightning: fix checksum mismatch when parallel import data (#29695) --- br/pkg/lightning/restore/table_restore.go | 3 +- .../lightning_distributed_import/config.toml | 9 +++++ .../distributed_import-schema-create.sql | 1 + .../data1/distributed_import.t-schema.sql | 1 + .../data1/distributed_import.t.csv | 5 +++ .../distributed_import-schema-create.sql | 1 + .../data2/distributed_import.t-schema.sql | 1 + .../data2/distributed_import.t.csv | 5 +++ br/tests/lightning_distributed_import/run.sh | 36 +++++++++++++++++++ 9 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 br/tests/lightning_distributed_import/config.toml create mode 100644 br/tests/lightning_distributed_import/data1/distributed_import-schema-create.sql create mode 100644 br/tests/lightning_distributed_import/data1/distributed_import.t-schema.sql create mode 100644 
br/tests/lightning_distributed_import/data1/distributed_import.t.csv create mode 100644 br/tests/lightning_distributed_import/data2/distributed_import-schema-create.sql create mode 100644 br/tests/lightning_distributed_import/data2/distributed_import.t-schema.sql create mode 100644 br/tests/lightning_distributed_import/data2/distributed_import.t.csv create mode 100644 br/tests/lightning_distributed_import/run.sh diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 43416d8157099..8da5a210ce885 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -796,7 +796,8 @@ func (tr *TableRestore) postProcess( nextStage = checkpoints.CheckpointStatusChecksumSkipped } - if err == nil { + // Don't call FinishTable when other lightning will calculate checksum. + if err == nil && !hasDupe && needChecksum { err = metaMgr.FinishTable(ctx) } diff --git a/br/tests/lightning_distributed_import/config.toml b/br/tests/lightning_distributed_import/config.toml new file mode 100644 index 0000000000000..200af8e45dfdc --- /dev/null +++ b/br/tests/lightning_distributed_import/config.toml @@ -0,0 +1,9 @@ +[tikv-importer] +backend = 'local' +duplicate-resolution = 'none' + +[post-restore] +checksum = "required" + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_distributed_import/data1/distributed_import-schema-create.sql b/br/tests/lightning_distributed_import/data1/distributed_import-schema-create.sql new file mode 100644 index 0000000000000..19c586879a959 --- /dev/null +++ b/br/tests/lightning_distributed_import/data1/distributed_import-schema-create.sql @@ -0,0 +1 @@ +create database distributed_import; diff --git a/br/tests/lightning_distributed_import/data1/distributed_import.t-schema.sql b/br/tests/lightning_distributed_import/data1/distributed_import.t-schema.sql new file mode 100644 index 0000000000000..7cf7f72809d33 --- /dev/null +++ 
b/br/tests/lightning_distributed_import/data1/distributed_import.t-schema.sql @@ -0,0 +1 @@ +create table t(a int primary key, b varchar(255), c double); diff --git a/br/tests/lightning_distributed_import/data1/distributed_import.t.csv b/br/tests/lightning_distributed_import/data1/distributed_import.t.csv new file mode 100644 index 0000000000000..7ee53cee3916e --- /dev/null +++ b/br/tests/lightning_distributed_import/data1/distributed_import.t.csv @@ -0,0 +1,5 @@ +1,a1,1.1 +3,b3,3.3 +5,c5,5.5 +7,d7,7.7 +9,e9,9.9 diff --git a/br/tests/lightning_distributed_import/data2/distributed_import-schema-create.sql b/br/tests/lightning_distributed_import/data2/distributed_import-schema-create.sql new file mode 100644 index 0000000000000..19c586879a959 --- /dev/null +++ b/br/tests/lightning_distributed_import/data2/distributed_import-schema-create.sql @@ -0,0 +1 @@ +create database distributed_import; diff --git a/br/tests/lightning_distributed_import/data2/distributed_import.t-schema.sql b/br/tests/lightning_distributed_import/data2/distributed_import.t-schema.sql new file mode 100644 index 0000000000000..7cf7f72809d33 --- /dev/null +++ b/br/tests/lightning_distributed_import/data2/distributed_import.t-schema.sql @@ -0,0 +1 @@ +create table t(a int primary key, b varchar(255), c double); diff --git a/br/tests/lightning_distributed_import/data2/distributed_import.t.csv b/br/tests/lightning_distributed_import/data2/distributed_import.t.csv new file mode 100644 index 0000000000000..1baadab31feac --- /dev/null +++ b/br/tests/lightning_distributed_import/data2/distributed_import.t.csv @@ -0,0 +1,5 @@ +2,a2,2.2 +4,b4,4.4 +6,c6,6.6 +8,d8,8.8 +10,e10,10.10 diff --git a/br/tests/lightning_distributed_import/run.sh b/br/tests/lightning_distributed_import/run.sh new file mode 100644 index 0000000000000..f640ec3159c75 --- /dev/null +++ b/br/tests/lightning_distributed_import/run.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# +# Copyright 2021 PingCAP, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log" +LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log" + +# Make lightning run a bit slower so that no table in the first lightning instance finishes too quickly. +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)" + +run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \ + -d "tests/$TEST_NAME/data1" --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config.toml" & +pid1="$!" + +run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted2" \ + -d "tests/$TEST_NAME/data2" --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config.toml" & +pid2="$!" 
+ +wait "$pid1" "$pid2" + +run_sql 'select count(*) from distributed_import.t' +check_contains 'count(*): 10' From 58c02d5bb7b25aa810c8279c5eb7de95a0362356 Mon Sep 17 00:00:00 2001 From: Zak Zhao <57036248+joccau@users.noreply.github.com> Date: Thu, 11 Nov 2021 19:41:05 +0800 Subject: [PATCH 5/5] br: Add system table from v5.3.0 into blacklist which shouldn't be restored (#29701) --- br/pkg/restore/systable_restore.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 1f7d6bb12ca9c..f6235d195a850 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -34,15 +34,16 @@ var unRecoverableTable = map[string]struct{}{ "global_variables": {}, // all user related tables cannot be recovered for now. - "columns_priv": {}, - "db": {}, - "default_roles": {}, - "global_grants": {}, - "global_priv": {}, - "role_edges": {}, - "tables_priv": {}, - "user": {}, - + "column_stats_usage": {}, + "columns_priv": {}, + "db": {}, + "default_roles": {}, + "global_grants": {}, + "global_priv": {}, + "role_edges": {}, + "tables_priv": {}, + "user": {}, + "capture_plan_baselines_blacklist": {}, // gc info don't need to recover. "gc_delete_range": {}, "gc_delete_range_done": {},