Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: support generated column in loader and binlog replication #42

Merged
merged 20 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ func bytes2str(bs []byte) string {
func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([][]string, error) {
var s, e, size int
var rows = make([][]string, 0, 1024)
var VALUES = []byte("VALUES")

// If table has generated column, the dumped SQL file has a different `INSERT INTO` line,
// which provides column names except generated column. such as following:
// INSERT INTO `t1` (`id`,`uid`,`name`,`info`) VALUES
// (1,10001,"Gabriel García Márquez",NULL),
// (2,10002,"Cien años de soledad",NULL);
// otherwise dumped SQL file has content like folloing:
// INSERT INTO `t1` VALUES
// (1,"hello"),
// (2,"world");

for {
sql = sql[s:]
Expand All @@ -56,6 +67,10 @@ func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([
if sql[e] == '\n' && (sql[e-1] == ',' || sql[e-1] == ';') && sql[e-2] == ')' {
break
}
if sql[e] == '\n' && e-6 > s && bytes.Compare(sql[e-6:e], VALUES) == 0 {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
s = e + 1
continue
}
}
if e == size {
return nil, errors.New("not found cooresponding ending of sql: ')'")
Expand Down Expand Up @@ -236,9 +251,30 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
return nil, errors.Errorf("statement %s for %s/%s is not create table statement", statement, schema, table)
}

columns := make([]string, 0, len(ct.Cols))
var (
columns = make([]string, 0, len(ct.Cols))
hasGeneragedCols = false
columnNameFields = ""
)
for _, col := range ct.Cols {
columns = append(columns, col.Name.Name.O)
skip := false
for _, opt := range col.Options {
if opt.Tp == ast.ColumnOptionGenerated {
hasGeneragedCols = true
skip = true
break
}
}
if !skip {
columns = append(columns, col.Name.Name.O)
}
}
if hasGeneragedCols {
var escapeColumns []string
for _, column := range columns {
escapeColumns = append(escapeColumns, fmt.Sprintf("`%s`", column))
}
columnNameFields = "(" + strings.Join(escapeColumns, ",") + ") "
}

dstSchema, dstTable := fetchMatchedLiteral(r, schema, table)
Expand All @@ -248,7 +284,7 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
targetSchema: dstSchema,
targetTable: dstTable,
columnNameList: columns,
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` VALUES", dstTable),
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` %sVALUES", dstTable, columnNameFields),
}, nil
}

Expand Down
60 changes: 60 additions & 0 deletions loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,41 @@ func (t *testConvertDataSuite) TestReassemble(c *C) {
}
}

func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) {
table := &tableInfo{
sourceSchema: "test2",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO t (`id`,`t_json`) VALUES",
}
sql := `INSERT INTO t1 (id,t_json) VALUES
(10,'{}'),
(9,NULL);
(8,'{"a":123}');
`
expected := "INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');"
rules := []*cm.Rule{
{
PatternSchema: "test*",
PatternTable: "t*",
TargetColumn: "id",
Expression: cm.PartitionID,
Arguments: []string{"1", "test", "t"},
},
}

columnMapping, err := cm.NewMapping(false, rules)
c.Assert(err, IsNil)
query, err := reassemble([]byte(sql), table, columnMapping)
c.Assert(err, IsNil)
c.Assert(query, Equals, expected)
}

func (t *testConvertDataSuite) TestParseTable(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
Expand Down Expand Up @@ -128,3 +163,28 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}

func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
}

expectedTableInfo := &tableInfo{
sourceSchema: "test1",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES",
}

r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(r, "test1", "t3", "./dumpfile/test1.t3-schema.sql")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
9 changes: 9 additions & 0 deletions loader/dumpfile/test1.t3-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*!40101 SET NAMES binary*/;
/*!40014 SET FOREIGN_KEY_CHECKS=0*/;

