
Commit

This is an automated cherry-pick of pingcap#7683
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
asddongmen authored and ti-chi-bot committed Nov 22, 2022
1 parent ed9ffce commit 49326fd
Showing 2 changed files with 99 additions and 0 deletions.
25 changes: 25 additions & 0 deletions cdc/owner/changefeed.go
@@ -291,6 +291,12 @@ LOOP:
return errors.Trace(err)
}

// we must clean the cached DDL events and tables during changefeed initialization,
// otherwise the changefeed will lose tables that need to be replicated
// ref: https://github.com/pingcap/tiflow/issues/7682
c.ddlEventCache = nil
c.currentTables = nil

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

@@ -343,6 +349,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.cancel = func() {}
c.ddlPuller.Close()
c.schema = nil
<<<<<<< HEAD
c.cleanupRedoManager(ctx)
c.cleanupServiceGCSafePoints(ctx)
canceledCtx, cancel := context.WithCancel(context.Background())
@@ -356,6 +363,24 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {

changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
=======
c.barriers = nil
c.initialized = false
c.isReleased = true

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.Bool("isRemoved", c.isRemoved))
}

func (c *changefeed) cleanupMetrics() {
changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
>>>>>>> 719d27004b (changefeed (ticdc): fix data lost when pause and resume changefeed while executing DDL. (#7683))
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil
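
The comment added in the first hunk is the heart of the fix: if a changefeed is paused while a DDL is executing and later resumed, stale cached DDL events and a stale table list from the previous run can cause newly created tables to be skipped (see pingcap/tiflow#7682). Below is a minimal, self-contained sketch of that reset-on-initialize pattern; the types and fields are illustrative stand-ins, not the actual tiflow owner code:

```go
// Sketch of the reset-on-initialize pattern; names are illustrative only.
package main

import "fmt"

type ddlEvent struct{ query string }
type tableInfo struct{ name string }

type changefeed struct {
	ddlEventCache []*ddlEvent  // DDL events pending execution
	currentTables []*tableInfo // tables currently being replicated
	initialized   bool
}

// initialize drops any state cached by a previous run of this changefeed
// (for example, one interrupted by a pause in the middle of a DDL) and
// rebuilds it from scratch, so no table can be silently skipped.
func (c *changefeed) initialize(tables []*tableInfo) {
	// The fix in this commit: always start from a clean slate.
	c.ddlEventCache = nil
	c.currentTables = nil

	c.currentTables = tables
	c.initialized = true
}

func main() {
	c := &changefeed{
		// Stale state left over from before the changefeed was paused.
		ddlEventCache: []*ddlEvent{{query: "CREATE TABLE t2 (id INT)"}},
		currentTables: []*tableInfo{{name: "t1"}},
	}
	c.initialize([]*tableInfo{{name: "t1"}, {name: "t2"}})
	fmt.Println(len(c.ddlEventCache), len(c.currentTables)) // prints: 0 2
}
```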

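The cherry-picked side of the conflict also factors per-changefeed metric cleanup into a cleanupMetrics helper that deletes the (namespace, changefeed) label values, so a closed changefeed stops exporting stale series. A rough sketch of that pattern with the Prometheus Go client, using an illustrative metric name rather than the exact TiCDC ones:

```go
// Sketch of deleting per-changefeed label values from a Prometheus GaugeVec
// when a changefeed is closed; metric and label names are illustrative.
package main

import "github.com/prometheus/client_golang/prometheus"

var checkpointTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "owner",
		Name:      "checkpoint_ts",
		Help:      "checkpoint ts of a changefeed",
	},
	[]string{"namespace", "changefeed"},
)

// cleanupMetrics drops the time series labeled with this changefeed's
// identity so that a closed changefeed no longer exports stale values.
func cleanupMetrics(namespace, changefeedID string) {
	checkpointTsGauge.DeleteLabelValues(namespace, changefeedID)
}

func main() {
	prometheus.MustRegister(checkpointTsGauge)
	checkpointTsGauge.WithLabelValues("default", "test").Set(100)
	cleanupMetrics("default", "test")
}
```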
74 changes: 74 additions & 0 deletions tests/integration_tests/batch_update_to_no_batch/run.sh
@@ -0,0 +1,74 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

# This integration test covers the following scenarios:
# 1. cdc works well in batch mode
# 2. cdc works well in no-batch mode
# 3. cdc can switch between batch mode and no-batch mode and still works well
function run() {
# batch mode is only supported by the mysql sink
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# This must be set before cdc server starts
run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# TiDB caches global variables for at most 2 seconds
sleep 2

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

# this test contains `recover table`, which requires the SUPER privilege, so we
# can't use a normal user
SINK_URI="mysql://root@127.0.0.1:3306/?batch-dml-enable=true"

changefeed_id="test"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c ${changefeed_id}

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# pause changefeed
run_cdc_cli changefeed pause -c ${changefeed_id}
# update the changefeed to disable batch dml
run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=false" --no-confirm
# resume changefeed
run_cdc_cli changefeed resume -c ${changefeed_id}

run_sql_file $CUR/data/test_v5.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# pause changefeed
run_cdc_cli changefeed pause -c ${changefeed_id}
# update the changefeed to re-enable batch dml
run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=true" --no-confirm
# resume changefeed
run_cdc_cli changefeed resume -c ${changefeed_id}

run_sql_file $CUR/data/test_finish.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# sync_diff can't check non-existent tables, so first check that the expected tables have been created downstream
check_table_exists batch_update_to_no_batch.v1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists batch_update_to_no_batch.recover_and_insert ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists batch_update_to_no_batch.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
