Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix a bug of update with outer join #7177

Merged
merged 17 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
result := <-resultCh
if result.Err != nil {
err = result.Err
if errors.Trace(err) == analyzeWorkerPanic {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CGO_ENABLED=0 revive -formatter friendly -config revive.toml $(go list ./...| grep -vE "vendor")
  ✘  error-naming  error var analyzeWorkerPanic should have name of the form errFoo  
  /home/robi/Code/go/src/github.com/pingcap/tidb/executor/analyze.go:119:5

✘ 1 problem (1 error, 0 warnings)

Errors:
  1  error-naming  

if errors.Trace(err) == errAnalyzeWorkerPanic {
panicCnt++
}
log.Error(errors.ErrorStack(err))
Expand Down Expand Up @@ -116,7 +116,7 @@ type analyzeTask struct {
colExec *AnalyzeColumnsExec
}

var analyzeWorkerPanic = errors.New("analyze worker panic")
var errAnalyzeWorkerPanic = errors.New("analyze worker panic")

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- statistics.AnalyzeResult) {
defer func() {
Expand All @@ -127,7 +127,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
log.Errorf("analyzeWorker panic stack is:\n%s", buf)
metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc()
resultCh <- statistics.AnalyzeResult{
Err: analyzeWorkerPanic,
Err: errAnalyzeWorkerPanic,
}
}
}()
Expand Down
69 changes: 65 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,15 +1236,76 @@ func (b *executorBuilder) buildUpdate(v *plan.Update) Executor {
b.err = errors.Trace(b.err)
return nil
}
columns2Handle := buildColumns2Handle(v.Schema(), tblID2table)
updateExec := &UpdateExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
SelectExec: selExec,
OrderedList: v.OrderedList,
tblID2table: tblID2table,
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec),
SelectExec: selExec,
OrderedList: v.OrderedList,
tblID2table: tblID2table,
columns2Handle: columns2Handle,
}
return updateExec
}

// cols2Handle is a mapper from a consecutive range of column ordinals to the
// ordinal of the handle column associated with that range.
type cols2Handle struct {
	// start is the first column ordinal of the range (inclusive).
	start int32
	// end is the column ordinal just past the range (exclusive), i.e. the
	// range of consecutive columns is [start, end).
	end int32
	// handleOrdinal is the ordinal of the handle column.
	handleOrdinal int32
}

// cols2HandleSlice is a []cols2Handle that implements sort.Interface,
// ordering its elements by increasing start ordinal.
type cols2HandleSlice []cols2Handle

// Len implements sort.Interface#Len.
func (c cols2HandleSlice) Len() int { return len(c) }

// Swap implements sort.Interface#Swap.
func (c cols2HandleSlice) Swap(i, j int) {
	tmp := c[j]
	c[j] = c[i]
	c[i] = tmp
}

// Less implements sort.Interface#Less.
func (c cols2HandleSlice) Less(i, j int) bool {
	left, right := c[i].start, c[j].start
	return left < right
}

// findHandle looks up the handle column ordinal for the column at ordinal.
// It returns the handleOrdinal of the last range whose start is not greater
// than ordinal, together with true. It returns (0, false) when the slice is
// empty or when every range starts after ordinal.
//
// The lookup uses sort.Search, so c is expected to be sorted by increasing
// start (buildColumns2Handle sorts its result before returning it).
// NOTE(review): the end of the selected range is not consulted here, so an
// ordinal at or past that end still resolves to that range's handle — confirm
// this is intended.
func (c cols2HandleSlice) findHandle(ordinal int32) (int32, bool) {
if c == nil || len(c) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c == nil,
c.findHandle will panic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiver is slice(point), so it's ok

return 0, false
}
// Find the smallest index whose range starts after ordinal; the range just
// before that index (if there is one) is the last range starting at or
// before ordinal.
// @see https://godoc.org/sort#Search
rangeBehindOrdinal := sort.Search(len(c), func(i int) bool { return c[i].start > ordinal })
if rangeBehindOrdinal == 0 {
// Every range starts after ordinal, so no range can contain it.
return 0, false
}
return c[rangeBehindOrdinal-1].handleOrdinal, true
}

