From 49326fd90469bce1e226474e3dd6f4fbbe2a815e Mon Sep 17 00:00:00 2001
From: dongmen <20351731+asddongmen@users.noreply.github.com>
Date: Tue, 22 Nov 2022 22:15:58 +0800
Subject: [PATCH] This is an automated cherry-pick of #7683

Signed-off-by: ti-chi-bot
---
 cdc/owner/changefeed.go                  | 25 +++++++
 .../batch_update_to_no_batch/run.sh      | 74 +++++++++++++++++++
 2 files changed, 99 insertions(+)
 create mode 100644 tests/integration_tests/batch_update_to_no_batch/run.sh

diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 6cb7a7d651b..7597766b036 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -291,6 +291,12 @@ LOOP:
 		return errors.Trace(err)
 	}
 
+	// we must clear the cached DDL and tables during changefeed initialization;
+	// otherwise, the changefeed will lose tables that need to be replicated
+	// ref: https://github.com/pingcap/tiflow/issues/7682
+	c.ddlEventCache = nil
+	c.currentTables = nil
+
 	cancelCtx, cancel := cdcContext.WithCancel(ctx)
 	c.cancel = cancel
 
@@ -343,6 +349,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 	c.cancel = func() {}
 	c.ddlPuller.Close()
 	c.schema = nil
+<<<<<<< HEAD
 	c.cleanupRedoManager(ctx)
 	c.cleanupServiceGCSafePoints(ctx)
 	canceledCtx, cancel := context.WithCancel(context.Background())
@@ -356,6 +363,24 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 
 	changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
 	changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
+=======
+	c.barriers = nil
+	c.initialized = false
+	c.isReleased = true
+
+	log.Info("changefeed closed",
+		zap.String("namespace", c.id.Namespace),
+		zap.String("changefeed", c.id.ID),
+		zap.Any("status", c.state.Status),
+		zap.Stringer("info", c.state.Info),
+		zap.Bool("isRemoved", c.isRemoved))
+}
+
+func (c *changefeed) cleanupMetrics() {
+	changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
+	changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
+	changefeedCheckpointLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
+>>>>>>> 719d27004b (changefeed (ticdc): fix data lost when pause and resume changefeed while executing DDL. (#7683))
 	c.metricsChangefeedCheckpointTsGauge = nil
 	c.metricsChangefeedCheckpointTsLagGauge = nil

diff --git a/tests/integration_tests/batch_update_to_no_batch/run.sh b/tests/integration_tests/batch_update_to_no_batch/run.sh
new file mode 100644
index 00000000000..ee5865267fa
--- /dev/null
+++ b/tests/integration_tests/batch_update_to_no_batch/run.sh
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+# This integration test covers the following scenarios:
+# 1. cdc works well in batch mode
+# 2. cdc works well in no-batch mode
+# 3. cdc can switch between batch mode and no-batch mode and still work correctly
+function run() {
+	# batch mode only supports the mysql sink
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	# This must be set before the cdc server starts
+	run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	# TiDB caches global variables for at most 2 seconds
+	sleep 2
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	# this test contains `recover table`, which requires the super privilege, so we
+	# can't use the normal user
+	SINK_URI="mysql://root@127.0.0.1:3306/?batch-dml-enable=true"
+
+	changefeed_id="test"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# pause the changefeed
+	run_cdc_cli changefeed pause -c ${changefeed_id}
+	# update the changefeed to no-batch dml mode
+	run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=false" --no-confirm
+	# resume the changefeed
+	run_cdc_cli changefeed resume -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test_v5.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# pause the changefeed
+	run_cdc_cli changefeed pause -c ${changefeed_id}
+	# update the changefeed back to batch dml mode
+	run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=true" --no-confirm
+	# resume the changefeed
+	run_cdc_cli changefeed resume -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test_finish.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# sync_diff can't check non-existent tables, so we first check that the expected tables have been created downstream
+	check_table_exists batch_update_to_no_batch.v1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists batch_update_to_no_batch.recover_and_insert ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists batch_update_to_no_batch.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
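
Note: the Go snippet below is a standalone, simplified sketch written for this review, not code from tiflow. The changefeed type, its fields, and the initialize function are stand-ins that only mirror the ddlEventCache / currentTables reset added in the changefeed.go hunk above, to illustrate why caches left over from the run before a pause must be dropped when the changefeed re-initializes.

package main

import "fmt"

// changefeed is a toy stand-in for the real struct; only the two cached
// fields touched by the fix are modeled here.
type changefeed struct {
	ddlEventCache []string // DDL events cached while a DDL was being executed
	currentTables []string // tables captured by a previous initialization
}

// initialize mimics the idea of the fix: reset anything cached by an earlier
// run before rebuilding the table list, so stale state from before a
// pause/resume cannot shadow the freshly captured tables.
func (c *changefeed) initialize(tables []string) {
	c.ddlEventCache = nil
	c.currentTables = nil
	c.currentTables = append(c.currentTables, tables...)
}

func main() {
	// Simulate a changefeed paused in the middle of executing a DDL.
	c := &changefeed{ddlEventCache: []string{"CREATE TABLE t2"}}
	// Re-initialization after resume drops the stale cache and captures both tables.
	c.initialize([]string{"t1", "t2"})
	fmt.Println(c.ddlEventCache, c.currentTables) // prints: [] [t1 t2]
}

The pause, sink-uri update, and resume performed by the test script above is the operational sequence that triggers this re-initialization path.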