Skip to content

Commit

Permalink
DDL: fix a bug in column charset and collate when create table… (#11492)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored and zz-jason committed Jul 29, 2019
1 parent 133bb08 commit 29210b9
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 11 deletions.
79 changes: 68 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,34 @@ func typesNeedCharset(tp byte) bool {
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
func setCharsetCollationFlenDecimal(tp *types.FieldType, specifiedCollates []string, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// Both the charset and collate are not specified.
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
} else {
// The charset is not specified but the collate is.
// We should derive charset from it's collate specified rather than getting from table and db.
// It is handled like mysql's logic, use derived charset to judge conflict with next collate.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if len(tp.Charset) == 0 {
tp.Charset = derivedCollation.CharsetName
} else if tp.Charset != derivedCollation.CharsetName {
return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
} else {
tp.Charset = charset.CharsetBin
Expand All @@ -314,10 +333,25 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
return errUnsupportedCharset.GenWithStackByArgs(tp.Charset, tp.Collate)
}
if len(tp.Collate) == 0 {
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// The charset is specified, but the collate is not.
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
}
} else {
// Both the charset and collate are specified.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if tp.Charset != derivedCollation.CharsetName {
return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
}
}
Expand All @@ -341,7 +375,10 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint, tblCharset, dbCharset string) (*table.Column, []*ast.Constraint, error) {
if err := setCharsetCollationFlenDecimal(colDef.Tp, tblCharset, dbCharset); err != nil {
// specifiedCollates refers to collates in colDef.Options, should handle them together.
specifiedCollates := extractCollateFromOption(colDef)

if err := setCharsetCollationFlenDecimal(colDef.Tp, specifiedCollates, tblCharset, dbCharset); err != nil {
return nil, nil, errors.Trace(err)
}
col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint)
Expand Down Expand Up @@ -2516,7 +2553,11 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
newCol.FieldType.Charset = col.FieldType.Charset
newCol.FieldType.Collate = col.FieldType.Collate
}
err = setCharsetCollationFlenDecimal(&newCol.FieldType, t.Meta().Charset, schema.Charset)
// specifiedCollates refers to collates in colDef.Option. When setting charset and collate here we
// should take the collate in colDef.Option into consideration rather than handling it separately
specifiedCollates := extractCollateFromOption(specNewColumn)

err = setCharsetCollationFlenDecimal(&newCol.FieldType, specifiedCollates, t.Meta().Charset, schema.Charset)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -3266,3 +3307,19 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
}
return part, nil
}

// extractCollateFromOption take collates(may multiple) in option into consideration
// when handle charset and collate of a column, rather than handling it separately.
func extractCollateFromOption(def *ast.ColumnDef) []string {
specifiedCollates := make([]string, 0, 0)
for i := 0; i < len(def.Options); i++ {
op := def.Options[i]
if op.Tp == ast.ColumnOptionCollate {
specifiedCollates = append(specifiedCollates, op.StrValue)
def.Options = append(def.Options[:i], def.Options[i+1:]...)
// maintain the correct index
i--
}
}
return specifiedCollates
}
67 changes: 67 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,36 @@ func (s *testSuite3) TestCreateTable(c *C) {
}
}

// test multiple collate specified in column when create.
tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) charset utf8 collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate specified in column when create.
tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) charset utf8mb4 collate utf8_unicode_ci collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_unicode_ci", "utf8mb4").Error())

tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) collate utf8_unicode_ci collate utf8mb4_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_general_ci", "utf8").Error())

// table option is auto-increment
tk.MustExec("drop table if exists create_auto_increment_test;")
tk.MustExec("create table create_auto_increment_test (id int not null auto_increment, name varchar(255), primary key(id)) auto_increment = 999;")
Expand Down Expand Up @@ -322,6 +352,43 @@ func (s *testSuite3) TestAlterTableModifyColumn(c *C) {
_, err = tk.Exec("alter table alter_view modify column c2 text")
c.Assert(err.Error(), Equals, ddl.ErrWrongObject.GenWithStackByArgs("test", "alter_view", "BASE TABLE").Error())
tk.MustExec("drop view alter_view")

// test multiple collate modification in column.
tk.MustExec("drop table if exists modify_column_multiple_collate")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists modify_column_multiple_collate;")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) charset utf8mb4 collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate modification in column.
tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_bin", "utf8mb4").Error())

tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) collate utf8_bin collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_bin", "utf8").Error())

}

func (s *testSuite3) TestDefaultDBAfterDropCurDB(c *C) {
Expand Down

0 comments on commit 29210b9

Please sign in to comment.