Skip to content

Commit

Permalink
planner, ddl: convert large varchar for create table in non-strict mo…
Browse files Browse the repository at this point in the history
…de (#31974)

close #30328
  • Loading branch information
unconsolable authored Mar 2, 2022
1 parent 7a12df6 commit a350290
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 11 deletions.
8 changes: 4 additions & 4 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,8 +1212,8 @@ func TestModifyColumn(t *testing.T) {
{"varchar(10) character set gbk", "varchar(255) character set gbk", nil},
}
for _, tt := range tests {
ftA := colDefStrToFieldType(t, tt.origin)
ftB := colDefStrToFieldType(t, tt.to)
ftA := colDefStrToFieldType(t, tt.origin, ctx)
ftB := colDefStrToFieldType(t, tt.to, ctx)
err := checkModifyTypes(ctx, ftA, ftB, false)
if err == nil {
require.NoErrorf(t, tt.err, "origin:%v, to:%v", tt.origin, tt.to)
Expand All @@ -1223,13 +1223,13 @@ func TestModifyColumn(t *testing.T) {
}
}

func colDefStrToFieldType(t *testing.T, str string) *types.FieldType {
func colDefStrToFieldType(t *testing.T, str string, ctx sessionctx.Context) *types.FieldType {
sqlA := "alter table t modify column a " + str
stmt, err := parser.New().ParseOneStmt(sqlA, "", "")
require.NoError(t, err)
colDef := stmt.(*ast.AlterTableStmt).Specs[0].NewColumns[0]
chs, coll := charset.GetDefaultCharsetAndCollate()
col, _, err := buildColumnAndConstraint(nil, 0, colDef, nil, chs, coll)
col, _, err := buildColumnAndConstraint(ctx, 0, colDef, nil, chs, coll)
require.NoError(t, err)
return &col.FieldType
}
Expand Down
23 changes: 23 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7814,3 +7814,26 @@ func (s *testDBSuite1) TestGetTimeZone(c *C) {
c.Assert(tc.offset, Equals, offset, Commentf("sql: %s", tc.tzSQL))
}
}

// for issue #30328
func (s *testDBSuite5) TestTooBigFieldLengthAutoConvert(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

err := tk.ExecToErr("create table i30328_1(a varbinary(70000), b varchar(70000000))")
c.Assert(types.ErrTooBigFieldLength.Equal(err), IsTrue)

// save previous sql_mode and change
r := tk.MustQuery("select @@sql_mode")
defer func(sqlMode string) {
tk.MustExec("set @@sql_mode= '" + sqlMode + "'")
tk.MustExec("drop table if exists i30328_1")
tk.MustExec("drop table if exists i30328_2")
}(r.Rows()[0][0].(string))
tk.MustExec("set @@sql_mode='NO_ENGINE_SUBSTITUTION'")

tk.MustExec("create table i30328_1(a varbinary(70000), b varchar(70000000))")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1246 Converting column 'a' from VARBINARY to BLOB", "Warning 1246 Converting column 'b' from VARCHAR to TEXT"))
tk.MustExec("create table i30328_2(a varchar(200))")
tk.MustExec("alter table i30328_2 modify a varchar(70000000);")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1246 Converting column 'a' from VARCHAR to TEXT"))
}
32 changes: 27 additions & 5 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func typesNeedCharset(tp byte) bool {
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, colCharset, colCollate string) error {
func setCharsetCollationFlenDecimal(tp *types.FieldType, colName, colCharset, colCollate string, sessVars *variable.SessionVars) error {
var err error
if typesNeedCharset(tp.Tp) {
tp.Charset = colCharset
Expand All @@ -618,9 +618,11 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, colCharset, colCollate
}
} else {
// Adjust the field type for blob/text types if the flen is set.
err = adjustBlobTypesFlen(tp, colCharset)
if err = adjustBlobTypesFlen(tp, colCharset); err != nil {
return err
}
}
return err
return checkTooBigFieldLengthAndTryAutoConvert(tp, colName, sessVars)
}

// buildColumnAndConstraint builds table.Column and ast.Constraint from the parameters.
Expand Down Expand Up @@ -652,7 +654,7 @@ func buildColumnAndConstraint(
return nil, nil, errors.Trace(err)
}

if err := setCharsetCollationFlenDecimal(colDef.Tp, chs, coll); err != nil {
if err := setCharsetCollationFlenDecimal(colDef.Tp, colDef.Name.Name.O, chs, coll, ctx.GetSessionVars()); err != nil {
return nil, nil, errors.Trace(err)
}
col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint)
Expand Down Expand Up @@ -4350,7 +4352,7 @@ func (d *ddl) getModifiableColumnJob(ctx context.Context, sctx sessionctx.Contex
}
}

if err = setCharsetCollationFlenDecimal(&newCol.FieldType, chs, coll); err != nil {
if err = setCharsetCollationFlenDecimal(&newCol.FieldType, newCol.Name.O, chs, coll, sctx.GetSessionVars()); err != nil {
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -6968,3 +6970,23 @@ func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error
err = d.doDDLJob(ctx, job)
return d.callHookOnChanged(err)
}

// checkTooBigFieldLengthAndTryAutoConvert will check whether the field length is too big
// in non-strict mode and varchar column. If it is, will try to adjust to blob or text, see issue #30328
func checkTooBigFieldLengthAndTryAutoConvert(tp *types.FieldType, colName string, sessVars *variable.SessionVars) error {
if sessVars != nil && !sessVars.SQLMode.HasStrictMode() && tp.Tp == mysql.TypeVarchar {
err := IsTooBigFieldLength(tp.Flen, colName, tp.Charset)
if err != nil && terror.ErrorEqual(types.ErrTooBigFieldLength, err) {
tp.Tp = mysql.TypeBlob
if err = adjustBlobTypesFlen(tp, tp.Charset); err != nil {
return err
}
if tp.Charset == charset.CharsetBin {
sessVars.StmtCtx.AppendWarning(ErrAutoConvert.GenWithStackByArgs(colName, "VARBINARY", "BLOB"))
} else {
sessVars.StmtCtx.AppendWarning(ErrAutoConvert.GenWithStackByArgs(colName, "VARCHAR", "TEXT"))
}
}
}
return nil
}
3 changes: 3 additions & 0 deletions ddl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,7 @@ var (
errFunctionalIndexOnBlob = dbterror.ClassDDL.NewStd(mysql.ErrFunctionalIndexOnBlob)
// ErrIncompatibleTiFlashAndPlacement when placement and tiflash replica options are set at the same time
ErrIncompatibleTiFlashAndPlacement = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Placement and tiflash replica options cannot be set at the same time", nil))

// ErrAutoConvert when auto convert happens
ErrAutoConvert = dbterror.ClassDDL.NewStd(mysql.ErrAutoConvert)
)
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ error = '''
Incorrect usage of %s and %s
'''

["ddl:1246"]
error = '''
Converting column '%s' from %s to %s
'''

["ddl:1248"]
error = '''
Every derived table must have its own alias
Expand Down
22 changes: 20 additions & 2 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -787,8 +788,11 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) {
countPrimaryKey := 0
for _, colDef := range stmt.Cols {
if err := checkColumn(colDef); err != nil {
p.err = err
return
// Try to convert to BLOB or TEXT, see issue #30328
if !terror.ErrorEqual(err, types.ErrTooBigFieldLength) || !p.hasAutoConvertWarning(colDef) {
p.err = err
return
}
}
isPrimary, err := checkColumnOptions(stmt.TemporaryKeyword != ast.TemporaryNone, colDef.Options)
if err != nil {
Expand Down Expand Up @@ -1759,3 +1763,17 @@ func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) {
InfoSchema: p.ensureInfoSchema(),
})
}

func (p *preprocessor) hasAutoConvertWarning(colDef *ast.ColumnDef) bool {
sessVars := p.ctx.GetSessionVars()
if !sessVars.SQLMode.HasStrictMode() && colDef.Tp.Tp == mysql.TypeVarchar {
colDef.Tp.Tp = mysql.TypeBlob
if colDef.Tp.Charset == charset.CharsetBin {
sessVars.StmtCtx.AppendWarning(ddl.ErrAutoConvert.GenWithStackByArgs(colDef.Name.Name.O, "VARBINARY", "BLOB"))
} else {
sessVars.StmtCtx.AppendWarning(ddl.ErrAutoConvert.GenWithStackByArgs(colDef.Name.Name.O, "VARCHAR", "TEXT"))
}
return true
}
return false
}
23 changes: 23 additions & 0 deletions planner/core/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,26 @@ func TestErrKeyPart0(t *testing.T) {
err = tk.ExecToErr("alter table t add index (b(0))")
require.EqualError(t, err, "[planner:1391]Key part 'b' length cannot be 0")
}

// For issue #30328
func TestLargeVarcharAutoConv(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable()})
runSQL(t, tk.Session(), is, "CREATE TABLE t1(a varbinary(70000), b varchar(70000000))", false,
errors.New("[types:1074]Column length too big for column 'a' (max = 65535); use BLOB or TEXT instead"))

tk.MustExec("SET sql_mode = 'NO_ENGINE_SUBSTITUTION'")
runSQL(t, tk.Session(), is, "CREATE TABLE t1(a varbinary(70000), b varchar(70000000));", false, nil)
runSQL(t, tk.Session(), is, "CREATE TABLE t1(a varbinary(70000), b varchar(70000000) charset utf8mb4);", false, nil)
warnCnt := tk.Session().GetSessionVars().StmtCtx.WarningCount()
// It is only 3. For the first stmt, charset of column b is not resolved, so ddl will append a warning for it
require.Equal(t, uint16(3), warnCnt)
warns := tk.Session().GetSessionVars().StmtCtx.GetWarnings()
for i := range warns {
require.True(t, terror.ErrorEqual(warns[i].Err, ddl.ErrAutoConvert))
}
}

0 comments on commit a350290

Please sign in to comment.