From fb683bef2c43c1f94c1200af2179281beac614f4 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 12 Sep 2019 10:00:45 +0800 Subject: [PATCH] executor: refactor union scan and dirty table (#11702) --- executor/mem_reader.go | 116 ++++++++++++++++++++++++++++++---- executor/union_scan.go | 120 ++---------------------------------- executor/union_scan_test.go | 21 +++++++ kv/kv.go | 9 +++ session/txn.go | 12 ++-- sessionctx/context.go | 3 +- store/tikv/2pc.go | 3 + table/index.go | 6 ++ table/table.go | 2 - table/tables/index.go | 40 ++++++++++-- table/tables/tables.go | 34 +++++----- tablecodec/tablecodec.go | 28 +++++++++ util/mock/context.go | 5 +- 13 files changed, 235 insertions(+), 164 deletions(-) mode change 100644 => 100755 session/txn.go diff --git a/executor/mem_reader.go b/executor/mem_reader.go index ad408f0e36ec8..0b7888a53e077 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -17,15 +17,16 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/set" ) type memIndexReader struct { @@ -36,12 +37,9 @@ type memIndexReader struct { desc bool conditions []expression.Expression addedRows [][]types.Datum + addedRowsLen int retFieldTypes []*types.FieldType outputOffset []int - // cache for decode handle. - handleBytes []byte - // memIdxHandles is uses to store the handle ids that has been read by memIndexReader. - memIdxHandles set.Int64Set // belowHandleIndex is the handle's position of the below scan plan. belowHandleIndex int } @@ -62,8 +60,6 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)), retFieldTypes: retTypes(us), outputOffset: outputOffset, - handleBytes: make([]byte, 0, 16), - memIdxHandles: set.NewInt64Set(), belowHandleIndex: us.belowHandleIndex, } } @@ -92,8 +88,6 @@ func (m *memIndexReader) getMemRows() ([][]types.Datum, error) { if err != nil { return err } - handle := data[m.belowHandleIndex].GetInt64() - m.memIdxHandles.Insert(handle) mutableRow.SetDatums(data...) matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow()) @@ -150,17 +144,16 @@ type memTableReader struct { } func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader { - kvRanges := tblReader.kvRanges colIDs := make(map[int64]int) - for i, col := range tblReader.columns { + for i, col := range us.columns { colIDs[col.ID] = i } return &memTableReader{ ctx: us.ctx, - table: tblReader.table.Meta(), + table: us.table.Meta(), columns: us.columns, - kvRanges: kvRanges, + kvRanges: tblReader.kvRanges, desc: us.desc, conditions: us.conditions, addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)), @@ -305,3 +298,100 @@ func reverseDatumSlice(rows [][]types.Datum) { rows[i], rows[j] = rows[j], rows[i] } } + +func (m *memIndexReader) getMemRowsHandle() ([]int64, error) { + pkTp := types.NewFieldType(mysql.TypeLonglong) + if m.table.PKIsHandle { + for _, col := range m.table.Columns { + if mysql.HasPriKeyFlag(col.Flag) { + pkTp = &col.FieldType + break + } + } + } + handles := make([]int64, 0, m.addedRowsLen) + err := iterTxnMemBuffer(m.ctx, m.kvRanges, func(key, value []byte) error { + handle, err := tablecodec.DecodeIndexHandle(key, value, len(m.index.Columns), pkTp) + if err != nil { + return err + } + handles = append(handles, handle) + return nil + }) + if err != nil { + return nil, err + } + + if m.desc { + for i, j := 0, len(handles)-1; i < j; i, j = i+1, j-1 { + handles[i], handles[j] = handles[j], handles[i] + } + } + return handles, nil +} + +type memIndexLookUpReader struct { + ctx sessionctx.Context + index *model.IndexInfo + columns []*model.ColumnInfo + table table.Table + desc bool + conditions []expression.Expression + retFieldTypes []*types.FieldType + + idxReader *memIndexReader +} + +func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader { + kvRanges := idxLookUpReader.kvRanges + outputOffset := []int{len(idxLookUpReader.index.Columns)} + memIdxReader := &memIndexReader{ + ctx: us.ctx, + index: idxLookUpReader.index, + table: idxLookUpReader.table.Meta(), + kvRanges: kvRanges, + desc: idxLookUpReader.desc, + addedRowsLen: len(us.dirty.addedRows), + retFieldTypes: retTypes(us), + outputOffset: outputOffset, + belowHandleIndex: us.belowHandleIndex, + } + + return &memIndexLookUpReader{ + ctx: us.ctx, + index: idxLookUpReader.index, + columns: idxLookUpReader.columns, + table: idxLookUpReader.table, + desc: idxLookUpReader.desc, + conditions: us.conditions, + retFieldTypes: retTypes(us), + idxReader: memIdxReader, + } +} + +func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) { + handles, err := m.idxReader.getMemRowsHandle() + if err != nil || len(handles) == 0 { + return nil, err + } + + tblKVRanges := distsql.TableHandlesToKVRanges(getPhysicalTableID(m.table), handles) + colIDs := make(map[int64]int, len(m.columns)) + for i, col := range m.columns { + colIDs[col.ID] = i + } + + memTblReader := &memTableReader{ + ctx: m.ctx, + table: m.table.Meta(), + columns: m.columns, + kvRanges: tblKVRanges, + conditions: m.conditions, + addedRows: make([][]types.Datum, 0, len(handles)), + retFieldTypes: m.retFieldTypes, + colIDs: colIDs, + handleBytes: make([]byte, 0, 16), + } + + return memTblReader.getMemRows() +} diff --git a/executor/union_scan.go b/executor/union_scan.go index 9594697a72ef5..72f7dcf33c386 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -15,7 +15,6 @@ package executor import ( "context" - "sort" "github.com/pingcap/errors" "github.com/pingcap/parser/model" @@ -24,7 +23,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/set" ) // DirtyDB stores uncommitted write operations for a transaction. @@ -55,11 +53,10 @@ type DirtyTable struct { // the key is handle. addedRows map[int64]struct{} deletedRows map[int64]struct{} - truncated bool } // AddRow adds a row to the DirtyDB. -func (dt *DirtyTable) AddRow(handle int64, row []types.Datum) { +func (dt *DirtyTable) AddRow(handle int64) { dt.addedRows[handle] = struct{}{} } @@ -69,12 +66,6 @@ func (dt *DirtyTable) DeleteRow(handle int64) { dt.deletedRows[handle] = struct{}{} } -// TruncateTable truncates a table. -func (dt *DirtyTable) TruncateTable() { - dt.addedRows = make(map[int64]struct{}) - dt.truncated = true -} - // GetDirtyDB returns the DirtyDB bind to the context. func GetDirtyDB(ctx sessionctx.Context) *DirtyDB { var udb *DirtyDB @@ -102,9 +93,7 @@ type UnionScanExec struct { // belowHandleIndex is the handle's position of the below scan plan. belowHandleIndex int - addedRows [][]types.Datum - // memIdxHandles is uses to store the handle ids that has been read by memIndexReader. - memIdxHandles set.Int64Set + addedRows [][]types.Datum cursor4AddRows int sortErr error snapshotRows [][]types.Datum @@ -130,10 +119,9 @@ func (us *UnionScanExec) open(ctx context.Context) error { case *IndexReaderExecutor: mIdxReader := buildMemIndexReader(us, x) us.addedRows, err = mIdxReader.getMemRows() - us.memIdxHandles = mIdxReader.memIdxHandles case *IndexLookUpExecutor: - us.memIdxHandles = set.NewInt64Set() - err = us.buildAndSortAddedRows(ctx, x.table) + idxLookup := buildMemIndexLookUpReader(us, x) + us.addedRows, err = idxLookup.getMemRows() } if err != nil { return err @@ -201,9 +189,6 @@ func (us *UnionScanExec) getOneRow(ctx context.Context) ([]types.Datum, error) { } func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, error) { - if us.dirty.truncated { - return nil, nil - } if us.cursor4SnapshotRows < len(us.snapshotRows) { return us.snapshotRows[us.cursor4SnapshotRows], nil } @@ -219,19 +204,11 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err for row := iter.Begin(); row != iter.End(); row = iter.Next() { snapshotHandle := row.GetInt64(us.belowHandleIndex) if _, ok := us.dirty.deletedRows[snapshotHandle]; ok { - err = us.getMissIndexRowsByHandle(ctx, snapshotHandle) - if err != nil { - return nil, err - } continue } if _, ok := us.dirty.addedRows[snapshotHandle]; ok { // If src handle appears in added rows, it means there is conflict and the transaction will fail to // commit, but for simplicity, we don't handle it here. - err = us.getMissIndexRowsByHandle(ctx, snapshotHandle) - if err != nil { - return nil, err - } continue } us.snapshotRows = append(us.snapshotRows, row.GetDatumRow(retTypes(us.children[0]))) @@ -240,30 +217,6 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err return us.snapshotRows[0], nil } -// For index reader and index look up reader, update doesn't write index to txn memBuffer when the idx column -// is unchanged. So the `memIndexReader` and `memIndexLookUpReader` can't read the index from txn memBuffer. -// This function is used to get the missing row by the handle if the handle is in dirtyTable.addedRows. -func (us *UnionScanExec) getMissIndexRowsByHandle(ctx context.Context, handle int64) error { - reader := us.children[0] - switch reader.(type) { - case *TableReaderExecutor: - return nil - } - if _, ok := us.dirty.addedRows[handle]; !ok { - return nil - } - // Don't miss in memBuffer reader. - if us.memIdxHandles.Exist(handle) { - return nil - } - memRow, err := us.getMemRow(ctx, handle) - if memRow == nil || err != nil { - return err - } - us.snapshotRows = append(us.snapshotRows, memRow) - return nil -} - func (us *UnionScanExec) getAddedRow() []types.Datum { var addedRow []types.Datum if us.cursor4AddRows < len(us.addedRows) { @@ -319,71 +272,6 @@ func (us *UnionScanExec) compare(a, b []types.Datum) (int, error) { return cmp, nil } -// rowWithColsInTxn gets the row from the transaction buffer. -func (us *UnionScanExec) rowWithColsInTxn(ctx context.Context, t table.Table, h int64) ([]types.Datum, error) { - key := t.RecordKey(h) - txn, err := us.ctx.Txn(true) - if err != nil { - return nil, err - } - value, err := txn.GetMemBuffer().Get(ctx, key) - if err != nil { - return nil, err - } - colIDs := make(map[int64]int) - for i, col := range us.columns { - colIDs[col.ID] = i - } - return decodeRowData(us.ctx, us.table.Meta(), us.columns, colIDs, h, []byte{}, value) -} - -func (us *UnionScanExec) getMemRow(ctx context.Context, h int64) ([]types.Datum, error) { - data, err := us.rowWithColsInTxn(ctx, us.table, h) - if err != nil { - return nil, err - } - us.mutableRow.SetDatums(data...) - matched, _, err := expression.EvalBool(us.ctx, us.conditions, us.mutableRow.ToRow()) - if err != nil { - return nil, err - } - if !matched { - return nil, nil - } - return data, nil -} - -// TODO: remove `buildAndSortAddedRows` functions and `DirtyTable`. -func (us *UnionScanExec) buildAndSortAddedRows(ctx context.Context, t table.Table) error { - us.addedRows = make([][]types.Datum, 0, len(us.dirty.addedRows)) - mutableRow := chunk.MutRowFromTypes(retTypes(us)) - for h := range us.dirty.addedRows { - us.memIdxHandles.Insert(h) - newData, err := us.rowWithColsInTxn(ctx, t, h) - if err != nil { - return err - } - mutableRow.SetDatums(newData...) - matched, _, err := expression.EvalBool(us.ctx, us.conditions, mutableRow.ToRow()) - if err != nil { - return err - } - if !matched { - continue - } - us.addedRows = append(us.addedRows, newData) - } - if us.desc { - sort.Sort(sort.Reverse(us)) - } else { - sort.Sort(us) - } - if us.sortErr != nil { - return errors.Trace(us.sortErr) - } - return nil -} - // Len implements sort.Interface interface. func (us *UnionScanExec) Len() int { return len(us.addedRows) diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index 73b6ee04e3c24..9323a4afba653 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -164,6 +164,7 @@ func (s *testSuite4) TestUnionScanForMemBufferReader(c *C) { // update with unchange index column. tk.MustExec("update t set a=a+1") tk.MustQuery("select * from t use index (idx)").Check(testkit.Rows("2 1", "3 2")) + tk.MustQuery("select b from t use index (idx)").Check(testkit.Rows("1", "2")) tk.MustExec("update t set b=b+2 where a=2") tk.MustQuery("select * from t").Check(testkit.Rows("2 3", "3 2")) tk.MustQuery("select * from t use index (idx) order by b desc").Check(testkit.Rows("2 3", "3 2")) @@ -212,4 +213,24 @@ func (s *testSuite4) TestUnionScanForMemBufferReader(c *C) { tk.MustQuery("select a,b from t1 use index(idx) where b>1 and c is not null;").Check(testkit.Rows("3 3")) tk.MustExec("commit") tk.MustExec("admin check table t1;") + + // Test insert and update with untouched index. + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int,b int,c int,index idx(b));") + tk.MustExec("begin;") + tk.MustExec("insert into t1 values (1, 1, 1), (2, 2, 2);") + tk.MustExec("update t1 set c=c+1 where a=1;") + tk.MustQuery("select * from t1 use index(idx);").Check(testkit.Rows("1 1 2", "2 2 2")) + tk.MustExec("commit") + tk.MustExec("admin check table t1;") + + // Test insert and update with untouched unique index. + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int,b int,c int,unique index idx(b));") + tk.MustExec("begin;") + tk.MustExec("insert into t1 values (1, 1, 1), (2, 2, 2);") + tk.MustExec("update t1 set c=c+1 where a=1;") + tk.MustQuery("select * from t1 use index(idx);").Check(testkit.Rows("1 1 2", "2 2 2")) + tk.MustExec("commit") + tk.MustExec("admin check table t1;") } diff --git a/kv/kv.go b/kv/kv.go index 836e7f12561e1..6e00d19351427 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -62,6 +62,15 @@ const ( PriorityHigh ) +// UnCommitIndexKVFlag uses to indicate the index key/value is no need to commit. +// This is used in the situation of the index key/value was unchanged when do update. +// Usage: +// 1. For non-unique index: normally, the index value is '0'. +// Change the value to '1' indicate the index key/value is no need to commit. +// 2. For unique index: normally, the index value is the record handle ID, 8 bytes. +// Append UnCommitIndexKVFlag to the value indicate the index key/value is no need to commit. +const UnCommitIndexKVFlag byte = '1' + // IsoLevel is the transaction's isolation level. type IsoLevel int diff --git a/session/txn.go b/session/txn.go old mode 100644 new mode 100755 index 75d914ae7313c..76d593e094ce0 --- a/session/txn.go +++ b/session/txn.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" @@ -150,7 +149,6 @@ type dirtyTableOperation struct { kind int tid int64 handle int64 - row []types.Datum } var hasMockAutoIDRetry = int64(0) @@ -335,7 +333,7 @@ func keyNeedToLock(k, v []byte) bool { // only need to delete row key. return k[10] == 'r' } - isNonUniqueIndex := len(v) == 1 && v[0] == '0' + isNonUniqueIndex := len(v) == 1 // Put row key and unique index need to lock. return !isNonUniqueIndex } @@ -365,11 +363,9 @@ func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) { dt := dirtyDB.GetDirtyTable(op.tid) switch op.kind { case table.DirtyTableAddRow: - dt.AddRow(op.handle, op.row) + dt.AddRow(op.handle) case table.DirtyTableDeleteRow: dt.DeleteRow(op.handle) - case table.DirtyTableTruncate: - dt.TruncateTable() } } @@ -475,6 +471,6 @@ func (s *session) StmtGetMutation(tableID int64) *binlog.TableMutation { return st.mutations[tableID] } -func (s *session) StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum) { - s.txn.dirtyTableOP = append(s.txn.dirtyTableOP, dirtyTableOperation{op, tid, handle, row}) +func (s *session) StmtAddDirtyTableOP(op int, tid int64, handle int64) { + s.txn.dirtyTableOP = append(s.txn.dirtyTableOP, dirtyTableOperation{op, tid, handle}) } diff --git a/sessionctx/context.go b/sessionctx/context.go index 97fd17948d56e..6c0b7e100aebe 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tipb/go-binlog" @@ -81,7 +80,7 @@ type Context interface { // StmtGetMutation gets the binlog mutation for current statement. StmtGetMutation(int64) *binlog.TableMutation // StmtAddDirtyTableOP adds the dirty table operation for current statement. - StmtAddDirtyTableOP(op int, physicalID int64, handle int64, row []types.Datum) + StmtAddDirtyTableOP(op int, physicalID int64, handle int64) // DDLOwnerChecker returns owner.DDLOwnerChecker. DDLOwnerChecker() owner.DDLOwnerChecker // AddTableLock adds table lock to the session lock map. diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f1e20ecbd9536..d3caabc6c3637 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -205,6 +205,9 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { } err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error { if len(v) > 0 { + if tablecodec.IsUntouchedIndexKValue(k, v) { + return nil + } op := pb.Op_Put if c := txn.us.GetKeyExistErrInfo(k); c != nil { op = pb.Op_Insert diff --git a/table/index.go b/table/index.go index 3912d60003dc2..ad341847ed11e 100644 --- a/table/index.go +++ b/table/index.go @@ -35,6 +35,7 @@ type CreateIdxOpt struct { SkipCheck bool // If true, skip all the unique indices constraint check. kv.AssertionProto // If not nil, check assertion. Ctx context.Context + Untouched bool // If true, the index key/value is no need to commit. } // CreateIdxOptFunc is defined for the Create() method of Index interface. @@ -59,6 +60,11 @@ func WithAssertion(x kv.AssertionProto) CreateIdxOptFunc { } } +// IndexIsUntouched uses to indicate the index kv is untouched. +var IndexIsUntouched CreateIdxOptFunc = func(opt *CreateIdxOpt) { + opt.Untouched = true +} + // WithCtx returns a CreateIdxFunc. // This option is used to pass context.Context. func WithCtx(ctx context.Context) CreateIdxOptFunc { diff --git a/table/table.go b/table/table.go index 0e6294080fdab..f233e2cfa517d 100644 --- a/table/table.go +++ b/table/table.go @@ -47,8 +47,6 @@ const ( DirtyTableAddRow = iota // DirtyTableDeleteRow is the constant for dirty table operation type. DirtyTableDeleteRow - // DirtyTableTruncate is the constant for dirty table operation type. - DirtyTableTruncate ) var ( diff --git a/table/tables/index.go b/table/tables/index.go index cb865ea5add66..f0bc5644fddde 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -204,11 +204,31 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV if err != nil { return 0, err } + + ctx := opt.Ctx + if opt.Untouched { + txn, err1 := sctx.Txn(true) + if err1 != nil { + return 0, err1 + } + // If the index kv was untouched(unchanged), and the key/value already exists in mem-buffer, + // should not overwrite the key with un-commit flag. + // So if the key exists, just do nothing and return. + _, err = txn.GetMemBuffer().Get(ctx, key) + if err == nil { + return 0, nil + } + } + // save the key buffer to reuse. writeBufs.IndexKeyBuf = key if !distinct { // non-unique index doesn't need store value, write a '0' to reduce space - err = rm.Set(key, []byte{'0'}) + value := []byte{'0'} + if opt.Untouched { + value[0] = kv.UnCommitIndexKVFlag + } + err = rm.Set(key, value) if ss != nil { ss.SetAssertion(key, kv.None) } @@ -216,14 +236,17 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV } if skipCheck { - err = rm.Set(key, EncodeHandle(h)) + value := EncodeHandle(h) + if opt.Untouched { + value = append(value, kv.UnCommitIndexKVFlag) + } + err = rm.Set(key, value) if ss != nil { ss.SetAssertion(key, kv.None) } return 0, err } - ctx := opt.Ctx if ctx != nil { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("index.Create", opentracing.ChildOf(span.Context())) @@ -236,8 +259,15 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV var value []byte value, err = rm.Get(ctx, key) - if kv.IsErrNotFound(err) { - err = rm.Set(key, EncodeHandle(h)) + // If (opt.Untouched && err == nil) is true, means the key is exists and exists in TiKV, not in txn mem-buffer, + // then should also write the untouched index key/value to mem-buffer to make sure the data + // is consistent with the index in txn mem-buffer. + if kv.IsErrNotFound(err) || (opt.Untouched && err == nil) { + v := EncodeHandle(h) + if opt.Untouched { + v = append(v, kv.UnCommitIndexKVFlag) + } + err = rm.Set(key, v) if ss != nil { ss.SetAssertion(key, kv.NotExist) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 7dc9f5be0abd1..4d2e705b319f6 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -328,8 +328,8 @@ func (t *tableCommon) UpdateRecord(ctx sessionctx.Context, h int64, oldData, new if err = bs.SaveTo(txn); err != nil { return err } - ctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h, nil) - ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, h, newData) + ctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h) + ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, h) if shouldWriteBinlog(ctx) { if !t.meta.PKIsHandle { binlogColIDs = append(binlogColIDs, model.ExtraHandleID) @@ -380,19 +380,21 @@ func (t *tableCommon) rebuildIndices(ctx sessionctx.Context, rm kv.RetrieverMuta } } for _, idx := range t.WritableIndices() { + untouched := true for _, ic := range idx.Meta().Columns { if !touched[ic.Offset] { continue } - newVs, err := idx.FetchValues(newData, nil) - if err != nil { - return err - } - if err := t.buildIndexForRow(ctx, rm, h, newVs, idx, txn); err != nil { - return err - } + untouched = false break } + newVs, err := idx.FetchValues(newData, nil) + if err != nil { + return err + } + if err := t.buildIndexForRow(ctx, rm, h, newVs, idx, txn, untouched); err != nil { + return err + } } return nil } @@ -530,11 +532,9 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. if err = rm.(*kv.BufferStore).SaveTo(txn); err != nil { return 0, err } + ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, recordID) } - if !ctx.GetSessionVars().LightningMode { - ctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, recordID, r) - } if shouldWriteBinlog(ctx) { // For insert, TiDB and Binlog can use same row and schema. binlogRow = row @@ -710,7 +710,7 @@ func (t *tableCommon) RemoveRecord(ctx sessionctx.Context, h int64, r []types.Da return err } - ctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h, nil) + ctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h) if shouldWriteBinlog(ctx) { cols := t.Cols() colIDs := make([]int64, 0, len(cols)+1) @@ -831,8 +831,12 @@ func (t *tableCommon) removeRowIndex(sc *stmtctx.StatementContext, rm kv.Retriev } // buildIndexForRow implements table.Table BuildIndexForRow interface. -func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index, txn kv.Transaction) error { - if _, err := idx.Create(ctx, rm, vals, h, table.WithAssertion(txn)); err != nil { +func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index, txn kv.Transaction, untouched bool) error { + opts := []table.CreateIdxOptFunc{table.WithAssertion(txn)} + if untouched { + opts = append(opts, table.IndexIsUntouched) + } + if _, err := idx.Create(ctx, rm, vals, h, opts...); err != nil { if kv.ErrKeyExists.Equal(err) { // Make error message consistent with MySQL. entryKey, err1 := t.genIndexKeyStr(vals) diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 99e8edd7e5247..97cbea9c815dc 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -560,6 +560,26 @@ func DecodeIndexKV(key, value []byte, colsLen int, pkStatus PrimaryKeyStatus) ([ return values, nil } +// DecodeIndexHandle uses to decode the handle from index key/value. +func DecodeIndexHandle(key, value []byte, colsLen int, pkTp *types.FieldType) (int64, error) { + _, b, err := CutIndexKeyNew(key, colsLen) + if err != nil { + return 0, errors.Trace(err) + } + if len(b) > 0 { + d, err := DecodeColumnValue(b, pkTp, nil) + if err != nil { + return 0, errors.Trace(err) + } + return d.GetInt64(), nil + + } else if len(value) >= 8 { + return DecodeIndexValueAsHandle(value) + } + // Should never execute to here. + return 0, errors.Errorf("no handle in index key: %v, value: %v", key, value) +} + // DecodeIndexValueAsHandle uses to decode index value as handle id. func DecodeIndexValueAsHandle(data []byte) (int64, error) { var h int64 @@ -623,6 +643,14 @@ func GenTableIndexPrefix(tableID int64) kv.Key { return appendTableIndexPrefix(buf, tableID) } +// IsUntouchedIndexKValue uses to check whether the key is index key, and the value is untouched, +// since the untouched index key/value is no need to commit. +func IsUntouchedIndexKValue(k, v []byte) bool { + vLen := len(v) + return (len(k) > 11 && k[0] == 't' && k[10] == 'i') && + ((vLen == 1 || vLen == 9) && v[vLen-1] == kv.UnCommitIndexKVFlag) +} + // GenTablePrefix composes table record and index prefix: "t[tableID]". func GenTablePrefix(tableID int64) kv.Key { buf := make([]byte, 0, len(tablePrefix)+8) diff --git a/util/mock/context.go b/util/mock/context.go index 889b5b2fd6e55..ba63400ddc5e2 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,11 +26,10 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/sqlexec" - binlog "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tipb/go-binlog" ) var _ sessionctx.Context = (*Context)(nil) @@ -213,7 +212,7 @@ func (c *Context) StmtGetMutation(tableID int64) *binlog.TableMutation { } // StmtAddDirtyTableOP implements the sessionctx.Context interface. -func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum) { +func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64) { } // AddTableLock implements the sessionctx.Context interface.