CREATE TABLE `binlog_1` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`t_json` VARCHAR(100),
`t_json_gen` json comment 'test comment' as (`t_json`) VIRTUAL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
8 changes: 8 additions & 0 deletions syncer/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ func columnOptionsToSQL(options []*ast.ColumnOption) string {
sql += fmt.Sprintf(" COMMENT '%s'", comment)
case ast.ColumnOptionOnUpdate: // For Timestamp and Datetime only.
sql += " ON UPDATE CURRENT_TIMESTAMP"
case ast.ColumnOptionGenerated:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use restore to implement it later

var store string
if opt.Stored {
store = "STORED"
} else {
store = "VIRTUAL"
}
sql += fmt.Sprintf(" GENERATED ALWAYS AS (%s) %s", opt.Expr.Text(), store)
case ast.ColumnOptionFulltext:
panic("not implemented yet")
default:
Expand Down
23 changes: 15 additions & 8 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type column struct {
NotNull bool
unsigned bool
tp string
extra string
}

type table struct {
Expand Down Expand Up @@ -362,14 +363,15 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error {
// Show an example.
/*
mysql> show columns from test.t;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| a | int(11) | NO | PRI | NULL | |
| b | int(11) | NO | PRI | NULL | |
| c | int(11) | YES | MUL | NULL | |
| d | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
+-------+---------+------+-----+---------+-------------------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------------------+
| a | int(11) | NO | PRI | NULL | |
| b | int(11) | NO | PRI | NULL | |
| c | int(11) | YES | MUL | NULL | |
| d | int(11) | YES | | NULL | |
| d | json | YES | | NULL | VIRTUAL GENERATED |
+-------+---------+------+-----+---------+-------------------+
*/

idx := 0
Expand All @@ -390,6 +392,7 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error {
column.idx = idx
column.name = string(data[0])
column.tp = string(data[1])
column.extra = string(data[5])
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Feb 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update example at L365


if strings.ToLower(string(data[2])) == "no" {
column.NotNull = true
Expand Down Expand Up @@ -456,3 +459,7 @@ func getBinaryLogs(db *sql.DB) ([]binlogSize, error) {
}
return files, nil
}

func (c *column) isGeneratedColumn() bool {
return strings.Contains(c.extra, "VIRTUAL GENERATED") || strings.Contains(c.extra, "STORED GENERATED")
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must extra be uppercase? should we use == strings.TrimSpace(xxx) rather than strings.Contains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to MySQL doc, VIRTUAL GENERATED and STORED GENERATED in Extra field are always uppercase

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strings.Contains is used because there may exist other data in Extra?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid some comment options are in it

Copy link
Contributor Author

@amyangfei amyangfei Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://dev.mysql.com/doc/refman/5.7/en/show-columns.html
In MySQL doc, there are only four Extra fields: auto_increment, on update CURRENT_TIMESTAMP, VIRTUAL GENERATED or VIRTUAL STORED

}
54 changes: 54 additions & 0 deletions syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -86,9 +87,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

testCase = [][]string{
Expand All @@ -101,9 +107,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

}
Expand Down Expand Up @@ -338,3 +349,46 @@ func (s *testSyncerSuite) TestIgnoreDMLInQuery(c *C) {
c.Assert(pr.isDDL, Equals, cs.isDDL)
}
}

func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) {
testCases := []struct {
sql string
expected string
}{
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
},
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) AS (1 + 1) STORED",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (1 + 1) STORED",
},
}

syncer := &Syncer{}
parser, err := utils.GetParser(s.db, false)
c.Assert(err, IsNil)

for _, tc := range testCases {
ast1, err := parser.ParseOneStmt(tc.sql, "", "")
c.Assert(err, IsNil)

sqls, _, _, err := syncer.resolveDDLSQL(tc.sql, parser, "")
c.Assert(err, IsNil)

c.Assert(len(sqls), Equals, 1)
getSQL := sqls[0]
c.Assert(getSQL, Equals, tc.expected)

ast2, err := parser.ParseOneStmt(getSQL, "", "")
c.Assert(err, IsNil)

// compare parsed ast of the resoved SQL with parsed ast of the origin SQL.
// because text fields are not always same, and the difference of text
// makes no sense to the semantics, we just ignore checking it.
atStmt1 := ast1.(*ast.AlterTableStmt)
atStmt2 := ast2.(*ast.AlterTableStmt)
c.Assert(atStmt1.Table, DeepEquals, atStmt2.Table)
c.Assert(atStmt1.Specs, DeepEquals, atStmt2.Specs)
}
}
Loading