*: support generated column in loader and binlog replication (pingcap#42)
amyangfei authored Feb 21, 2019
1 parent 6daa3ea commit 6c5eb1e
Showing 15 changed files with 551 additions and 25 deletions.
42 changes: 39 additions & 3 deletions loader/convert_data.go
@@ -40,6 +40,17 @@ func bytes2str(bs []byte) string {
func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([][]string, error) {
var s, e, size int
var rows = make([][]string, 0, 1024)
var VALUES = []byte("VALUES")

// If the table has generated columns, the dumped SQL file has a different `INSERT INTO` line,
// which lists all column names except the generated columns, such as the following:
// INSERT INTO `t1` (`id`,`uid`,`name`,`info`) VALUES
// (1,10001,"Gabriel García Márquez",NULL),
// (2,10002,"Cien años de soledad",NULL);
// otherwise the dumped SQL file has content like the following:
// INSERT INTO `t1` VALUES
// (1,"hello"),
// (2,"world");

for {
sql = sql[s:]
@@ -56,6 +67,10 @@ func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([
if sql[e] == '\n' && (sql[e-1] == ',' || sql[e-1] == ';') && sql[e-2] == ')' {
break
}
if sql[e] == '\n' && e-6 > s && bytes.Equal(sql[e-6:e], VALUES) {
s = e + 1
continue
}
}
if e == size {
return nil, errors.New("corresponding ending of sql: ')' not found")
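
The new header check above treats a line that ends in `VALUES` as the `INSERT INTO` header and restarts the row scan on the following line. A minimal standalone sketch of the same idea (hypothetical code, not part of this diff), using `bytes.HasSuffix`:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	dump := []byte("INSERT INTO `t1` (`id`,`uid`) VALUES\n(1,10001),\n(2,10002);\n")

	// If the first line ends with "VALUES", the data rows begin on the next line.
	if i := bytes.IndexByte(dump, '\n'); i > 0 && bytes.HasSuffix(dump[:i], []byte("VALUES")) {
		fmt.Printf("rows start at offset %d: %q\n", i+1, dump[i+1:])
	}
}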
@@ -236,9 +251,30 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
return nil, errors.Errorf("statement %s for %s/%s is not create table statement", statement, schema, table)
}

columns := make([]string, 0, len(ct.Cols))
var (
columns = make([]string, 0, len(ct.Cols))
hasGeneratedCols = false
columnNameFields = ""
)
for _, col := range ct.Cols {
columns = append(columns, col.Name.Name.O)
skip := false
for _, opt := range col.Options {
if opt.Tp == ast.ColumnOptionGenerated {
hasGeneratedCols = true
skip = true
break
}
}
if !skip {
columns = append(columns, col.Name.Name.O)
}
}
if hasGeneratedCols {
var escapeColumns []string
for _, column := range columns {
escapeColumns = append(escapeColumns, fmt.Sprintf("`%s`", column))
}
columnNameFields = "(" + strings.Join(escapeColumns, ",") + ") "
}

dstSchema, dstTable := fetchMatchedLiteral(r, schema, table)
@@ -248,7 +284,7 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
targetSchema: dstSchema,
targetTable: dstTable,
columnNameList: columns,
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` VALUES", dstTable),
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` %sVALUES", dstTable, columnNameFields),
}, nil
}

60 changes: 60 additions & 0 deletions loader/convert_data_test.go
@@ -88,6 +88,41 @@ func (t *testConvertDataSuite) TestReassemble(c *C) {
}
}

func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) {
table := &tableInfo{
sourceSchema: "test2",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO t (`id`,`t_json`) VALUES",
}
sql := `INSERT INTO t1 (id,t_json) VALUES
(10,'{}'),
(9,NULL),
(8,'{"a":123}');
`
expected := "INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');"
rules := []*cm.Rule{
{
PatternSchema: "test*",
PatternTable: "t*",
TargetColumn: "id",
Expression: cm.PartitionID,
Arguments: []string{"1", "test", "t"},
},
}

columnMapping, err := cm.NewMapping(false, rules)
c.Assert(err, IsNil)
query, err := reassemble([]byte(sql), table, columnMapping)
c.Assert(err, IsNil)
c.Assert(query, Equals, expected)
}

func (t *testConvertDataSuite) TestParseTable(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
@@ -128,3 +163,28 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}

func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
}

expectedTableInfo := &tableInfo{
sourceSchema: "test1",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES",
}

r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(r, "test1", "t3", "./dumpfile/test1.t3-schema.sql")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
9 changes: 9 additions & 0 deletions loader/dumpfile/test1.t3-schema.sql
@@ -0,0 +1,9 @@
/*!40101 SET NAMES binary*/;
/*!40014 SET FOREIGN_KEY_CHECKS=0*/;

CREATE TABLE `binlog_1` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`t_json` VARCHAR(100),
`t_json_gen` json comment 'test comment' as (`t_json`) VIRTUAL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
8 changes: 8 additions & 0 deletions syncer/ast.go
@@ -122,6 +122,14 @@ func columnOptionsToSQL(options []*ast.ColumnOption) string {
sql += fmt.Sprintf(" COMMENT '%s'", comment)
case ast.ColumnOptionOnUpdate: // For Timestamp and Datetime only.
sql += " ON UPDATE CURRENT_TIMESTAMP"
case ast.ColumnOptionGenerated:
var store string
if opt.Stored {
store = "STORED"
} else {
store = "VIRTUAL"
}
sql += fmt.Sprintf(" GENERATED ALWAYS AS (%s) %s", opt.Expr.Text(), store)
case ast.ColumnOptionFulltext:
panic("not implemented yet")
default:
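The `ast.ColumnOptionGenerated` case above normalizes both the `AS (...)` shorthand and the full `GENERATED ALWAYS AS (...)` input forms to the canonical clause. A sketch of the emitted fragment, as a hypothetical standalone helper mirroring (not reusing) the syncer branch:

package main

import "fmt"

// renderGenerated mirrors the switch case above: it rebuilds the
// GENERATED ALWAYS clause from the parsed expression text and storage kind.
func renderGenerated(exprText string, stored bool) string {
	store := "VIRTUAL"
	if stored {
		store = "STORED"
	}
	return fmt.Sprintf(" GENERATED ALWAYS AS (%s) %s", exprText, store)
}

func main() {
	fmt.Println(renderGenerated("c + 1", false)) // GENERATED ALWAYS AS (c + 1) VIRTUAL
	fmt.Println(renderGenerated("a * 2", true))  // GENERATED ALWAYS AS (a * 2) STORED
}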
23 changes: 15 additions & 8 deletions syncer/db.go
@@ -37,6 +37,7 @@ type column struct {
NotNull bool
unsigned bool
tp string
extra string
}

type table struct {
@@ -362,14 +363,15 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error {
// Show an example.
/*
mysql> show columns from test.t;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| a     | int(11) | NO   | PRI | NULL    |       |
| b     | int(11) | NO   | PRI | NULL    |       |
| c     | int(11) | YES  | MUL | NULL    |       |
| d     | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
+-------+---------+------+-----+---------+-------------------+
| Field | Type    | Null | Key | Default | Extra             |
+-------+---------+------+-----+---------+-------------------+
| a     | int(11) | NO   | PRI | NULL    |                   |
| b     | int(11) | NO   | PRI | NULL    |                   |
| c     | int(11) | YES  | MUL | NULL    |                   |
| d     | int(11) | YES  |     | NULL    |                   |
| e     | json    | YES  |     | NULL    | VIRTUAL GENERATED |
+-------+---------+------+-----+---------+-------------------+
*/

idx := 0
@@ -390,6 +392,7 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error {
column.idx = idx
column.name = string(data[0])
column.tp = string(data[1])
column.extra = string(data[5])

if strings.ToLower(string(data[2])) == "no" {
column.NotNull = true
@@ -456,3 +459,7 @@ func getBinaryLogs(db *sql.DB) ([]binlogSize, error) {
}
return files, nil
}

func (c *column) isGeneratedColumn() bool {
return strings.Contains(c.extra, "VIRTUAL GENERATED") || strings.Contains(c.extra, "STORED GENERATED")
}
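
MySQL reports generated columns through the `Extra` field of `SHOW COLUMNS` as `VIRTUAL GENERATED` or `STORED GENERATED`, which is exactly what the helper above inspects. A small usage sketch (hypothetical standalone code):

package main

import (
	"fmt"
	"strings"
)

// column mirrors the syncer struct extended above; only the field used here is shown.
type column struct {
	extra string
}

func (c *column) isGeneratedColumn() bool {
	return strings.Contains(c.extra, "VIRTUAL GENERATED") || strings.Contains(c.extra, "STORED GENERATED")
}

func main() {
	for _, extra := range []string{"", "auto_increment", "VIRTUAL GENERATED", "STORED GENERATED"} {
		c := &column{extra: extra}
		fmt.Printf("Extra=%q -> generated=%v\n", extra, c.isGeneratedColumn())
	}
}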
54 changes: 54 additions & 0 deletions syncer/ddl_test.go
@@ -18,6 +18,7 @@ import (
"database/sql"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/dm/dm/config"
@@ -86,9 +87,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

testCase = [][]string{
@@ -101,9 +107,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

}
@@ -338,3 +349,46 @@ func (s *testSyncerSuite) TestIgnoreDMLInQuery(c *C) {
c.Assert(pr.isDDL, Equals, cs.isDDL)
}
}

func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) {
testCases := []struct {
sql string
expected string
}{
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
},
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) AS (1 + 1) STORED",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (1 + 1) STORED",
},
}

syncer := &Syncer{}
parser, err := utils.GetParser(s.db, false)
c.Assert(err, IsNil)

for _, tc := range testCases {
ast1, err := parser.ParseOneStmt(tc.sql, "", "")
c.Assert(err, IsNil)

sqls, _, _, err := syncer.resolveDDLSQL(tc.sql, parser, "")
c.Assert(err, IsNil)

c.Assert(len(sqls), Equals, 1)
getSQL := sqls[0]
c.Assert(getSQL, Equals, tc.expected)

ast2, err := parser.ParseOneStmt(getSQL, "", "")
c.Assert(err, IsNil)

// Compare the parsed AST of the resolved SQL with the parsed AST of the original SQL.
// Because the text fields are not always the same, and the difference in text
// has no effect on the semantics, we simply skip checking them.
atStmt1 := ast1.(*ast.AlterTableStmt)
atStmt2 := ast2.(*ast.AlterTableStmt)
c.Assert(atStmt1.Table, DeepEquals, atStmt2.Table)
c.Assert(atStmt1.Specs, DeepEquals, atStmt2.Specs)
}
}