Skip to content

Commit

Permalink
executor: fix load data losing connection when batch_dml_size is set (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Feb 19, 2021
1 parent 9aef9af commit d3a4e00
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 4 deletions.
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Table: tbl,
Columns: v.Columns,
GenExprs: v.GenCols.Exprs,
isLoadData: true,
txnInUse: sync.Mutex{},
}
loadDataInfo := &LoadDataInfo{
row: make([]types.Datum, 0, len(insertVal.insertColumns)),
Expand Down
19 changes: 15 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -81,6 +82,12 @@ type InsertValues struct {
memTracker *memory.Tracker

stats *InsertRuntimeStat

// LoadData uses two goroutines: one generates batch data and
// the other commits tasks, which invalidates the txn.
// We use a mutex to protect the routines from using an invalid txn.
isLoadData bool
txnInUse sync.Mutex
}

type defaultVal struct {
Expand Down Expand Up @@ -868,10 +875,6 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
_, err := e.ctx.Txn(true)
if err != nil {
return types.Datum{}, errors.Trace(err)
}
recordID, err = e.allocAutoRandomID(ctx, &c.FieldType)
if err != nil {
return types.Datum{}, err
Expand Down Expand Up @@ -905,6 +908,14 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) {
return 0, autoid.ErrAutoRandReadFailed
}
if e.isLoadData {
e.txnInUse.Lock()
defer e.txnInUse.Unlock()
}
_, err = e.ctx.Txn(true)
if err != nil {
return 0, err
}
shard := e.ctx.GetSessionVars().TxnCtx.GetShard(tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit, 1)
autoRandomID |= shard
return autoRandomID, nil
Expand Down
3 changes: 3 additions & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
return errors.New("mock commit one task error")
})
e.Ctx.StmtCommit()
// Make sure the process stream routine never uses an invalid txn.
e.txnInUse.Lock()
defer e.txnInUse.Unlock()
// Make sure that there are no retries when committing.
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
Expand Down
55 changes: 55 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,61 @@ func (cli *testServerClient) prepareLoadDataFile(c *C, path string, rows ...stri
c.Assert(err, IsNil)
}

// runTestLoadDataAutoRandom writes 50000 rows of two random integers to a
// file under /tmp, loads that file with LOAD DATA LOCAL INFILE into a table
// whose primary key is auto_random while tidb_dml_batch_size is set to 128,
// and then verifies both the row count and the XOR of each loaded column
// against the values accumulated while the file was generated.
func (cli *testServerClient) runTestLoadDataAutoRandom(c *C) {
	path := "/tmp/load_data_txn_error.csv"

	fp, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	c.Assert(err, IsNil)
	c.Assert(fp, NotNil)

	// The data file is removed when the test returns, pass or fail.
	defer func() {
		_ = os.Remove(path)
	}()

	// XOR with zero is the identity, so starting both accumulators at zero
	// needs no special handling for the first row.
	var xorC2, xorC3 int
	for remaining := 50000; remaining > 0; remaining-- {
		n1 := rand.Intn(1000)
		n2 := rand.Intn(1000)

		_, werr := fp.WriteString(strconv.Itoa(n1) + "\t" + strconv.Itoa(n2) + "\n")
		c.Assert(werr, IsNil)

		xorC2 ^= n1
		xorC3 ^= n2
	}

	c.Assert(fp.Close(), IsNil)

	// bit_xor(c2), bit_xor(c3) is expected to come back as one row rendered
	// as the two checksums separated by a single space.
	expected := strconv.Itoa(xorC2) + " " + strconv.Itoa(xorC3)

	setup := func(config *mysql.Config) {
		config.AllowAllFiles = true
		config.Params = map[string]string{"sql_mode": "''"}
	}
	cli.runTestsOnNewDB(c, setup, "load_data_batch_dml", func(dbt *DBTest) {
		// With a batch size set, load data must not fail with an invalid txn error.
		dbt.mustExec("set @@session.tidb_dml_batch_size = 128")
		dbt.mustExec("drop table if exists t")
		dbt.mustExec("create table t(c1 bigint auto_random primary key, c2 bigint, c3 bigint)")
		dbt.mustExec(fmt.Sprintf("load data local infile %q into table t (c2, c3)", path))

		countRows := dbt.mustQuery("select count(*) from t")
		cli.checkRows(c, countRows, "50000")

		xorRows := dbt.mustQuery("select bit_xor(c2), bit_xor(c3) from t")
		cli.checkRows(c, xorRows, expected)
	})
}

func (cli *testServerClient) runTestLoadDataForListPartition(c *C) {
path := "/tmp/load_data_list_partition.csv"
defer func() {
Expand Down
6 changes: 6 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func (ts *tidbTestSerialSuite) TestLoadDataListPartition(c *C) {
ts.runTestLoadDataForListColumnPartition2(c)
}

// TestLoadDataAutoRandom is the regression test for issue #22540: it changes
// tidb_dml_batch_size and then checks that loading data into a table with an
// auto random column works properly.
func (ts *tidbTestSerialSuite) TestLoadDataAutoRandom(c *C) {
ts.runTestLoadDataAutoRandom(c)
}

// TestExplainFor delegates to runTestExplainForConn.
func (ts *tidbTestSerialSuite) TestExplainFor(c *C) {
ts.runTestExplainForConn(c)
}
Expand Down

0 comments on commit d3a4e00

Please sign in to comment.