Skip to content

Commit

Permalink
binlog: use node.Restore() to add TiDB-specified feature comments (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored Jan 3, 2022
1 parent dba42c9 commit 69e179e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 151 deletions.
103 changes: 32 additions & 71 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package binloginfo

import (
"math"
"regexp"
"strings"
"sync"
"sync/atomic"
Expand All @@ -28,9 +26,10 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
Expand All @@ -45,8 +44,6 @@ func init() {
// shared by all sessions.
var pumpsClient *pumpcli.PumpsClient
var pumpsClientLock sync.RWMutex
var shardPat = regexp.MustCompile(`(?P<REPLACE>SHARD_ROW_ID_BITS\s*=\s*\d+\s*)`)
var preSplitPat = regexp.MustCompile(`(?P<REPLACE>PRE_SPLIT_REGIONS\s*=\s*\d+\s*)`)

// BinlogInfo contains binlog data and binlog client.
type BinlogInfo struct {
Expand Down Expand Up @@ -283,7 +280,11 @@ func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64,
return
}

ddlQuery = AddSpecialComment(ddlQuery)
if commented, err := FormatAndAddTiDBSpecificComment(ddlQuery); err == nil {
ddlQuery = commented
} else {
logutil.BgLogger().Warn("Unable to add TiDB-specified comment for DDL query.", zap.String("DDL Query", ddlQuery), zap.Error(err))
}
info := &BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
Expand All @@ -296,71 +297,6 @@ func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64,
txn.SetOption(kv.BinlogInfo, info)
}

const specialPrefix = `/*T! `

// AddSpecialComment wraps TiDB-only table options in a DDL query with
// special comments (e.g. /*T! ... */) so that MySQL-compatible consumers
// parse the query while skipping the TiDB-specific features.
// Used by pingcap/ticdc.
func AddSpecialComment(ddlQuery string) string {
	// A query that already carries a special comment is returned untouched
	// to avoid wrapping the same option twice.
	alreadyCommented := strings.Contains(ddlQuery, specialPrefix) ||
		strings.Contains(ddlQuery, driver.SpecialCommentVersionPrefix)
	if alreadyCommented {
		return ddlQuery
	}
	// SHARD_ROW_ID_BITS and PRE_SPLIT_REGIONS share the plain /*T! prefix;
	// every other feature gets its own versioned prefix from the driver.
	result := addSpecialCommentByRegexps(ddlQuery, specialPrefix, shardPat, preSplitPat)
	for featureID, pattern := range driver.FeatureIDPatterns {
		result = addSpecialCommentByRegexps(result, driver.BuildSpecialCommentPrefix(featureID), pattern)
	}
	return result
}

// addSpecialCommentByRegexps adds a special comment for the words in
// ddlQuery that match any of the given regexps. All matched fragments are
// cut out of the query and merged into a single comment, which is inserted
// at the position of the earliest match. Matching is case-insensitive
// (done on an upper-cased shadow copy), but the original spelling of each
// fragment is preserved inside the comment. Each regexp must define a
// named capture group "REPLACE" marking the span to extract.
func addSpecialCommentByRegexps(ddlQuery string, prefix string, regs ...*regexp.Regexp) string {
	// upper mirrors ddlQuery; both are trimmed in lockstep so indexes stay valid.
	upper := strings.ToUpper(ddlQuery)
	var comments []string
	insertAt := math.MaxInt64
	for _, reg := range regs {
		names := reg.SubexpNames()
		// Keep matching the same regexp until it no longer fires, so
		// repeated options (e.g. duplicated SHARD_ROW_ID_BITS) are all collected.
		for {
			idx := reg.FindStringSubmatchIndex(upper)
			var span []int
			if len(idx) > 0 {
				for j, name := range names {
					if name == "REPLACE" {
						span = idx[2*j : 2*j+2]
						break
					}
				}
			}
			if len(span) < 2 {
				break
			}
			comments = append(comments, ddlQuery[span[0]:span[1]])
			if span[0] < insertAt {
				insertAt = span[0]
			}
			ddlQuery = ddlQuery[:span[0]] + ddlQuery[span[1]:]
			upper = upper[:span[0]] + upper[span[1]:]
		}
	}
	// No match at all: hand the query back unchanged.
	if insertAt == math.MaxInt64 {
		return ddlQuery
	}
	// Splice the merged comment in at the earliest match position,
	// separating the pieces with single spaces (fragments may already
	// carry trailing whitespace captured by the regexps).
	out := ddlQuery[:insertAt] + prefix
	for _, comment := range comments {
		if out[len(out)-1] != ' ' {
			out += " "
		}
		out += comment
	}
	if out[len(out)-1] != ' ' {
		out += " "
	}
	out += "*/"
	if rest := ddlQuery[insertAt:]; len(rest) > 0 {
		out += " " + rest
	}
	return out
}

// MockPumpsClient creates a PumpsClient, used for test.
func MockPumpsClient(client binlog.PumpClient) *pumpcli.PumpsClient {
nodeID := "pump-1"
Expand Down Expand Up @@ -390,3 +326,28 @@ func MockPumpsClient(client binlog.PumpClient) *pumpcli.PumpsClient {

return pCli
}

// FormatAndAddTiDBSpecificComment parses ddlQuery and restores each
// statement with TiDB-specific feature syntax wrapped in special comments.
// ddlQuery can be multiple-statements separated by ';' and the statement
// can be empty. Every restored statement is terminated with ';'; a parse
// failure returns an empty string and the traced error.
func FormatAndAddTiDBSpecificComment(ddlQuery string) (string, error) {
	stmts, _, err := parser.New().ParseSQL(ddlQuery)
	if err != nil {
		return "", errors.Trace(err)
	}
	// Restore flags: translate TiDB features to special comments, back-quote
	// names (escaping keywords), upper-case keywords, and wrap string
	// literals in single quotes.
	flags := format.RestoreTiDBSpecialComment |
		format.RestoreNameBackQuotes |
		format.RestoreKeyWordUppercase |
		format.RestoreStringSingleQuotes
	var out strings.Builder
	for _, stmt := range stmts {
		if err = stmt.Restore(format.NewRestoreCtx(flags, &out)); err != nil {
			return "", errors.Trace(err)
		}
		out.WriteByte(';')
	}
	return out.String(), nil
}
91 changes: 55 additions & 36 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ func getLatestDDLBinlog(t *testing.T, pump *mockBinlogPump, ddlQuery string) (pr
require.Greater(t, preDDL.DdlJobId, int64(0))
require.Greater(t, preDDL.StartTs, int64(0))
require.Equal(t, int64(0), preDDL.CommitTs)
require.Equal(t, ddlQuery, string(preDDL.DdlQuery))
formatted, err := binloginfo.FormatAndAddTiDBSpecificComment(ddlQuery)
require.NoError(t, err, "ddlQuery: %s", ddlQuery)
require.Equal(t, formatted, string(preDDL.DdlQuery))
return
}

Expand Down Expand Up @@ -549,144 +551,161 @@ func TestDeleteSchema(t *testing.T) {
tk.MustExec("delete b1 from b2 right join b1 on b1.job_id = b2.job_id and batch_class = 'TEST';")
}

func TestAddSpecialComment(t *testing.T) {
func TestFormatAndAddTiDBSpecificComment(t *testing.T) {
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"create table t1 (id int ) /*T! shard_row_id_bits=2 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */;",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"create table t1 (id int ) /*T! shard_row_id_bits=2 pre_split_regions=2 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */;",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"create table t1 (id int ) /*T! shard_row_id_bits=2 pre_split_regions=2 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */;",
},

{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"create table t1 (id int ) /*T! shard_row_id_bits=2 pre_split_regions=2 */ engine=innodb ;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */;",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"create table t1 (id int ) /*T! shard_row_id_bits=2 pre_split_regions=2 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */;",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"create table t6 (id int ) /*T! shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2 */ ;",
"CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */;",
},
{
"create table t1 (id int primary key auto_random(2));",
"create table t1 (id int primary key /*T![auto_rand] auto_random(2) */ );",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */);",
},
{
"create table t1 (id int primary key auto_random);",
"create table t1 (id int primary key /*T![auto_rand] auto_random */ );",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */);",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"create table t1 (id int /*T![auto_rand] auto_random ( 4 ) */ primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY);",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"create table t1 (id int /*T![auto_rand] auto_random ( 4 ) */ primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY);",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"create table t1 (id int /*T![auto_rand] auto_random ( 3 ) */ primary key) /*T![auto_rand_base] auto_random_base = 100 */ ;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */;",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"create table t1 (id int /*T![auto_rand] auto_random */ primary key) /*T![auto_rand_base] auto_random_base = 50 */ ;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */;",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"create table t1 (id int auto_increment key) /*T![auto_id_cache] auto_id_cache 100 */ ;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */;",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"create table t1 (id int auto_increment unique) /*T![auto_id_cache] auto_id_cache 10 */ ;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */;",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache = 5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */;",
},
{
"create table t1 (id int) auto_id_cache=5;",
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */;",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */;",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] clustered */ );",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */);",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"create table t1(id int, v int, primary key(a) /*T![clustered_index] clustered */ );",
"CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */);",
},
{
"create table t1(id int primary key clustered, v int);",
"create table t1(id int primary key /*T![clustered_index] clustered */ , v int);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT);",
},
{
"alter table t add primary key(a) clustered;",
"alter table t add primary key(a) /*T![clustered_index] clustered */ ;",
"ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */;",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */ );",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */);",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */);",
},
{
"create table clustered_test(id int)",
"create table clustered_test(id int)",
"CREATE TABLE `clustered_test` (`id` INT);",
},
{
"create database clustered_test",
"create database clustered_test",
"CREATE DATABASE `clustered_test`;",
},
{
"create database clustered",
"create database clustered",
"CREATE DATABASE `clustered`;",
},
{
"create table clustered (id int)",
"create table clustered (id int)",
"CREATE TABLE `clustered` (`id` INT);",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"create table t1 (id int, a varchar(255) key /*T![clustered_index] clustered */ );",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */);",
},
{
"alter table t force auto_increment = 12;",
"alter table t /*T![force_inc] force */ auto_increment = 12;",
"ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12;",
},
{
"alter table t force, auto_increment = 12;",
"alter table t force, auto_increment = 12;",
"ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12;",
},
{
// https://github.com/pingcap/tiflow/issues/3755
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */;",
},
{
"create table clustered (id int); create table t1 (id int, a varchar(255) key clustered); ",
"CREATE TABLE `clustered` (`id` INT);CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */);",
},
{
"",
"",
},
{
";;",
"",
},
{
"alter table t cache",
"alter table t cache",
"ALTER TABLE `t` CACHE;",
},
{
"alter table t nocache",
"alter table t nocache",
"ALTER TABLE `t` NOCACHE;",
},
}
for _, ca := range testCase {
re := binloginfo.AddSpecialComment(ca.input)
re, err := binloginfo.FormatAndAddTiDBSpecificComment(ca.input)
require.Equal(t, ca.result, re)
require.NoError(t, err, "Unexpected error for AddTiDBSpecificComment, test input: %s", ca.input)
}
}

Expand Down
44 changes: 0 additions & 44 deletions types/parser_driver/special_cmt_ctrl.go

This file was deleted.

0 comments on commit 69e179e

Please sign in to comment.