From ff955acfa8b716b1e2158333babdf4d7dbce7d58 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 12 Sep 2019 17:15:45 +0800 Subject: [PATCH] *: refactoring the code of batchChecker #12108 (#12141) --- executor/analyze.go | 6 +- executor/analyze_test.go | 9 ++- executor/batch_checker.go | 44 +++++++++++ executor/insert.go | 146 +++++++++++++++++++++++++++++++++++- executor/insert_test.go | 40 ++++++---- session/pessimistic_test.go | 17 +++++ store/tikv/snapshot.go | 39 ++++++++++ store/tikv/txn.go | 2 +- table/tables/tables.go | 6 +- 9 files changed, 281 insertions(+), 28 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 2e55fde99b43b..4cbee01c0df48 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -63,7 +63,7 @@ type AnalyzeExec struct { var ( // MaxSampleSize is the size of samples for once analyze. // It's public for test. - MaxSampleSize = 10000 + MaxSampleSize = int64(10000) // RandSeed is the seed for randing package. // It's public for test. RandSeed = int64(1) @@ -464,7 +464,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis collectors[i] = &statistics.SampleCollector{ IsMerger: true, FMSketch: statistics.NewFMSketch(maxSketchSize), - MaxSampleSize: int64(MaxSampleSize), + MaxSampleSize: atomic.LoadInt64(&MaxSampleSize), CMSketch: statistics.NewCMSketch(defaultCMSketchDepth, defaultCMSketchWidth), } } @@ -1133,7 +1133,7 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st } randPos := make([]uint64, 0, MaxSampleSize+1) - for i := 0; i < MaxSampleSize; i++ { + for i := 0; i < int(MaxSampleSize); i++ { randPos = append(randPos, uint64(rander.Int63n(int64(e.rowCount)))) } sort.Slice(randPos, func(i, j int) bool { return randPos[i] < randPos[j] }) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 6509192073006..b53d5545a560b 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -18,6 +18,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" . "github.com/pingcap/check" @@ -158,8 +159,8 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) { dom, err = session.BootstrapSession(store) c.Assert(err, IsNil) tk := testkit.NewTestKit(c, store) - executor.MaxSampleSize = 20 - executor.RandSeed = 123 + atomic.StoreInt64(&executor.MaxSampleSize, 20) + atomic.StoreInt64(&executor.RandSeed, 123) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -229,8 +230,8 @@ func (s *testSuite1) TestFastAnalyze(c *C) { dom, err = session.BootstrapSession(store) c.Assert(err, IsNil) tk := testkit.NewTestKit(c, store) - executor.MaxSampleSize = 6 - executor.RandSeed = 123 + atomic.StoreInt64(&executor.MaxSampleSize, 6) + atomic.StoreInt64(&executor.RandSeed, 123) tk.MustExec("use test") tk.MustExec("drop table if exists t") diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 12d2b7c63a834..ed7eee3fa9dac 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -284,6 +284,50 @@ func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows return nil } +// getOldRowNew gets the table record row from storage for batch check. +// t could be a normal table or a partition, but it must not be a PartitionedTable. +func (b *batchChecker) getOldRowNew(sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64, + genExprs []expression.Expression) ([]types.Datum, error) { + oldValue, err := txn.Get(t.RecordKey(handle)) + if err != nil { + return nil, err + } + + cols := t.WritableCols() + oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue) + if err != nil { + return nil, err + } + // Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue. + gIdx := 0 + for _, col := range cols { + if col.State != model.StatePublic && oldRow[col.Offset].IsNull() { + _, found := oldRowMap[col.ID] + if !found { + oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo()) + if err != nil { + return nil, err + } + } + } + if col.IsGenerated() { + // only the virtual column needs fill back. + if !col.GeneratedStored { + val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow()) + if err != nil { + return nil, err + } + oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo()) + if err != nil { + return nil, err + } + } + gIdx++ + } + } + return oldRow, nil +} + // getOldRow gets the table record row from storage for batch check. // t could be a normal table or a partition, but it must not be a PartitionedTable. func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64, diff --git a/executor/insert.go b/executor/insert.go index 3b559e2699246..3469b61cbfcd3 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -15,6 +15,7 @@ package executor import ( "context" + "encoding/hex" "fmt" "github.com/opentracing/opentracing-go" @@ -59,7 +60,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { // If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword, // the to-be-insert rows will be check on duplicate keys and update to the new rows. if len(e.OnDuplicate) > 0 { - err := e.batchUpdateDupRows(rows) + err := e.batchUpdateDupRowsNew(ctx, rows) if err != nil { return err } @@ -78,6 +79,149 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { return nil } +func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) { + nKeys := 0 + for _, r := range rows { + if r.handleKey != nil { + nKeys++ + } + nKeys += len(r.uniqueKeys) + } + batchKeys := make([]kv.Key, 0, nKeys) + for _, r := range rows { + if r.handleKey != nil { + batchKeys = append(batchKeys, r.handleKey.newKV.key) + } + for _, k := range r.uniqueKeys { + batchKeys = append(batchKeys, k.newKV.key) + } + } + return txn.BatchGet(batchKeys) +} + +func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error { + batchKeys := make([]kv.Key, 0, len(rows)) + for _, r := range rows { + for _, uk := range r.uniqueKeys { + if val, found := values[string(uk.newKV.key)]; found { + handle, err := tables.DecodeHandle(val) + if err != nil { + return err + } + batchKeys = append(batchKeys, r.t.RecordKey(handle)) + } + } + } + _, err := txn.BatchGet(batchKeys) + return err +} + +func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + values, err := prefetchUniqueIndices(ctx, txn, rows) + if err != nil { + return err + } + return prefetchConflictedOldRows(ctx, txn, rows, values) +} + +// updateDupRowNew updates a duplicate row to a new row. +func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error { + oldRow, err := e.getOldRowNew(e.ctx, txn, row.t, handle, e.GenExprs) + if err != nil { + return err + } + + _, _, _, err = e.doDupRowUpdate(handle, oldRow, row.row, e.OnDuplicate) + if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) { + e.ctx.GetSessionVars().StmtCtx.AppendWarning(err) + return nil + } + return err +} + +func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error { + // Get keys need to be checked. + toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows) + if err != nil { + return err + } + + txn, err := e.ctx.Txn(true) + if err != nil { + return err + } + + // Use BatchGet to fill cache. + // It's an optimization and could be removed without affecting correctness. + if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { + return err + } + + for i, r := range toBeCheckedRows { + if r.handleKey != nil { + handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) + if err != nil { + return err + } + + err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate) + if err == nil { + continue + } + if !kv.IsErrNotFound(err) { + return err + } + } + + for _, uk := range r.uniqueKeys { + val, err := txn.Get(uk.newKV.key) + if err != nil { + if kv.IsErrNotFound(err) { + continue + } + return err + } + handle, err := tables.DecodeHandle(val) + if err != nil { + return err + } + + err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate) + if err != nil { + if kv.IsErrNotFound(err) { + // Data index inconsistent? A unique key provide the handle information, but the + // handle points to nothing. + logutil.Logger(ctx).Error("get old row failed when insert on dup", + zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)), + zap.Int64("handle", handle), + zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row))) + } + return err + } + + newRows[i] = nil + break + } + + // If row was checked with no duplicate keys, + // we should do insert the row, + // and key-values should be filled back to dupOldRowValues for the further row check, + // due to there may be duplicate keys inside the insert statement. + if newRows[i] != nil { + _, err := e.addRecord(newRows[i]) + if err != nil { + return err + } + } + } + return nil +} + // batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table. func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error { err := e.batchGetInsertKeys(e.ctx, e.Table, newRows) diff --git a/executor/insert_test.go b/executor/insert_test.go index 224b071163049..1ad3ab5dae696 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -287,20 +287,6 @@ func (s *testSuite3) TestAllowInvalidDates(c *C) { runWithMode("ALLOW_INVALID_DATES") } -func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec(`use test`) - tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`) - tk.MustExec(`insert into t1 set a=1, b=1`) - tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`) - tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1")) - - tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`) - tk.MustExec(`insert into t2 set a=1,b=1;`) - tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`) - tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1")) -} - func (s *testSuite3) TestInsertWithAutoidSchema(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec(`use test`) @@ -561,3 +547,29 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) { } } + +func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec(`use test`) + tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`) + tk.MustExec(`insert into t1 set a=1, b=1`) + tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`) + tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1")) + + tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`) + tk.MustExec(`insert into t2 set a=1,b=1;`) + tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`) + tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1")) + + tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int, + PRIMARY KEY (a,b), + UNIQUE KEY (b,c,d) +) PARTITION BY RANGE ( b ) ( + PARTITION p0 VALUES LESS THAN (4), + PARTITION p1 VALUES LESS THAN (7), + PARTITION p2 VALUES LESS THAN (11) +)`) + tk.MustExec("insert into t3 values (1,2,3,4,5)") + tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)") + tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16")) +} diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 2d2f233680760..3dbe8aa7566f7 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -374,4 +374,21 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { tk2.MustExec("commit") _, err := tk.Exec("commit") c.Check(err, NotNil) + + // Update snapshotTS after a conflict, invalidate snapshot cache. + tk.MustExec("truncate table conflict") + tk.MustExec("insert into conflict values (1, 2)") + tk.MustExec("begin pessimistic") + // This SQL use BatchGet and cache data in the txn snapshot. + // It can be changed to other SQLs that use BatchGet. + tk.MustExec("insert ignore into conflict values (1, 2)") + + tk2.MustExec("update conflict set c = c - 1") + + // Make the txn update its forUpdateTS. + tk.MustQuery("select * from conflict where id = 1 for update").Check(testkit.Rows("1 1")) + // Cover a bug that the txn snapshot doesn't invalidate cache after ts change. + tk.MustExec("insert into conflict values (1, 999) on duplicate key update c = c + 2") + tk.MustExec("commit") + tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3")) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 08df06751ac66..34b8da9518462 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -56,6 +56,11 @@ type tikvSnapshot struct { syncLog bool keyOnly bool vars *kv.Variables + + // Cache the result of BatchGet. + // The invariance is that calling BatchGet multiple times using the same start ts, + // the result should not change. + cached map[string][]byte } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -68,6 +73,12 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot { } } +func (s *tikvSnapshot) setSnapshotTS(ts uint64) { + // Invalidate cache if the snapshotTS change! + s.version.Ver = ts + s.cached = nil +} + func (s *tikvSnapshot) SetPriority(priority int) { s.priority = pb.CommandPri(priority) } @@ -75,7 +86,20 @@ func (s *tikvSnapshot) SetPriority(priority int) { // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs. // The map will not contain nonexistent keys. func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { + // Check the cached value first. m := make(map[string][]byte) + if s.cached != nil { + tmp := keys[:0] + for _, key := range keys { + if val, ok := s.cached[string(key)]; ok { + m[string(key)] = val + } else { + tmp = append(tmp, key) + } + } + keys = tmp + } + if len(keys) == 0 { return m, nil } @@ -107,6 +131,14 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return nil, errors.Trace(err) } + // Update the cache. + if s.cached == nil { + s.cached = make(map[string][]byte, len(m)) + } + for key, value := range m { + s.cached[key] = value + } + return m, nil } @@ -234,6 +266,13 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { } func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { + // Check the cached values first. + if s.cached != nil { + if value, ok := s.cached[string(k)]; ok { + return value, nil + } + } + sender := NewRegionRequestSender(s.store.regionCache, s.store.client) req := &tikvrpc.Request{ diff --git a/store/tikv/txn.go b/store/tikv/txn.go index e5634fbf51d03..7d81d22624003 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -227,7 +227,7 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) { case kv.KeyOnly: txn.snapshot.keyOnly = val.(bool) case kv.SnapshotTS: - txn.snapshot.version.Ver = val.(uint64) + txn.snapshot.setSnapshotTS(val.(uint64)) } } diff --git a/table/tables/tables.go b/table/tables/tables.go index e56d77677df71..290f5c599a6e4 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -665,11 +665,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co // Row implements table.Table Row interface. func (t *tableCommon) Row(ctx sessionctx.Context, h int64) ([]types.Datum, error) { - r, err := t.RowWithCols(ctx, h, t.Cols()) - if err != nil { - return nil, err - } - return r, nil + return t.RowWithCols(ctx, h, t.Cols()) } // RemoveRecord implements table.Table RemoveRecord interface.