Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb into admin-c…
Browse files Browse the repository at this point in the history
…heck-decoder
  • Loading branch information
crazycs520 committed Oct 17, 2018
2 parents 09ee890 + 732233a commit ccf9135
Show file tree
Hide file tree
Showing 141 changed files with 3,249 additions and 895 deletions.
15 changes: 8 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ required = ["github.com/golang/protobuf/jsonpb"]
[[constraint]]
name = "google.golang.org/grpc"
source = "https://github.com/grpc/grpc-go.git"
version = "1.12.0"
version = "=1.12.2"

[[constraint]]
version = "1.1.0"
Expand Down
5 changes: 3 additions & 2 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
type ExplainStmt struct {
stmtNode

Stmt StmtNode
Format string
Stmt StmtNode
Format string
Analyze bool
}

// Accept implements Node Accept interface.
Expand Down
7 changes: 4 additions & 3 deletions ast/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var (
type AnalyzeTableStmt struct {
stmtNode

TableNames []*TableName
IndexNames []model.CIStr
MaxNumBuckets uint64
TableNames []*TableName
PartitionNames []model.CIStr
IndexNames []model.CIStr
MaxNumBuckets uint64

// IndexFlag is true when we only analyze indices for a table.
IndexFlag bool
Expand Down
6 changes: 3 additions & 3 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ create table t(a int primary key, b int, c int, index idx(b));
explain select t.c in (select count(*) from t s ignore index(idx), t t1 where s.a = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, 7_col_0)]
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -281,7 +281,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, 7_col_0)]
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -293,7 +293,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.c = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, 7_col_0)]
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ var defaultConf = Config{
MemQuotaQuery: 32 << 30,
EnableStreaming: false,
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 10240000,
Enabled: true,
Capacity: 2048000,
},
LowerCaseTableNames: 2,
Log: Log{
Expand Down
4 changes: 2 additions & 2 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ commit-timeout = "41s"
[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
enabled = false
capacity = 10240000
enabled = true
capacity = 2048000

[binlog]

Expand Down
104 changes: 97 additions & 7 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
package ddl

import (
"fmt"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -267,27 +272,42 @@ func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return updateColumn(t, job, newCol, &newCol.Name)
}

func onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (w *worker) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
oldColName := &model.CIStr{}
pos := &ast.ColumnPosition{}
err := job.DecodeArgs(newCol, oldColName, pos)
var modifyColumnTp byte
err := job.DecodeArgs(newCol, oldColName, pos, &modifyColumnTp)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

return doModifyColumn(t, job, newCol, oldColName, pos)
return w.doModifyColumn(t, job, newCol, oldColName, pos, modifyColumnTp)
}

// doModifyColumn updates the column information and reorders all columns.
func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) {
func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition, modifyColumnTp byte) (ver int64, _ error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

oldCol := model.FindColumnInfo(tblInfo.Columns, oldName.L)
if job.IsRollingback() {
ver, err = rollbackModifyColumnJob(t, tblInfo, job, oldCol, modifyColumnTp)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
return ver, nil
}

if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
Expand All @@ -308,6 +328,17 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN
// }
// }

if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
ver, err = modifyColumnFromNull2NotNull(w, t, dbInfo, tblInfo, job, oldCol, newCol)
if err != nil {
return ver, errors.Trace(err)
}
// Introduce the `mysql.HasPreventNullInsertFlag` flag to prevent users from inserting or updating null values.
if !mysql.HasPreventNullInsertFlag(oldCol.Flag) {
return ver, nil
}
}

// We need the latest column's offset and state. This information can be obtained from the store.
newCol.Offset = oldCol.Offset
newCol.State = oldCol.State
Expand All @@ -316,13 +347,14 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN
if pos.Tp == ast.ColumnPositionAfter {
if oldName.L == pos.RelativeColumn.Name.L {
// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists.
job.State = model.JobStateCancelled
// Modified the type definition of 'null' to 'not null' before this, so rollback the job when an error occurs.
job.State = model.JobStateRollingback
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
}

relative := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
if relative == nil || relative.State != model.StatePublic {
job.State = model.JobStateCancelled
job.State = model.JobStateRollingback
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}

Expand Down Expand Up @@ -371,14 +403,33 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs.
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

// checkForNullValue ensure there are no null values of the column of this table.
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, oldCol, newCol model.CIStr) error {
sql := fmt.Sprintf("select count(*) from `%s`.`%s` where `%s` is null limit 1;", schema.L, table.L, oldCol.L)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
rowCount := rows[0].GetInt64(0)
if rowCount != 0 {
if isDataTruncated {
return errInvalidUseOfNull
}
return ErrWarnDataTruncated.GenWithStackByArgs(newCol.L, rowCount)
}
return nil
}

func updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *model.CIStr) (ver int64, _ error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
Expand Down Expand Up @@ -423,3 +474,42 @@ func checkAddColumnTooManyColumns(oldCols int) error {
}
return nil
}

// rollbackModifyColumnJob rollbacks the job when an error occurs.
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
var err error
if modifyColumnTp == mysql.TypeNull {
// field NotNullFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
// field PreventNullInsertFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.PreventNullInsertFlag
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
}
return ver, nil
}

// modifyColumnFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo, job *model.Job, oldCol, newCol *model.ColumnInfo) (ver int64, _ error) {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return ver, errors.Trace(err)
}
defer w.sessPool.put(ctx)

// If there is a null value inserted, it cannot be modified and needs to be rollback.
err = checkForNullValue(ctx, oldCol.Tp == newCol.Tp, dbInfo.Name, tblInfo.Name, oldCol.Name, newCol.Name)
if err != nil {
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

// Prevent this field from inserting null values.
tblInfo.Columns[oldCol.Offset].Flag |= mysql.PreventNullInsertFlag
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
return ver, errors.Trace(err)
}
12 changes: 12 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,18 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelColumnModifyingDefinition(c *C) {
sql1 := "insert into t(b) values (null);"
sql2 := "alter table t change b b2 bigint not null;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
if err2 != nil {
c.Assert(err2.Error(), Equals, "[ddl:1265]Data truncated for column 'b2' at row 1")
}
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelChangeColumnName(c *C) {
sql1 := "ALTER TABLE t CHANGE a aa int;"
sql2 := "ALTER TABLE t CHANGE b aa int;"
Expand Down
Loading

0 comments on commit ccf9135

Please sign in to comment.