Skip to content

Commit

Permalink
ticdc/owner: Fix ddl special comment syntax error (#3845)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Dec 20, 2021
1 parent 5ecb465 commit 7ccaad2
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 34 deletions.
34 changes: 32 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package owner

import (
"context"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
Expand Down Expand Up @@ -482,7 +484,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query)
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, errors.Trace(err)
}

c.ddlEventCache = ddlEvent
if c.redoManager.Enabled() {
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
Expand Down Expand Up @@ -534,3 +540,27 @@ func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
}
return nil
}

// addSpecialComment translate tidb feature to comment
func addSpecialComment(ddlQuery string) (string, error) {
stms, _, err := parser.New().ParseSQL(ddlQuery)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
}
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}
145 changes: 143 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create database test1")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`")

// executing the ddl finished
mockDDLSink.ddlDone = true
Expand All @@ -295,7 +295,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)")

// executing the ddl finished
mockDDLSink.ddlDone = true
Expand Down Expand Up @@ -431,3 +431,144 @@ func testChangefeedReleaseResource(
_, err = os.Stat(redoLogDir)
c.Assert(os.IsNotExist(err), check.IsTrue)
}

func (s *changefeedSuite) TestAddSpecialComment(c *check.C) {
defer testleak.AfterTest(c)()
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int primary key auto_random(2));",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)",
},
{
"create table t1 (id int primary key auto_random);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) auto_id_cache=5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int primary key clustered, v int);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)",
},
{
"alter table t add primary key(a) clustered;",
"ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table clustered_test(id int)",
"CREATE TABLE `clustered_test` (`id` INT)",
},
{
"create database clustered_test",
"CREATE DATABASE `clustered_test`",
},
{
"create database clustered",
"CREATE DATABASE `clustered`",
},
{
"create table clustered (id int)",
"CREATE TABLE `clustered` (`id` INT)",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)",
},
{
"alter table t force auto_increment = 12;",
"ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12",
},
{
"alter table t force, auto_increment = 12;",
"ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12",
},
{
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */",
},
}
for _, ca := range testCase {
re, err := addSpecialComment(ca.input)
c.Check(err, check.IsNil)
c.Check(re, check.Equals, ca.result)
}
c.Assert(func() {
_, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;")
}, check.Panics, "invalid ddlQuery statement size")
}
64 changes: 34 additions & 30 deletions tests/integration_tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

