Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dml : dml read operation for cache table #29184

Merged
merged 18 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type IndexReaderExecutor struct {

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
return nil
}

Expand All @@ -210,7 +210,7 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
req.Reset()
return nil
}
Expand Down Expand Up @@ -262,7 +262,13 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {

return e.open(ctx, kvRanges)
}

func isReadFromCache(tbl table.Table, stmt *stmtctx.StatementContext) bool {
if tbl != nil && tbl.Meta() != nil && tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cond, _ := stmt.GetCacheTable(tbl.Meta().ID)
return cond
}
return false
}
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
Expand All @@ -279,7 +285,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.kvRanges = kvRanges
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// In a test case IndexReaderExecutor is mocked and e.table is nil.
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
// Cache table is similar with temporary table, if it satisfies the read condition.
// Avoid sending distsql request to TIKV.
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
return nil
}

Expand Down Expand Up @@ -403,8 +411,8 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
return err
}

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
if e.table.Meta().TempTableType != model.TempTableNone {
// Treat temporary table as dummy table, avoid sending distsql request to TiKV. Cache table is similar with temporary table.
if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
return nil
}

Expand Down Expand Up @@ -667,7 +675,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.table.Meta().TempTableType != model.TempTableNone {
if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
return nil
}

Expand All @@ -691,7 +699,7 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta().TempTableType != model.TempTableNone {
if e.table.Meta().TempTableType != model.TempTableNone || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
req.Reset()
return nil
}
Expand Down
53 changes: 37 additions & 16 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *memIndexReader) getMemRows() ([][]types.Datum, error) {
}

mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error {
data, err := m.decodeIndexKeyValue(key, value, tps)
if err != nil {
return err
Expand Down Expand Up @@ -201,7 +201,7 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
// TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row.
func (m *memTableReader) getMemRows() ([][]types.Datum, error) {
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error {
row, err := m.decodeRecordKeyValue(key, value)
if err != nil {
return err
Expand Down Expand Up @@ -319,27 +319,18 @@ func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {

type processKVFunc func(key, value []byte) error

func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn processKVFunc) error {
func iterTxnMemBuffer(tblID int64, ctx sessionctx.Context, kvRanges []kv.KeyRange, fn processKVFunc) error {
txn, err := ctx.Txn(true)
if err != nil {
return err
}

tempTableData := ctx.GetSessionVars().TemporaryTableData
for _, rg := range kvRanges {
iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
if tempTableData != nil {
snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey)
if err != nil {
return err
}

iter, err = transaction.NewUnionIter(iter, snapIter, false)
if err != nil {
return err
}
iter, err = getMemIter(tblID, ctx, iter, rg)
if err != nil {
return err
}
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved

for ; iter.Valid(); err = iter.Next() {
if err != nil {
return err
Expand All @@ -357,6 +348,36 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
return nil
}

func getMemIter(tblID int64, ctx sessionctx.Context, iter kv.Iterator, rg kv.KeyRange) (kv.Iterator, error) {
var snapCacheIter kv.Iterator
tempTableData := ctx.GetSessionVars().TemporaryTableData
if tempTableData != nil {
snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey)
if err != nil {
return nil, err
}
snapCacheIter = snapIter
}
cond, buffer := ctx.GetSessionVars().StmtCtx.GetCacheTable(tblID)
if cond {
cacheIter, err := buffer.(kv.MemBuffer).Iter(rg.StartKey, rg.EndKey)
if err != nil {
return nil, errors.Trace(err)
}
snapCacheIter = cacheIter
}

if snapCacheIter == nil {
return iter, nil
}

newIter, err := transaction.NewUnionIter(iter, snapCacheIter, false)
if err != nil {
return nil, err
}
return newIter, nil
}

func reverseDatumSlice(rows [][]types.Datum) {
for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
rows[i], rows[j] = rows[j], rows[i]
Expand All @@ -365,7 +386,7 @@ func reverseDatumSlice(rows [][]types.Datum) {

func (m *memIndexReader) getMemRowsHandle() ([]kv.Handle, error) {
handles := make([]kv.Handle, 0, m.addedRowsLen)
err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error {
err := iterTxnMemBuffer(m.table.ID, m.ctx, m.kvRanges, func(key, value []byte) error {
handle, err := tablecodec.DecodeIndexHandle(key, value, len(m.index.Columns))
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// Calculate the kv ranges here, UnionScan rely on this kv ranges.
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
// cached table and temporary table are similar
if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
kvReq, err := e.buildKVReq(ctx, firstPartRanges)
if err != nil {
return err
Expand Down Expand Up @@ -217,7 +218,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
req.Reset()
return nil
Expand Down Expand Up @@ -262,7 +263,7 @@ func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID in

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
if (e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone) || isReadFromCache(e.table, e.ctx.GetSessionVars().StmtCtx) {
return nil
}

Expand Down
26 changes: 26 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -4142,6 +4143,31 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
us.SetChildren(ds)
result = us
}
// If a table is a cache table, it is judged whether it satisfies the conditions of read cache.
if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable && b.ctx.GetSessionVars().SnapshotTS == 0 && !b.ctx.GetSessionVars().StmtCtx.IsStaleness {
cachedTable := tbl.(table.CachedTable)
txn, err := b.ctx.Txn(true)
if err != nil {
return nil, err
}
// Use the txn of the transaction to determine whether the cache can be read.
// About read lock and read condition feature. will add in the next pr.
buffer, cond := cachedTable.TryGetMemcache(txn.StartTS())
if cond {
b.ctx.GetSessionVars().StmtCtx.StoreCacheTable(tbl.Meta().ID, buffer)
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
result = us
} else {
go func() {
err := cachedTable.UpdateLockForRead(b.ctx, txn.StartTS())
if err != nil {
log.Warn("Update Lock Info Error")
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}
}()
}
}

if sessionVars.StmtCtx.TblInfo2UnionScan == nil {
sessionVars.StmtCtx.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
}
Expand Down
3 changes: 2 additions & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
}
}
}
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal {
cond, _ := ds.ctx.GetSessionVars().StmtCtx.GetCacheTable(ds.tableInfo.ID)
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !cond {
err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil)
if err != nil {
return nil, err
Expand Down
31 changes: 29 additions & 2 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,10 @@ type StatementContext struct {
IgnoreNoPartition bool
MaybeOverOptimized4PlanCache bool
IgnoreExplainIDSuffix bool

// If the select statement was like 'select * from t as of timestamp ...' or in a stale read transaction
// or is affected by the tidb_read_staleness session variable, then the statement will be makred as isStaleness
// in stmtCtx
IsStaleness bool

// mu struct holds variables that change during execution.
mu struct {
sync.Mutex
Expand Down Expand Up @@ -177,6 +175,11 @@ type StatementContext struct {
// Map to store all CTE storages of current SQL.
// Will clean up at the end of the execution.
CTEStorageMap interface{}
// cachedTables is used to store cache table id and a pointer to cache data when it satisfies the cache read condition
cachedTables []struct {
id int64
memBuffer interface{} // is a point to cache.MemBuffer. in order to avoid import cycle
}

// cache is used to reduce object allocation.
cache struct {
Expand Down Expand Up @@ -328,6 +331,30 @@ func (sc *StatementContext) SetPlanHint(hint string) {
sc.planHint = hint
}

// StoreCacheTable stores the read condition and a point to cache data of the given key.
func (sc *StatementContext) StoreCacheTable(tblID int64, buffer interface{}) {
for _, data := range sc.cachedTables {
if data.id == tblID {
data.memBuffer = buffer
}
return
}
sc.cachedTables = append(sc.cachedTables, struct {
id int64
memBuffer interface{}
}{id: tblID, memBuffer: buffer})
}

// GetCacheTable gets the read condition and a point to cache data of the given key if it exists
func (sc *StatementContext) GetCacheTable(tblID int64) (bool, interface{}) {
for _, data := range sc.cachedTables {
if data.id == tblID {
return true, data.memBuffer
}
}
return false, nil
}

// TableEntry presents table in db.
type TableEntry struct {
DB string
Expand Down
15 changes: 15 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,18 @@ var TableFromMeta func(allocators autoid.Allocators, tblInfo *model.TableInfo) (

// MockTableFromMeta only serves for test.
var MockTableFromMeta func(tableInfo *model.TableInfo) Table

// CachedTable is a Table, and it has a UpdateLockForRead() method
// UpdateLockForRead() according to the reasons for not meeting the read conditions, update the lock information,
// And at the same time reload data from the original table.
type CachedTable interface {
Table

// TryGetMemcache Check if the cache table is readable, if it is readable,
// Return the pointer to the MemBuffer and true otherwise return nil and false
TryGetMemcache(ts uint64) (kv.MemBuffer, bool)

// UpdateLockForRead If you cannot meet the conditions of the read buffer,
// you need to update the lock information and read the data from the original table
UpdateLockForRead(ctx sessionctx.Context, ts uint64) error
}
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
Loading