// buildColumns2Handle builds the mapping from column ordinal ranges to handle
// column ordinals for every handle column recorded in schema.TblID2Handle.
// The returned slice is sorted by increasing range start. When the schema
// holds fewer than two tables the mapping is skipped and nil is returned.
func buildColumns2Handle(schema *expression.Schema, tblID2Table map[int64]table.Table) cols2HandleSlice {
	// Skip building the mapping if there is only a single table.
	if len(schema.TblID2Handle) < 2 {
		return nil
	}
	var ranges cols2HandleSlice
	for id, handles := range schema.TblID2Handle {
		t := tblID2Table[id]
		for _, h := range handles {
			first := getTableOffset(schema, h)
			last := first + len(t.WritableCols())
			ranges = append(ranges, cols2Handle{
				start:         int32(first),
				end:           int32(last),
				handleOrdinal: int32(h.Index),
			})
		}
	}
	// Map iteration order is random, so order the ranges by start here.
	sort.Sort(ranges)
	return ranges
}

func (b *executorBuilder) buildDelete(v *plan.Delete) Executor {
tblID2table := make(map[int64]table.Table)
for id := range v.SelectPlan.Schema().TblID2Handle {
Expand Down
69 changes: 68 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,7 +2913,7 @@ func (s *testSuite) TestUnsignedDecimalOverflow(c *C) {
func (s *testSuite) TestIndexJoinTableDualPanic(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a (f1 int, f2 varchar(32), primary key (f1))")
tk.MustExec("insert into a (f1,f2) values (1,'a'), (2,'b'), (3,'c')")
tk.MustQuery("select a.* from a inner join (select 1 as k1,'k2-1' as k2) as k on a.f1=k.k1;").
Expand Down Expand Up @@ -2959,3 +2959,70 @@ func (s *testSuite) TestUnionAutoSignedCast(c *C) {
tk.MustQuery("select id, v from t5 union select id, v from t7 union select id, v from t6 order by id").
Check(testkit.Rows("1 1", "2 -1", "3 -1"))
}

// TestUpdateJoin exercises UPDATE statements over single tables and over
// left/right outer joins, checking after each statement which tables were
// changed and which were left untouched. The statements run against shared
// tables t1..t5, so each step depends on the state left by the previous ones.
func (s *testSuite) TestUpdateJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3, t4, t5")
tk.MustExec("create table t1(k int, v int)")
tk.MustExec("create table t2(k int, v int)")
tk.MustExec("create table t3(id int auto_increment, k int, v int, primary key(id))")
tk.MustExec("create table t4(k int, v int)")
tk.MustExec("create table t5(v int, k int, primary key(k))")
tk.MustExec("insert into t1 values (1, 1)")
tk.MustExec("insert into t4 values (3, 3)")

// Test the normal case: updating one row of a single table.
tk.MustExec("update t1 set v = 0 where k = 1")
tk.MustQuery("select k, v from t1 where k = 1").Check(testkit.Rows("1 0"))

// Test a left join whose right table has auto_increment or non-null columns.
tk.MustExec("update t1 left join t3 on t1.k = t3.k set t1.v = 1")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 1"))
tk.MustQuery("select id, k, v from t3").Check(testkit.Rows())

// Test a left join where the right table has no matching record but the
// statement assigns to the right table's columns.
tk.MustExec("update t1 left join t2 on t1.k = t2.k set t1.v = t2.v, t2.v = 3")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 <nil>"))
tk.MustQuery("select k, v from t2").Check(testkit.Rows())

// Test the case where the assignment to the left table references a
// right-table column that is itself assigned earlier in the same statement.
tk.MustExec("update t1 left join t2 on t1.k = t2.k set t2.v = 3, t1.v = t2.v")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 <nil>"))
tk.MustQuery("select k, v from t2").Check(testkit.Rows())

// Test a right join where the left table has no matching record but the
// statement assigns to the left table's columns.
tk.MustExec("update t2 right join t1 on t2.k = t1.k set t2.v = 4, t1.v = 0")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 0"))
tk.MustQuery("select k, v from t2").Check(testkit.Rows())

// Test a right join and a left join in the same statement.
tk.MustExec("update t1 left join t2 on t1.k = t2.k right join t4 on t4.k = t2.k set t1.v = 4, t2.v = 4, t4.v = 4")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 0"))
tk.MustQuery("select k, v from t2").Check(testkit.Rows())
tk.MustQuery("select k, v from t4").Check(testkit.Rows("3 4"))

// Test a normal left join where the right table has matching rows.
tk.MustExec("insert t2 values (1, 10)")
tk.MustExec("update t1 left join t2 on t1.k = t2.k set t2.v = 11")
tk.MustQuery("select k, v from t2").Check(testkit.Rows("1 11"))

// Test joining the same table more than once (t1 as t11 and t12) and
// updating the unmatched records.
tk.MustExec("update t1 t11 left join t2 on t11.k = t2.k left join t1 t12 on t2.v = t12.k set t12.v = 233, t11.v = 111")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("1 111"))
tk.MustQuery("select k, v from t2").Check(testkit.Rows("1 11"))

// Test a left join where the left table has records but all their values are NULL.
tk.MustExec("delete from t1")
tk.MustExec("delete from t2")
tk.MustExec("insert into t1 values (null, null)")
tk.MustExec("update t1 left join t2 on t1.k = t2.k set t1.v = 1")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("<nil> 1"))

// Test a left join whose right table has a primary key.
tk.MustExec("insert t5 values(0, 0)")
tk.MustExec("update t1 left join t5 on t1.k = t5.k set t1.v = 2")
tk.MustQuery("select k, v from t1").Check(testkit.Rows("<nil> 2"))
tk.MustQuery("select k, v from t5").Check(testkit.Rows("0 0"))

}
22 changes: 22 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type UpdateExec struct {
newRowsData [][]types.Datum // The new values to be set.
fetched bool
cursor int
// columns2Handle stores the relationship between a column ordinal and its table handle.
// The column ordinals are represented as ordinal ranges, @see executor.cols2Handle
columns2Handle cols2HandleSlice
}

func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
Expand All @@ -62,6 +65,10 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
for _, col := range cols {
offset := getTableOffset(schema, col)
end := offset + len(tbl.WritableCols())
handleDatum := row[col.Index]
if e.canNotUpdate(handleDatum) {
continue
}
handle := row[col.Index].GetInt64()
oldData := row[offset:end]
newTableData := newData[offset:end]
Expand All @@ -71,6 +78,7 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
// Each matched row is updated once, even if it matches the conditions multiple times.
continue
}

// Update row
changed, _, _, _, err1 := updateRecord(e.ctx, handle, oldData, newTableData, flags, tbl, false)
if err1 == nil {
Expand All @@ -92,6 +100,16 @@ func (e *UpdateExec) exec(schema *expression.Schema) ([]types.Datum, error) {
return []types.Datum{}, nil
}

// canNotUpdate reports whether the record identified by handle must be
// skipped by the update; it returns true exactly when handle is NULL.
// The handle is NULL only when the record is on the inner side of an outer
// join and the outer row matches no inner row: in that scenario the inner
// handle field is filled with a NULL value.
//
// This fixes: https://github.com/pingcap/tidb/issues/7176.
func (e *UpdateExec) canNotUpdate(handle types.Datum) bool {
return handle.IsNull()
}

// Next implements the Executor Next interface.
func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -163,6 +181,10 @@ func (e *UpdateExec) handleErr(colName model.CIStr, rowIdx int, err error) error
func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum) ([]types.Datum, error) {
newRowData := types.CopyRow(oldRow)
for _, assign := range e.OrderedList {
handleIdx, handleFound := e.columns2Handle.findHandle(int32(assign.Col.Index))
if handleFound && e.canNotUpdate(oldRow[handleIdx]) {
continue
}
val, err := assign.Expr.Eval(chunk.MutRowFromDatums(newRowData).ToRow())

if err1 := e.handleErr(assign.Col.ColName, rowIdx, err); err1 != nil {
Expand Down