ddls=("create database ddl_reentrant" false
"create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false
"alter table ddl_reentrant.t1 add column b int" false
"alter table ddl_reentrant.t1 drop column b" false
"alter table ddl_reentrant.t1 add key index_a(a)" false
"alter table ddl_reentrant.t1 drop index index_a" false
"truncate table ddl_reentrant.t1" true
"alter table ddl_reentrant.t1 modify a varchar(20)" true
"rename table ddl_reentrant.t1 to ddl_reentrant.t2" false
"alter table ddl_reentrant.t2 alter a set default 'hello'" true
"alter table ddl_reentrant.t2 comment='modify comment'" true
"alter table ddl_reentrant.t2 rename index a to idx_a" false
"create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false
"alter table ddl_reentrant.t3 drop partition p2" false
"alter table ddl_reentrant.t3 truncate partition p0" true
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false
"drop view ddl_reentrant.t3_view" false
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true
"alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true
# cdc parse and restore ddl with flags format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes|format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment
ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`'
"create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false 'CREATE TABLE `ddl_reentrant`.`t1` (`id` INT PRIMARY KEY,`id2` INT NOT NULL,`a` VARCHAR(10) NOT NULL,UNIQUE `a`(`a`),UNIQUE `id2`(`id2`))'
"alter table ddl_reentrant.t1 add column b int" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD COLUMN `b` INT'
"alter table ddl_reentrant.t1 drop column b" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP COLUMN `b`'
"alter table ddl_reentrant.t1 add key index_a(a)" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD INDEX `index_a`(`a`)'
"alter table ddl_reentrant.t1 drop index index_a" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP INDEX `index_a`'
"truncate table ddl_reentrant.t1" true 'TRUNCATE TABLE `ddl_reentrant`.`t1`'
"alter table ddl_reentrant.t1 modify a varchar(20)" true 'ALTER TABLE `ddl_reentrant`.`t1` MODIFY COLUMN `a` VARCHAR(20)'
"rename table ddl_reentrant.t1 to ddl_reentrant.t2" false 'RENAME TABLE `ddl_reentrant`.`t1` TO `ddl_reentrant`.`t2`'
"alter table ddl_reentrant.t2 alter a set default 'hello'" true 'ALTER TABLE `ddl_reentrant`.`t2` ALTER COLUMN `a` SET DEFAULT _UTF8MB4'"'hello'"
"alter table ddl_reentrant.t2 comment='modify comment'" true 'ALTER TABLE `ddl_reentrant`.`t2` COMMENT = '"'modify comment'"
"alter table ddl_reentrant.t2 rename index a to idx_a" false 'ALTER TABLE `ddl_reentrant`.`t2` RENAME INDEX `a` TO `idx_a`'
"create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false 'CREATE TABLE `ddl_reentrant`.`t3` (`a` INT PRIMARY KEY,`b` INT) PARTITION BY RANGE (`a`) (PARTITION `p0` VALUES LESS THAN (1000),PARTITION `p1` VALUES LESS THAN (2000))'
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))'
"alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`'
"alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`'
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`'
"drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`'
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI'
"alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER DATABASE `ddl_reentrant` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci'
)

function complete_ddls() {
Expand All @@ -36,14 +37,14 @@ function complete_ddls() {
echo "skip some DDLs in tidb v4.0.x"
else
# DDLs that are supportted since 5.0
ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false)
ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false)
ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT')
ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`')
fi
ddls+=("alter table ddl_reentrant.t2 drop primary key" false)
ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false)
ddls+=("drop table ddl_reentrant.t2" false)
ddls+=("recover table ddl_reentrant.t2" false)
ddls+=("drop database ddl_reentrant" false)
ddls+=("alter table ddl_reentrant.t2 drop primary key" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP PRIMARY KEY')
ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD PRIMARY KEY `pk`(`id`)')
ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`')
ddls+=("recover table ddl_reentrant.t2" false 'RECOVER TABLE `ddl_reentrant`.`t2`')
ddls+=("drop database ddl_reentrant" false 'DROP DATABASE `ddl_reentrant`')
}

changefeedid=""
Expand Down Expand Up @@ -94,14 +95,15 @@ tidb_build_branch=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e \
function ddl_test() {
ddl=$1
is_reentrant=$2
restored_sql=$3

echo "------------------------------------------"
echo "test ddl $ddl, is_reentrant: $is_reentrant"
echo "test ddl $ddl, is_reentrant: $is_reentrant restored_sql: $restored_sql"

run_sql $ddl ${UP_TIDB_HOST} ${UP_TIDB_PORT}
ensure 10 check_ts_forward $changefeedid

echo $ddl >${WORK_DIR}/ddl_temp.sql
echo $restored_sql >${WORK_DIR}/ddl_temp.sql
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log | tail -n 1 | grep -oE '"CommitTs\\":[0-9]{18}' | awk -F: '{print $(NF)}')
cdc cli changefeed remove --changefeed-id=${changefeedid}
Expand Down Expand Up @@ -146,7 +148,9 @@ function run() {
idx=$((idx + 1))
idxs_reentrant=${ddls[$idx]}
idx=$((idx + 1))
ddl_test $ddl $idxs_reentrant
restored_sql=${ddls[$idx]}
idx=$((idx + 1))
ddl_test $ddl $idxs_reentrant $restored_sql
done
IFS=$OLDIFS

Expand Down

0 comments on commit 7ccaad2

Please sign in to comment.