From d3a4e004dac8affcd62400a9d68900e9ccd1f3d0 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 19 Feb 2021 11:49:31 +0800 Subject: [PATCH] executor: fix load data losing connection when batch_dml_size is set (#22724) (#22737) --- executor/builder.go | 2 ++ executor/insert_common.go | 19 +++++++++++--- executor/load_data.go | 3 +++ server/server_test.go | 55 +++++++++++++++++++++++++++++++++++++++ server/tidb_test.go | 6 +++++ 5 files changed, 81 insertions(+), 4 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index eac9e8cfa1399..f616a4fb0d22c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -788,6 +788,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { Table: tbl, Columns: v.Columns, GenExprs: v.GenCols.Exprs, + isLoadData: true, + txnInUse: sync.Mutex{}, } loadDataInfo := &LoadDataInfo{ row: make([]types.Datum, 0, len(insertVal.insertColumns)), diff --git a/executor/insert_common.go b/executor/insert_common.go index c5d7de4ae06f4..7818d92a97d0d 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -81,6 +82,12 @@ type InsertValues struct { memTracker *memory.Tracker stats *InsertRuntimeStat + + // LoadData uses two goroutines: one generates batch data, + // the other commits tasks, which invalidates the txn. + // We use a mutex to keep either goroutine from using an invalid txn. + isLoadData bool + txnInUse sync.Mutex } type defaultVal struct { @@ -868,10 +875,6 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, // Change NULL to auto id. // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. 
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { - _, err := e.ctx.Txn(true) - if err != nil { - return types.Datum{}, errors.Trace(err) - } recordID, err = e.allocAutoRandomID(ctx, &c.FieldType) if err != nil { return types.Datum{}, err @@ -905,6 +908,14 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F if tables.OverflowShardBits(autoRandomID, tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit) { return 0, autoid.ErrAutoRandReadFailed } + if e.isLoadData { + e.txnInUse.Lock() + defer e.txnInUse.Unlock() + } + _, err = e.ctx.Txn(true) + if err != nil { + return 0, err + } shard := e.ctx.GetSessionVars().TxnCtx.GetShard(tableInfo.AutoRandomBits, layout.TypeBitsLength, layout.HasSignBit, 1) autoRandomID |= shard return autoRandomID, nil diff --git a/executor/load_data.go b/executor/load_data.go index 429b9f3d48cd3..8dffdd30d71cc 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -289,6 +289,9 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error return errors.New("mock commit one task error") }) e.Ctx.StmtCommit() + // Make sure the process-stream goroutine never uses an invalid txn. + e.txnInUse.Lock() + defer e.txnInUse.Unlock() // Make sure that there are no retries when committing. 
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil { logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) diff --git a/server/server_test.go b/server/server_test.go index 6270c25855181..6276a6eeb1fcc 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -520,6 +520,61 @@ func (cli *testServerClient) prepareLoadDataFile(c *C, path string, rows ...stri c.Assert(err, IsNil) } +func (cli *testServerClient) runTestLoadDataAutoRandom(c *C) { + path := "/tmp/load_data_txn_error.csv" + + fp, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + c.Assert(err, IsNil) + c.Assert(fp, NotNil) + + defer func() { + _ = os.Remove(path) + }() + + cksum1 := 0 + cksum2 := 0 + for i := 0; i < 50000; i++ { + n1 := rand.Intn(1000) + n2 := rand.Intn(1000) + str1 := strconv.Itoa(n1) + str2 := strconv.Itoa(n2) + row := str1 + "\t" + str2 + _, err := fp.WriteString(row) + c.Assert(err, IsNil) + _, err = fp.WriteString("\n") + c.Assert(err, IsNil) + + if i == 0 { + cksum1 = n1 + cksum2 = n2 + } else { + cksum1 = cksum1 ^ n1 + cksum2 = cksum2 ^ n2 + } + } + + err = fp.Close() + c.Assert(err, IsNil) + + cli.runTestsOnNewDB(c, func(config *mysql.Config) { + config.AllowAllFiles = true + config.Params = map[string]string{"sql_mode": "''"} + }, "load_data_batch_dml", func(dbt *DBTest) { + // Set the batch size and check whether load data hits an invalid txn error. 
+ dbt.mustExec("set @@session.tidb_dml_batch_size = 128") + dbt.mustExec("drop table if exists t") + dbt.mustExec("create table t(c1 bigint auto_random primary key, c2 bigint, c3 bigint)") + dbt.mustExec(fmt.Sprintf("load data local infile %q into table t (c2, c3)", path)) + rows := dbt.mustQuery("select count(*) from t") + cli.checkRows(c, rows, "50000") + rows = dbt.mustQuery("select bit_xor(c2), bit_xor(c3) from t") + res := strconv.Itoa(cksum1) + res = res + " " + res = res + strconv.Itoa(cksum2) + cli.checkRows(c, rows, res) + }) +} + func (cli *testServerClient) runTestLoadDataForListPartition(c *C) { path := "/tmp/load_data_list_partition.csv" defer func() { diff --git a/server/tidb_test.go b/server/tidb_test.go index 1a6dbe27b488f..d811bda558811 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -162,6 +162,12 @@ func (ts *tidbTestSerialSuite) TestLoadDataListPartition(c *C) { ts.runTestLoadDataForListColumnPartition2(c) } +// Fix issue#22540. Change tidb_dml_batch_size, +// then check if load data into table with auto random column works properly. +func (ts *tidbTestSerialSuite) TestLoadDataAutoRandom(c *C) { + ts.runTestLoadDataAutoRandom(c) +} + func (ts *tidbTestSerialSuite) TestExplainFor(c *C) { ts.runTestExplainForConn(c) }