Skip to content

Commit

Permalink
schema(cdc): fix DML construct error caused by 'rename tables' DDL (p…
Browse files Browse the repository at this point in the history
…ingcap#5068) (pingcap#5080)

* fix(schema): fix DML construct error caused by 'rename tables' DDL

close  pingcap#5059

Co-authored-by: maxshuang <huangguohao@pingcap.com>
Co-authored-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
3 people authored Mar 31, 2022
1 parent 0c38e62 commit 2f363f6
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 7 deletions.
14 changes: 10 additions & 4 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,10 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
return errors.Trace(err)
}
case timodel.ActionRenameTables:
return s.renameTables(job)
err := s.renameTables(job)
if err != nil {
return errors.Trace(err)
}
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
err := s.createTable(getWrapTableInfo(job))
if err != nil {
Expand Down Expand Up @@ -827,7 +830,8 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
lastSnap := s.snaps[len(s.snaps)-1]
if job.BinlogInfo.FinishedTS <= lastSnap.currentTs {
log.Info("ignore foregone DDL", zap.Int64("jobID", job.ID),
zap.String("DDL", job.Query), zap.String("changefeed", s.id))
zap.String("DDL", job.Query), zap.String("changefeed", s.id),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))
return nil
}
snap = lastSnap.Clone()
Expand All @@ -837,11 +841,13 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
if err := snap.handleDDL(job); err != nil {
log.Error("handle DDL failed", zap.String("DDL", job.Query),
zap.Stringer("job", job), zap.Error(err),
zap.String("changefeed", s.id))
zap.String("changefeed", s.id), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))
return errors.Trace(err)
}
log.Info("handle DDL", zap.String("DDL", job.Query),
zap.Stringer("job", job), zap.String("changefeed", s.id))
zap.Stringer("job", job), zap.String("changefeed", s.id),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS))

s.snaps = append(s.snaps, snap)
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
return nil
Expand Down
9 changes: 6 additions & 3 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,11 @@ func TestHandleRenameTables(t *testing.T) {
rawArgs, err := json.Marshal(args)
require.Nil(t, err)
var job *timodel.Job = &timodel.Job{
Type: timodel.ActionRenameTables,
RawArgs: rawArgs,
BinlogInfo: &timodel.HistoryInfo{},
Type: timodel.ActionRenameTables,
RawArgs: rawArgs,
BinlogInfo: &timodel.HistoryInfo{
FinishedTS: 11112222,
},
}
job.BinlogInfo.MultipleTableInfos = append(job.BinlogInfo.MultipleTableInfos,
&timodel.TableInfo{
Expand Down Expand Up @@ -442,6 +444,7 @@ func TestHandleRenameTables(t *testing.T) {
t2 := model.TableName{Schema: "db_1", Table: "y"}
require.Equal(t, snap.tableNameToID[t1], int64(13))
require.Equal(t, snap.tableNameToID[t2], int64(14))
require.Equal(t, uint64(11112222), snap.currentTs)
}

func testDoDDLAndCheck(t *testing.T, snap *schemaSnapshot, job *timodel.Job, isErr bool) {
Expand Down
3 changes: 3 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,9 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
p.sendError(err)
return nil
})

// FIXME: using GetLastSnapshot here would be confused and get the wrong table name
// after `rename table` DDL, since `rename table` keeps the tableID unchanged
var tableName *model.TableName
retry.Do(ctx, func() error { //nolint:errcheck
if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok {
Expand Down
29 changes: 29 additions & 0 deletions tests/integration_tests/rename_tables/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/rename_tables/sync_diff/output"

source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["rename_tables_test.?*"]

[data-sources]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
53 changes: 53 additions & 0 deletions tests/integration_tests/rename_tables/data/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
drop database if exists `rename_tables_test`;
create database `rename_tables_test`;
use `rename_tables_test`;

create table t1 (
value64 bigint unsigned not null,
primary key(value64)
);
insert into t1 values(17156792991891826145);
insert into t1 values(91891826145);
delete from t1 where value64=17156792991891826145;
update t1 set value64=17156792991891826;
update t1 set value64=56792991891826;

rename table t1 to t1_1;

create table t2 (
value64 bigint unsigned not null,
primary key(value64)
);
insert into t2 values(17156792991891826145);
insert into t2 values(91891826145);
delete from t2 where value64=91891826145;
update t2 set value64=17156792991891826;
update t2 set value64=56792991891826;

rename table t2 to t2_2;

create table t1 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

create table t2 (
value64 bigint unsigned not null,
value32 integer not null,
primary key(value64, value32)
);

insert into t1 values(17156792991891826145, 1);
insert into t1 values( 9223372036854775807, 2);
insert into t2 values(17156792991891826145, 3);
insert into t2 values( 9223372036854775807, 4);

rename table t1 to t1_7, t2 to t2_7;

insert into t1_7 values(91891826145, 5);
insert into t1_7 values(685477580, 6);
insert into t2_7 values(1715679991826145, 7);
insert into t2_7 values(2036854775807, 8);

create table finish_mark(id int primary key);
45 changes: 45 additions & 0 deletions tests/integration_tests/rename_tables/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-rename-tables-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first

check_table_exists rename_tables_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
echo "check table exists success"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 2f363f6

Please sign in to comment.