diff --git a/executor/analyze.go b/executor/analyze.go index 40d78666c105b..2c20de123867f 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -72,7 +72,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error { result := <-resultCh if result.Err != nil { err = result.Err - if errors.Trace(err) == analyzeWorkerPanic { + if errors.Trace(err) == errAnalyzeWorkerPanic { panicCnt++ } log.Error(errors.ErrorStack(err)) @@ -116,7 +116,7 @@ type analyzeTask struct { colExec *AnalyzeColumnsExec } -var analyzeWorkerPanic = errors.New("analyze worker panic") +var errAnalyzeWorkerPanic = errors.New("analyze worker panic") func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- statistics.AnalyzeResult) { defer func() { @@ -127,7 +127,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- log.Errorf("analyzeWorker panic stack is:\n%s", buf) metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc() resultCh <- statistics.AnalyzeResult{ - Err: analyzeWorkerPanic, + Err: errAnalyzeWorkerPanic, } } }() diff --git a/executor/builder.go b/executor/builder.go index 523323f5b95e2..449f57c651013 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1236,15 +1236,76 @@ func (b *executorBuilder) buildUpdate(v *plan.Update) Executor { b.err = errors.Trace(b.err) return nil } + columns2Handle := buildColumns2Handle(v.Schema(), tblID2table) updateExec := &UpdateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), - SelectExec: selExec, - OrderedList: v.OrderedList, - tblID2table: tblID2table, + baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), + SelectExec: selExec, + OrderedList: v.OrderedList, + tblID2table: tblID2table, + columns2Handle: columns2Handle, } return updateExec } +// cols2Handle represents an mapper from column index to handle index. +type cols2Handle struct { + // start/end represent the ordinal range [start, end) of the consecutive columns. + start, end int32 + // handleOrdinal represents the ordinal of the handle column. + handleOrdinal int32 +} + +// cols2HandleSlice attaches the methods of sort.Interface to []cols2Handle sorting in increasing order. +type cols2HandleSlice []cols2Handle + +// Len implements sort.Interface#Len. +func (c cols2HandleSlice) Len() int { + return len(c) +} + +// Swap implements sort.Interface#Swap. +func (c cols2HandleSlice) Swap(i, j int) { + c[i], c[j] = c[j], c[i] +} + +// Less implements sort.Interface#Less. +func (c cols2HandleSlice) Less(i, j int) bool { + return c[i].start < c[j].start +} + +// findHandle finds the ordinal of the corresponding handle column. +func (c cols2HandleSlice) findHandle(ordinal int32) (int32, bool) { + if c == nil || len(c) == 0 { + return 0, false + } + // find the smallest index of the range that its start great than ordinal. + // @see https://godoc.org/sort#Search + rangeBehindOrdinal := sort.Search(len(c), func(i int) bool { return c[i].start > ordinal }) + if rangeBehindOrdinal == 0 { + return 0, false + } + return c[rangeBehindOrdinal-1].handleOrdinal, true +} + +// buildColumns2Handle builds columns to handle mapping. +func buildColumns2Handle(schema *expression.Schema, tblID2Table map[int64]table.Table) cols2HandleSlice { + if len(schema.TblID2Handle) < 2 { + // skip buildColumns2Handle mapping if there are only single table. + return nil + } + var cols2Handles cols2HandleSlice + for tblID, handleCols := range schema.TblID2Handle { + tbl := tblID2Table[tblID] + for _, handleCol := range handleCols { + offset := getTableOffset(schema, handleCol) + end := offset + len(tbl.WritableCols()) + cols2Handles = append(cols2Handles, cols2Handle{int32(offset), int32(end), int32(handleCol.Index)}) + } + } + sort.Sort(cols2Handles) + return cols2Handles +} + func (b *executorBuilder) buildDelete(v *plan.Delete) Executor { tblID2table := make(map[int64]table.Table) for id := range v.SelectPlan.Schema().TblID2Handle { diff --git a/executor/executor_test.go b/executor/executor_test.go index 737b6a7698dcd..683557c574c44 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2913,7 +2913,7 @@ func (s *testSuite) TestUnsignedDecimalOverflow(c *C) { func (s *testSuite) TestIndexJoinTableDualPanic(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists a") tk.MustExec("create table a (f1 int, f2 varchar(32), primary key (f1))") tk.MustExec("insert into a (f1,f2) values (1,'a'), (2,'b'), (3,'c')") tk.MustQuery("select a.* from a inner join (select 1 as k1,'k2-1' as k2) as k on a.f1=k.k1;"). @@ -2959,3 +2959,70 @@ func (s *testSuite) TestUnionAutoSignedCast(c *C) { tk.MustQuery("select id, v from t5 union select id, v from t7 union select id, v from t6 order by id"). Check(testkit.Rows("1 1", "2 -1", "3 -1")) } + +func (s *testSuite) TestUpdateJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2, t3, t4, t5") + tk.MustExec("create table t1(k int, v int)") + tk.MustExec("create table t2(k int, v int)") + tk.MustExec("create table t3(id int auto_increment, k int, v int, primary key(id))") + tk.MustExec("create table t4(k int, v int)") + tk.MustExec("create table t5(v int, k int, primary key(k))") + tk.MustExec("insert into t1 values (1, 1)") + tk.MustExec("insert into t4 values (3, 3)") + + // test the normal case that update one row for a single table. + tk.MustExec("update t1 set v = 0 where k = 1") + tk.MustQuery("select k, v from t1 where k = 1").Check(testkit.Rows("1 0")) + + // test the case that the table with auto_increment or none-null columns as the right table of left join. + tk.MustExec("update t1 left join t3 on t1.k = t3.k set t1.v = 1") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 1")) + tk.MustQuery("select id, k, v from t3").Check(testkit.Rows()) + + // test left join and the case that the right table has no matching record but has updated the right table columns. + tk.MustExec("update t1 left join t2 on t1.k = t2.k set t1.v = t2.v, t2.v = 3") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 ")) + tk.MustQuery("select k, v from t2").Check(testkit.Rows()) + + // test the case that the update operation in the left table references data in the right table while data of the right table columns is modified. + tk.MustExec("update t1 left join t2 on t1.k = t2.k set t2.v = 3, t1.v = t2.v") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 ")) + tk.MustQuery("select k, v from t2").Check(testkit.Rows()) + + // test right join and the case that the left table has no matching record but has updated the left table columns. + tk.MustExec("update t2 right join t1 on t2.k = t1.k set t2.v = 4, t1.v = 0") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 0")) + tk.MustQuery("select k, v from t2").Check(testkit.Rows()) + + // test the case of right join and left join at the same time. + tk.MustExec("update t1 left join t2 on t1.k = t2.k right join t4 on t4.k = t2.k set t1.v = 4, t2.v = 4, t4.v = 4") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 0")) + tk.MustQuery("select k, v from t2").Check(testkit.Rows()) + tk.MustQuery("select k, v from t4").Check(testkit.Rows("3 4")) + + // test normal left join and the case that the right table has matching rows. + tk.MustExec("insert t2 values (1, 10)") + tk.MustExec("update t1 left join t2 on t1.k = t2.k set t2.v = 11") + tk.MustQuery("select k, v from t2").Check(testkit.Rows("1 11")) + + // test the case of continuously joining the same table and updating the unmatching records. + tk.MustExec("update t1 t11 left join t2 on t11.k = t2.k left join t1 t12 on t2.v = t12.k set t12.v = 233, t11.v = 111") + tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 111")) + tk.MustQuery("select k, v from t2").Check(testkit.Rows("1 11")) + + // test the left join case that the left table has records but all records are null. + tk.MustExec("delete from t1") + tk.MustExec("delete from t2") + tk.MustExec("insert into t1 values (null, null)") + tk.MustExec("update t1 left join t2 on t1.k = t2.k set t1.v = 1") + tk.MustQuery("select k, v from t1").Check(testkit.Rows(" 1")) + + // test the case that the right table of left join has an primary key. + tk.MustExec("insert t5 values(0, 0)") + tk.MustExec("update t1 left join t5 on t1.k = t5.k set t1.v = 2") + tk.MustQuery("select k, v from t1").Check(testkit.Rows(" 2")) + tk.MustQuery("select k, v from t5").Check(testkit.Rows("0 0")) + +} diff --git a/executor/update.go b/executor/update.go index 908d29d3fd4a3..ab697d7db3cb4 100644 --- a/executor/update.go +++ b/executor/update.go @@ -39,6 +39,9 @@ type UpdateExec struct { newRowsData [][]types.Datum // The new values to be set. fetched bool cursor int + // columns2Handle stores relationship between column ordinal to its table handle. + // the columns ordinals is present in ordinal range format, @see executor.cols2Handle + columns2Handle cols2HandleSlice } func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) { @@ -62,6 +65,10 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) { for _, col := range cols { offset := getTableOffset(schema, col) end := offset + len(tbl.WritableCols()) + handleDatum := row[col.Index] + if e.canNotUpdate(handleDatum) { + continue + } handle := row[col.Index].GetInt64() oldData := row[offset:end] newTableData := newData[offset:end] @@ -71,6 +78,7 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) { // Each matched row is updated once, even if it matches the conditions multiple times. continue } + // Update row changed, _, _, _, err1 := updateRecord(e.ctx, handle, oldData, newTableData, flags, tbl, false) if err1 == nil { @@ -92,6 +100,16 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) { return []types.Datum{}, nil } +// canNotUpdate checks the handle of a record to decide whether that record +// can not be updated. The handle is NULL only when it is the inner side of an +// outer join: the outer row can not match any inner rows, and in this scenario +// the inner handle field is filled with a NULL value. +// +// This fixes: https://github.com/pingcap/tidb/issues/7176. +func (e *UpdateExec) canNotUpdate(handle types.Datum) bool { + return handle.IsNull() +} + // Next implements the Executor Next interface. func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() @@ -163,6 +181,10 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) { newRowData := types.CopyRow(oldRow) for _, assign := range e.OrderedList { + handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index)) + if handleFound && e.canNotUpdate(oldRow[handleIdx]) { + continue + } val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow()) if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil {