Skip to content

Commit

Permalink
address crazycs520's comments #3
Browse files Browse the repository at this point in the history
1. refactor CreateTeableInsertExec to re-use current executors
2. added more test cases
  • Loading branch information
bb7133 committed Dec 6, 2018
1 parent 5bf6773 commit 6cf0fb8
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 178 deletions.
16 changes: 9 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {
if b.ctx.GetSessionVars().CreateTableInsertingID != 0 {
// in a 'inserting data from select' state of creating table.
return b.buildCreateTableInsert(v, b.ctx.GetSessionVars().CreateTableInsertingID)
return b.buildTableInserter(v, b.ctx.GetSessionVars().CreateTableInsertingID)
}
return &DDLExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Expand All @@ -671,8 +671,8 @@ func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {
}
}

// buildCreateTableInsert builds a CreateTableInsertExec to insert data when creating table by 'create table ... select'
func (b *executorBuilder) buildCreateTableInsert(v *plannercore.DDL, tableID int64) Executor {
// buildTableInserter builds a CreateTableInsertExec to insert data when creating table by 'create table ... select'
func (b *executorBuilder) buildTableInserter(v *plannercore.DDL, tableID int64) Executor {
stmt, ok := v.Statement.(*ast.CreateTableStmt)
if !ok || v.InsertPlan.SelectPlan == nil {
b.err = errors.Errorf("Unexpected plan: %s", v.Statement.Text())
Expand Down Expand Up @@ -721,11 +721,13 @@ func (b *executorBuilder) buildCreateTableInsert(v *plannercore.DDL, tableID int
return nil
}

return &CreateTableInsertExec{
InsertValues: insertVal,
onDuplicate: stmt.OnDuplicate,
finished: false,
switch stmt.OnDuplicate {
case ast.OnDuplicateCreateTableSelectReplace:
return &CreateTableInsertExec{insert: &ReplaceExec{InsertValues: insertVal}}
default:
return &CreateTableInsertExec{insert: &InsertExec{InsertValues: insertVal}}
}

}

// buildTrace builds a TraceExec for future executing. This method will be called
Expand Down
173 changes: 9 additions & 164 deletions executor/create_table_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,187 +14,32 @@
package executor

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

// CreateTableInsertExec represents an insert executor when creating table, it is basically similar to `InsertExec` except:
// 1. It is instantiated and executed at DDL owner server, during the execution of 'create table' DDL job
// 2. The 'on duplicate' behavior(ERROR/IGNORE/REPLACE) is specified by 'onDuplicate' option
// CreateTableInsertExec represents an insert executor when creating table, it is simply a wrapper of `InsertExec` or
// `ReplaceExec` depends on `onDuplicate` option
// See 'https://dev.mysql.com/doc/refman/5.7/en/create-table-select.html' for more details
type CreateTableInsertExec struct {
*InsertValues

onDuplicate ast.OnDuplicateCreateTableSelectType
baseExecutor

finished bool
}

func (e *CreateTableInsertExec) exec(rows [][]types.Datum) error {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
sessVars := e.ctx.GetSessionVars()
defer sessVars.CleanBuffers()

e.rowCount = 0
if !sessVars.LightningMode {
sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(true), kv.TempTxnMemBufCap)
}

switch e.onDuplicate {
case ast.OnDuplicateCreateTableSelectIgnore:
err := e.batchCheckAndInsert(rows, e.addRecord)
if err != nil {
return errors.Trace(err)
}
case ast.OnDuplicateCreateTableSelectReplace:
err := e.batchCheckAndReplaceInsert(rows)
if err != nil {
return errors.Trace(err)
}
case ast.OnDuplicateCreateTableSelectError:
for _, row := range rows {
if _, err := e.addRecord(row); err != nil {
return errors.Trace(err)
}
}
}
e.finished = true
return nil
insert Executor
}

// Next implements Exec Next interface.
func (e *CreateTableInsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.finished {
return nil
}
return errors.Trace(e.insertRowsFromSelect(ctx, e.exec))
return errors.Trace(e.insert.Next(ctx, chk))
}

// Close implements the Executor Close interface.
func (e *CreateTableInsertExec) Close() error {
return e.SelectExec.Close()
return errors.Trace(e.insert.Close())
}

// Open implements the Executor Close interface.
func (e *CreateTableInsertExec) Open(ctx context.Context) error {
return e.SelectExec.Open(ctx)
}

// batchCheckAndReplaceInsert updates multi-rows in batch if they are duplicate with rows in table.
func (e *CreateTableInsertExec) batchCheckAndReplaceInsert(newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

// Batch get the to-be-updated rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
if err != nil {
return errors.Trace(err)
}

for i, r := range e.toBeCheckedRows {
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return errors.Trace(err)
}
err = e.replaceDupRow(r, handle)
if err != nil {
return errors.Trace(err)
}
continue
}
}
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return errors.Trace(err)
}
err = e.replaceDupRow(r, handle)
if err != nil {
return errors.Trace(err)
}
newRows[i] = nil
break
}
}
// If row was checked with no duplicate keys,
// we should do insert the row,
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
newHandle, err := e.addRecord(newRows[i])
if err != nil {
return errors.Trace(err)
}
e.fillBackKeys(e.Table, r, newHandle)
}
}
return nil
}

// replaceDupRow updates a duplicate row to a new row.
func (e *CreateTableInsertExec) replaceDupRow(row toBeCheckedRow, handle int64) (err error) {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle)
if err != nil {
log.Errorf("[insert on dup] handle is %d for the to-be-inserted row %s", handle, types.DatumsToStrNoErr(row.row))
return errors.Trace(err)
}

// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowReplace(handle, oldRow, row.row)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
if err != nil {
return errors.Trace(err)
}
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowReplace updates the duplicate row.
func (e *CreateTableInsertExec) doDupRowReplace(handle int64, oldRow []types.Datum, newRow []types.Datum) ([]types.Datum, bool, int64, error) {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.ctx.GetSessionVars().CurrInsertValues = chunk.MutRowFromDatums(newRow).ToRow()

_, handleChanged, newHandle, err := updateRecord(e.ctx, handle, oldRow, newRow, assignFlag, e.Table, true)
if err != nil {
return nil, false, 0, errors.Trace(err)
}
return newRow, handleChanged, newHandle, nil
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *CreateTableInsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return errors.Trace(err)
}
// Delete old keys and fill back new key-values of the updated row.
err = e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return errors.Trace(err)
}
if handleChanged {
delete(e.dupOldRowValues, string(e.Table.RecordKey(oldHandle)))
e.fillBackKeys(e.Table, fillBackKeysInRows[0], newHandle)
} else {
e.fillBackKeys(e.Table, fillBackKeysInRows[0], oldHandle)
}
return nil
return errors.Trace(e.insert.Open(ctx))
}
18 changes: 16 additions & 2 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
"golang.org/x/net/context"
)

func (s *testSuite) TestTruncateTable(c *C) {
Expand Down Expand Up @@ -159,7 +158,22 @@ func (s *testSuite) TestCreateTable(c *C) {
r = tk.MustQuery("select * from create_target;")
r.Check(testkit.Rows("5 aa 1", "6 bb 2", "7 bb 3"))

// test duplicate keys
// test 'ignore' and 'replace' keywords
tk.MustExec("drop table if exists create_target;")
_, err = tk.Exec("create table create_target(a int not null) select null as a")
c.Assert(err.Error(), Equals, "[table:1048]Column 'a' cannot be null")
tk.MustExec("create table create_target(a int not null) ignore select null as a")

tk.MustExec("drop table if exists create_target;")
_, err = tk.Exec("create table create_target(a varchar(1)) select 'abcd' as a;")
c.Assert(err.Error(), Equals, "[types:1406]Data Too Long, field len 1, data len 4")
tk.MustExec("create table create_target(a varchar(1)) ignore select 'abcd' as a")

tk.MustExec("drop table if exists create_target;")
_, err = tk.Exec("create table create_target(a datetime) select '20180001' as a;")
c.Assert(err.Error(), Equals, "[types:1292]Incorrect datetime value: '2018-00-01'")
tk.MustExec("create table create_target(a datetime) ignore select '20180001' as a")

tk.MustExec("drop table if exists create_source;")
tk.MustExec("create table create_source(ord int, a int, b int);")
tk.MustExec("insert into create_source values (1, 1, 1);")
Expand Down
14 changes: 9 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,16 +1276,20 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.IgnoreZeroInDate = !vars.StrictSQLMode || stmt.IgnoreErr
sc.Priority = stmt.Priority
case *ast.CreateTableStmt, *ast.AlterTableStmt:
case *ast.CreateTableStmt:
if ctx.GetSessionVars().CreateTableInsertingID != 0 {
// in a 'inserting data from select' state of creating table.
ignoreError := stmt.OnDuplicate == ast.OnDuplicateCreateTableSelectIgnore
sc.InInsertStmt = true
sc.BadNullAsWarning = !vars.StrictSQLMode
sc.TruncateAsWarning = !vars.StrictSQLMode
sc.DividedByZeroAsWarning = !vars.StrictSQLMode
sc.IgnoreZeroInDate = !vars.StrictSQLMode
sc.DupKeyAsWarning = ignoreError
sc.BadNullAsWarning = !vars.StrictSQLMode || ignoreError
sc.TruncateAsWarning = !vars.StrictSQLMode || ignoreError
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || ignoreError
sc.IgnoreZeroInDate = !vars.StrictSQLMode || ignoreError
}
// Make sure the sql_mode is strict when checking column default value.
case *ast.AlterTableStmt:
// Make sure the sql_mode is strict when checking column default value.
case *ast.LoadDataStmt:
sc.DupKeyAsWarning = true
sc.BadNullAsWarning = true
Expand Down
1 change: 1 addition & 0 deletions planner/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
ErrTablenameNotAllowedHere = terror.ClassOptimizer.New(codeTablenameNotAllowedHere, "Table '%s' from one of the %ss cannot be used in %s")
ErrWrongUsage = terror.ClassOptimizer.New(codeWrongUsage, mysql.MySQLErrName[mysql.ErrWrongUsage])
ErrAmbiguous = terror.ClassOptimizer.New(codeAmbiguous, mysql.MySQLErrName[mysql.ErrNonUniq])
ErrUnknown = terror.ClassOptimizer.New(codeUnknown, mysql.MySQLErrName[mysql.ErrUnknown])
ErrUnknownColumn = terror.ClassOptimizer.New(codeUnknownColumn, mysql.MySQLErrName[mysql.ErrBadField])
ErrUnknownTable = terror.ClassOptimizer.New(codeUnknownTable, mysql.MySQLErrName[mysql.ErrUnknownTable])
ErrWrongArguments = terror.ClassOptimizer.New(codeWrongArguments, mysql.MySQLErrName[mysql.ErrWrongArguments])
Expand Down

0 comments on commit 6cf0fb8

Please sign in to comment.