Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master, config(dm): fix case-sensitive compatibility for dmctl #5307

Merged
merged 4 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return c.AdjustCaseSensitive(ctx2, db)
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered why this change was introduced, it's because when implementing openAPI , I was referring to the v2 version of the task config 😵

}

// AdjustCaseSensitive adjust CaseSensitive from DB.
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
c.Assert(checkAndAdjustSourceConfigFunc(ctx, cfg1), IsNil) // adjust source config.
c.Assert(checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg1), IsNil) // adjust source config.
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
28 changes: 25 additions & 3 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package master

import (
"context"
"database/sql"
"encoding/binary"
"fmt"
"math/rand"
Expand Down Expand Up @@ -86,7 +87,11 @@ var (
registerOnce sync.Once
runBackgroundOnce sync.Once

checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
// compatibility compromise.
// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this upgrade case is so complex, we may add breaking change in doc and release note

see here

/cc @sunzhaoyang @WizardXiao

checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl
)

// Server handles RPC requests for dm-master.
Expand Down Expand Up @@ -1311,15 +1316,19 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf
if err != nil {
return cfgs, err
}
if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
if err := checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg); err != nil {
return cfgs, err
}
cfgs[i] = cfg
}
return cfgs, nil
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
func innerCheckAndAdjustSourceConfig(
ctx context.Context,
cfg *config.SourceConfig,
hook func(sourceConfig *config.SourceConfig, ctx context.Context, db *sql.DB) error,
) error {
dbConfig := cfg.GenerateDBConfig()
fromDB, err := conn.DefaultDBProvider.Apply(dbConfig)
if err != nil {
Expand All @@ -1329,12 +1338,25 @@ func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) e
if err = cfg.Adjust(ctx, fromDB.DB); err != nil {
return err
}
if hook != nil {
if err = hook(cfg, ctx, fromDB.DB); err != nil {
return err
}
}
if _, err = cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, (*config.SourceConfig).AdjustCaseSensitive)
}

func checkAndAdjustSourceConfigForDMCtl(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, nil)
}

func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) {
cfgs := make([]*config.SourceConfig, len(contents))
for i, content := range contents {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ func (t *testMaster) SetUpSuite(c *check.C) {
t.workerClients = make(map[string]workerrpc.Client)
t.saveMaxRetryNum = maxRetryNum
maxRetryNum = 2
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
checkAndAdjustSourceConfigForDMCtlFunc = checkAndNoAdjustSourceConfigMock
}

func (t *testMaster) TearDownSuite(c *check.C) {
maxRetryNum = t.saveMaxRetryNum
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfig
}

func (t *testMaster) SetUpTest(c *check.C) {
Expand Down
3 changes: 2 additions & 1 deletion dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,13 @@ func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *sql.Conn) (Lower
return LowerCaseTableNamesFlavor(res), nil
}

// GetDBCaseSensitive returns the case sensitive setting of target db.
// GetDBCaseSensitive returns the case-sensitive setting of target db.
func GetDBCaseSensitive(ctx context.Context, db *sql.DB) (bool, error) {
conn, err := db.Conn(ctx)
if err != nil {
return true, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer conn.Close()
lcFlavor, err := FetchLowerCaseTableNamesSetting(ctx, conn)
if err != nil {
return true, err
Expand Down
7 changes: 3 additions & 4 deletions dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,13 @@ function test_query_timeout() {
# don't know why CI has turned on Event Scheduler
run_sql_both_source 'SET GLOBAL event_scheduler = OFF;'

# there's only 2 rows in result, one for dm-worker's source-level status, one for SHOW PROCESSLIST
# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_source1 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

run_sql_source2 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_tidb 'SHOW PROCESSLIST;'
check_rows_equal 1

Expand Down
1 change: 1 addition & 0 deletions dm/tests/case_sensitive/data/db2.prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ create table Upper_Table (
PRIMARY KEY (id));
insert into Upper_Table (name, ts) values ('Arya', now()), ('Bran', '2021-05-11 10:01:05'), ('Sansa', NULL);

-- if case-insensitive, this should report conflict with Upper_Table
create table upper_table(id int NOT NULL PRIMARY KEY);

-- test block-allow-list
Expand Down
51 changes: 42 additions & 9 deletions dm/tests/case_sensitive/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,32 @@ source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME
API_VERSION="v1alpha1"

function run() {
function prepare_sensitive_task() {
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
}

function prepare_insensitive_task() {
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml

sed -i "/sensitive/d" $WORK_DIR/dm-task.yaml
sed -i "/create table upper_table/d" $WORK_DIR/db2.prepare.sql
}

function run_with_prepared() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
inject_points=(
"github.com/pingcap/tiflow/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/tiflow/dm/relay/NewUpstreamServer=return(true)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'
# manually create the route table
run_sql 'CREATE DATABASE IF NOT EXISTS `UPPER_DB_ROUTE`' $TIDB_PORT $TIDB_PASSWORD
Expand All @@ -40,7 +55,6 @@ function run() {
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start DM task only
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
# check task has started
check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"test\",worker=\"worker1\"}" 10 1 3
Expand Down Expand Up @@ -82,15 +96,13 @@ function run() {

# test block-allow-list by the way
run_sql "show databases;" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Upper_Db_IGNORE"
check_contains "Upper_DB1"
check_contains "lower_db"
# test route-rule
check_contains "UPPER_DB_ROUTE"

run_sql "show tables from UPPER_DB_ROUTE" $TIDB_PORT $TIDB_PASSWORD
check_contains "do_table_route"
check_not_contains "Do_table_ignore"
run_sql_tidb_with_retry "select count(*) from UPPER_DB_ROUTE.do_table_route" "count(*): 5"

# test binlog event filter
Expand All @@ -101,16 +113,37 @@ function run() {
# ensure the truncate is ignored and the new row is inserted
run_sql_tidb_with_retry "select count(*) from UPPER_DB_ROUTE.do_table_route" "count(*): 6"

dmctl_stop_task test
dmctl_operate_source stop $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source stop $WORK_DIR/source2.yaml $SOURCE_ID2

export GO_FAILPOINTS=''
}

function check_ignore_when_sensitive() {
run_sql "show databases;" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Upper_Db_IGNORE"
run_sql "show tables from UPPER_DB_ROUTE" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Do_table_ignore"
}

trap cleanup_process EXIT
trap "cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE sync_diff_inspector" EXIT
trap "cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE sync_diff_inspector" EXIT

# also cleanup dm processes in case of last run failed
cleanup_process $*
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE
run
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE

prepare_sensitive_task
run_with_prepared
check_ignore_when_sensitive

cleanup_process $*
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE

prepare_insensitive_task
run_with_prepared

cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"