*: refactoring the code of batchChecker #12108 (#12141)
tiancaiamao authored and sre-bot committed Sep 12, 2019
1 parent 299da1b commit ff955ac
Showing 9 changed files with 281 additions and 28 deletions.
6 changes: 3 additions & 3 deletions executor/analyze.go
@@ -63,7 +63,7 @@ type AnalyzeExec struct {
var (
// MaxSampleSize is the size of samples for once analyze.
// It's public for test.
-	MaxSampleSize = 10000
+	MaxSampleSize = int64(10000)
// RandSeed is the seed for randing package.
// It's public for test.
RandSeed = int64(1)
@@ -464,7 +464,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
collectors[i] = &statistics.SampleCollector{
IsMerger: true,
FMSketch: statistics.NewFMSketch(maxSketchSize),
-		MaxSampleSize: int64(MaxSampleSize),
+		MaxSampleSize: atomic.LoadInt64(&MaxSampleSize),
CMSketch: statistics.NewCMSketch(defaultCMSketchDepth, defaultCMSketchWidth),
}
}
@@ -1133,7 +1133,7 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st
}

randPos := make([]uint64, 0, MaxSampleSize+1)
-	for i := 0; i < MaxSampleSize; i++ {
+	for i := 0; i < int(MaxSampleSize); i++ {
randPos = append(randPos, uint64(rander.Int63n(int64(e.rowCount))))
}
sort.Slice(randPos, func(i, j int) bool { return randPos[i] < randPos[j] })
9 changes: 5 additions & 4 deletions executor/analyze_test.go
@@ -18,6 +18,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
@@ -158,8 +159,8 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
-	executor.MaxSampleSize = 20
-	executor.RandSeed = 123
+	atomic.StoreInt64(&executor.MaxSampleSize, 20)
+	atomic.StoreInt64(&executor.RandSeed, 123)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
@@ -229,8 +230,8 @@ func (s *testSuite1) TestFastAnalyze(c *C) {
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
-	executor.MaxSampleSize = 6
-	executor.RandSeed = 123
+	atomic.StoreInt64(&executor.MaxSampleSize, 6)
+	atomic.StoreInt64(&executor.RandSeed, 123)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
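The analyze.go and analyze_test.go hunks above turn MaxSampleSize into an int64 that is read with atomic.LoadInt64 and overwritten by the tests with atomic.StoreInt64, so a test can shrink the sample budget while analyze workers read it without a data race. The following standalone sketch is not part of this commit and uses hypothetical names; it only illustrates that load/store pattern:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxSampleSize plays the role of executor.MaxSampleSize: a package-level
// tunable that one goroutine overwrites while others read it.
var maxSampleSize = int64(10000)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// A test lowers the sample budget concurrently.
		atomic.StoreInt64(&maxSampleSize, 20)
	}()
	wg.Wait()
	// Readers take an atomic snapshot instead of a plain (racy) read, then
	// convert to int where a loop bound or slice capacity is needed.
	n := int(atomic.LoadInt64(&maxSampleSize))
	randPos := make([]uint64, 0, n+1)
	fmt.Println("sample budget:", n, "capacity:", cap(randPos))
}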
44 changes: 44 additions & 0 deletions executor/batch_checker.go
@@ -284,6 +284,50 @@ func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows
return nil
}

// getOldRowNew gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRowNew(sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
			// Only the virtual column needs to be filled back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
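getOldRowNew, added above, differs from the existing getOldRow in two ways: the caller passes in the kv.Transaction (so one transaction lookup can serve every duplicated handle), and the decoded row is completed before use: columns still in write-only or write-reorg state fall back to their origin default value, and virtual generated columns are re-evaluated from the filled row. The standalone sketch below is not part of this commit; it illustrates that fill-then-evaluate idea with hypothetical types in place of TiDB's types.Datum and expression.Expression:

package main

import (
	"fmt"
	"strings"
)

// colMeta stands in for TiDB's column info: an offset in the row, a default
// used when the stored value is missing, and an optional virtual expression.
type colMeta struct {
	name       string
	offset     int
	origin     string                // stand-in for originDefaultValue
	virtualGen func([]string) string // nil for non-generated columns
}

// fillOldRow mimics the shape of getOldRowNew: take the values decoded from
// storage, fill gaps with the origin default, then evaluate virtual columns
// against the completed row.
func fillOldRow(cols []colMeta, decoded map[string]string) []string {
	row := make([]string, len(cols))
	for _, c := range cols {
		if v, ok := decoded[c.name]; ok {
			row[c.offset] = v
		} else if c.virtualGen == nil {
			row[c.offset] = c.origin // e.g. a write-only column absent from old rows
		}
	}
	for _, c := range cols {
		if c.virtualGen != nil {
			row[c.offset] = c.virtualGen(row) // virtual columns are recomputed, never stored
		}
	}
	return row
}

func main() {
	cols := []colMeta{
		{name: "a", offset: 0},
		{name: "b", offset: 1, origin: "0"}, // added by an in-progress DDL
		{name: "ab", offset: 2, virtualGen: func(r []string) string { return strings.ToUpper(r[0]) + r[1] }},
	}
	fmt.Println(fillOldRow(cols, map[string]string{"a": "x"})) // prints [x 0 X0]
}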
146 changes: 145 additions & 1 deletion executor/insert.go
@@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/hex"
"fmt"

"github.com/opentracing/opentracing-go"
@@ -59,7 +60,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
	// If `ON DUPLICATE KEY UPDATE` is specified, and there is no `IGNORE` keyword,
	// the to-be-inserted rows will be checked for duplicate keys and updated to the new rows.
if len(e.OnDuplicate) > 0 {
-		err := e.batchUpdateDupRows(rows)
+		err := e.batchUpdateDupRowsNew(ctx, rows)
if err != nil {
return err
}
@@ -78,6 +79,149 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
nKeys++
}
nKeys += len(r.uniqueKeys)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, r := range rows {
if r.handleKey != nil {
batchKeys = append(batchKeys, r.handleKey.newKV.key)
}
for _, k := range r.uniqueKeys {
batchKeys = append(batchKeys, k.newKV.key)
}
}
return txn.BatchGet(batchKeys)
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
batchKeys = append(batchKeys, r.t.RecordKey(handle))
}
}
}
_, err := txn.BatchGet(batchKeys)
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}

_, _, _, err = e.doDupRowUpdate(handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
	// Get the keys that need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
				// Data-index inconsistency? The unique key provides the handle information, but the
				// handle points to nothing.
logutil.Logger(ctx).Error("get old row failed when insert on dup",
zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
}
return err
}

newRows[i] = nil
break
}

		// If the row was checked and no duplicate key was found,
		// we should insert the row,
		// and its key-values should be filled back to dupOldRowValues for further row checks,
		// because there may be duplicate keys inside the same insert statement.
if newRows[i] != nil {
_, err := e.addRecord(newRows[i])
if err != nil {
return err
}
}
}
return nil
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
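batchUpdateDupRowsNew above leans on the fact that txn.BatchGet warms the transaction's snapshot cache: prefetchDataCache batch-reads the handle and unique-index keys first, then the conflicting old record keys, so the per-row txn.Get and getOldRowNew calls in the loop are served from cache instead of issuing one storage round trip each. The standalone sketch below is not part of this commit; it shows the prefetch-then-point-get pattern with a toy cache in place of TiDB's kv.Transaction:

package main

import "fmt"

// cachedTxn imitates a transaction snapshot whose BatchGet fills a local
// cache so that later point Gets avoid extra round trips.
type cachedTxn struct {
	store map[string][]byte // pretend remote storage
	cache map[string][]byte // per-transaction snapshot cache
}

func (t *cachedTxn) BatchGet(keys [][]byte) (map[string][]byte, error) {
	res := make(map[string][]byte, len(keys))
	for _, k := range keys {
		if v, ok := t.store[string(k)]; ok {
			res[string(k)] = v
			t.cache[string(k)] = v // warm the cache
		}
	}
	return res, nil
}

func (t *cachedTxn) Get(key []byte) ([]byte, error) {
	if v, ok := t.cache[string(key)]; ok {
		return v, nil // served from the prefetched cache
	}
	return nil, fmt.Errorf("key not found: %q", key)
}

func main() {
	txn := &cachedTxn{
		store: map[string][]byte{"uk:1": []byte("handle-7"), "rec:7": []byte("old row")},
		cache: map[string][]byte{},
	}
	// Step 1: prefetch the unique-index keys and the conflicting record keys in one batch.
	if _, err := txn.BatchGet([][]byte{[]byte("uk:1"), []byte("rec:7")}); err != nil {
		panic(err)
	}
	// Step 2: the ON DUPLICATE KEY UPDATE loop issues point Gets that now hit the cache.
	old, err := txn.Get([]byte("rec:7"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("old row from cache: %s\n", old)
}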
40 changes: 26 additions & 14 deletions executor/insert_test.go
@@ -287,20 +287,6 @@ func (s *testSuite3) TestAllowInvalidDates(c *C) {
runWithMode("ALLOW_INVALID_DATES")
}

-func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
-	tk := testkit.NewTestKit(c, s.store)
-	tk.MustExec(`use test`)
-	tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
-	tk.MustExec(`insert into t1 set a=1, b=1`)
-	tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
-	tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))
-
-	tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
-	tk.MustExec(`insert into t2 set a=1,b=1;`)
-	tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
-	tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
-}

func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
@@ -561,3 +547,29 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
}

}

+func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec(`use test`)
+	tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
+	tk.MustExec(`insert into t1 set a=1, b=1`)
+	tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
+	tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))
+
+	tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
+	tk.MustExec(`insert into t2 set a=1,b=1;`)
+	tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
+	tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
+
+	tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
+		PRIMARY KEY (a,b),
+		UNIQUE KEY (b,c,d)
+	) PARTITION BY RANGE ( b ) (
+		PARTITION p0 VALUES LESS THAN (4),
+		PARTITION p1 VALUES LESS THAN (7),
+		PARTITION p2 VALUES LESS THAN (11)
+	)`)
+	tk.MustExec("insert into t3 values (1,2,3,4,5)")
+	tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
+	tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
+}
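A reading of the expected t3 result, assuming MySQL's ON DUPLICATE KEY UPDATE semantics (which this test relies on): the table starts with (1,2,3,4,5). In the two-row insert, (1,2,3,4,5) conflicts on the primary key (a,b) and sets e = 5 + 5 = 10; (6,2,3,4,6) then conflicts on the unique key (b,c,d) of the same row and sets e = 10 + 6 = 16, which is the value the final query checks. The unique-key case is the interesting one for this commit, since the handle must be decoded from the index value before the old row can be fetched and updated.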
17 changes: 17 additions & 0 deletions session/pessimistic_test.go
@@ -374,4 +374,21 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk2.MustExec("commit")
_, err := tk.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
tk.MustExec("truncate table conflict")
tk.MustExec("insert into conflict values (1, 2)")
tk.MustExec("begin pessimistic")
	// This SQL uses BatchGet and caches data in the txn snapshot.
	// It could be replaced by any other SQL that uses BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")

tk2.MustExec("update conflict set c = c - 1")

// Make the txn update its forUpdateTS.
tk.MustQuery("select * from conflict where id = 1 for update").Check(testkit.Rows("1 1"))
	// Cover a bug where the txn snapshot did not invalidate its cache after the ts change.
tk.MustExec("insert into conflict values (1, 999) on duplicate key update c = c + 2")
tk.MustExec("commit")
tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3"))
}
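Walking through the added pessimistic-transaction case: conflict holds (1, 2) when tk begins; insert ignore runs a BatchGet that caches the row in the transaction snapshot; tk2 then commits c = c - 1, so the stored row becomes (1, 1). The select ... for update forces tk to advance its forUpdateTS and must observe (1, 1), so the following insert ... on duplicate key update c = c + 2 has to read the fresh value and produce (1, 3). If the snapshot cache were not invalidated after the ts change (the bug this test covers), the stale cached (1, 2) would yield (1, 4) instead.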
3 more changed files are not shown.
