diff --git a/br/tests/br_log_restore/run.sh b/br/tests/br_log_restore/run.sh
deleted file mode 100755
index 3753e58267816..0000000000000
--- a/br/tests/br_log_restore/run.sh
+++ /dev/null
@@ -1,181 +0,0 @@
-#!/bin/bash
-#
-# Copyright 2020 PingCAP, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-set -eux
-DB="$TEST_NAME"
-TABLE="usertable"
-DB_COUNT=3
-BUCKET="cdcs3"
-CDC_COUNT=3
-
-# start the s3 server
-export MINIO_ACCESS_KEY=brs3accesskey
-export MINIO_SECRET_KEY=brs3secretkey
-export MINIO_BROWSER=off
-export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY
-export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY
-export S3_ENDPOINT=127.0.0.1:24928
-rm -rf "$TEST_DIR/$DB"
-mkdir -p "$TEST_DIR/$DB"
-bin/minio server --address $S3_ENDPOINT "$TEST_DIR/$DB" &
-i=0
-while ! curl -o /dev/null -s "http://$S3_ENDPOINT/"; do
-    i=$(($i+1))
-    if [ $i -gt 30 ]; then
-        echo 'Failed to start minio'
-        exit 1
-    fi
-    sleep 2
-done
-
-bin/mc config --config-dir "$TEST_DIR/$TEST_NAME" \
-    host add minio http://$S3_ENDPOINT $MINIO_ACCESS_KEY $MINIO_SECRET_KEY
-bin/mc mb --config-dir "$TEST_DIR/$TEST_NAME" minio/$BUCKET
-
-# Start cdc servers
-run_cdc server --pd=https://$PD_ADDR --log-file=ticdc.log --addr=0.0.0.0:18301 --advertise-addr=127.0.0.1:18301 &
-trap 'cat ticdc.log' ERR
-
-# TODO: remove this after TiCDC supports TiDB clustered index
-run_sql "set @@global.tidb_enable_clustered_index=0"
-# TiDB global variables cache 2 seconds
-sleep 2
-
-# create change feed for s3 log
-run_cdc cli changefeed create --pd=https://$PD_ADDR --sink-uri="s3://$BUCKET/$DB?endpoint=http://$S3_ENDPOINT" --changefeed-id="simple-replication-task"
-
-start_ts=$(run_sql "show master status;" | grep Position | awk -F ':' '{print $2}' | xargs)
-
-# Fill in the database
-for i in $(seq $DB_COUNT); do
-    run_sql "CREATE DATABASE $DB${i};"
-    go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i}
-done
-
-for i in $(seq $DB_COUNT); do
-    row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
-done
-
-# test drop & create schema/table, finally only db2 has one row
-run_sql "create schema ${DB}_DDL1;"
-run_sql "create table ${DB}_DDL1.t1 (a int primary key, b varchar(10));"
-run_sql "insert into ${DB}_DDL1.t1 values (1, 'x');"
-
-run_sql "drop schema ${DB}_DDL1;"
-run_sql "create schema ${DB}_DDL1;"
-run_sql "create schema ${DB}_DDL2;"
-
-run_sql "create table ${DB}_DDL2.t2 (a int primary key, b varchar(10));"
-run_sql "insert into ${DB}_DDl2.t2 values (2, 'x');"
-
-run_sql "drop table ${DB}_DDL2.t2;"
-run_sql "create table ${DB}_DDL2.t2 (a int primary key, b varchar(10));"
-run_sql "insert into ${DB}_DDL2.t2 values (3, 'x');"
-run_sql "delete from ${DB}_DDL2.t2 where a = 3;"
-run_sql "insert into ${DB}_DDL2.t2 values (4, 'x');"
-
-end_ts=$(run_sql "show master status;" | grep Position | awk -F ':' '{print $2}' | xargs)
-
-
-# if we restore with ts range [start_ts, end_ts], then the below record won't be restored.
-run_sql "insert into ${DB}_DDL2.t2 values (5, 'x');" - -wait_time=0 -checkpoint_ts=$(run_cdc cli changefeed query -c simple-replication-task --pd=https://$PD_ADDR | jq '.status."checkpoint-ts"') -while [ "$checkpoint_ts" -lt "$end_ts" ]; do - echo "waiting for cdclog syncing... (checkpoint_ts = $checkpoint_ts; end_ts = $end_ts)" - if [ "$wait_time" -gt 300 ]; then - echo "cdc failed to sync after 300s, please check the CDC log." - exit 1 - fi - sleep 5 - wait_time=$(( wait_time + 5 )) - checkpoint_ts=$(run_cdc cli changefeed query -c simple-replication-task --pd=https://$PD_ADDR | jq '.status."checkpoint-ts"') -done - -# remove the change feed, because we don't want to record the drop ddl. -echo "Y" | run_cdc cli unsafe reset --pd=https://$PD_ADDR - -for i in $(seq $DB_COUNT); do - run_sql "DROP DATABASE $DB${i};" -done -run_sql "DROP DATABASE ${DB}_DDL1" -run_sql "DROP DATABASE ${DB}_DDL2" - -# restore full -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=return("notleader")' -echo "restore start..." -run_br restore cdclog -s "s3://$BUCKET/$DB" --pd $PD_ADDR --s3.endpoint="http://$S3_ENDPOINT" \ - --log-file "restore.log" --log-level "info" --start-ts $start_ts --end-ts $end_ts - -for i in $(seq $DB_COUNT); do - row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') -done - -fail=false -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=4;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "1" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on dml&ddl drop test." -fi - - -# record a=5 shouldn't be restore, because we set -end-ts without this record. -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=5;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "0" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on ts range test." -fi - -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=return("epochnotmatch")' -echo "restore again to restore a=5 record..." -run_br restore cdclog -s "s3://$BUCKET/$DB" --pd $PD_ADDR --s3.endpoint="http://$S3_ENDPOINT" \ - --log-file "restore.log" --log-level "info" --start-ts $end_ts - -# record a=5 should be restore, because we set -end-ts without this record. -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=5;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "1" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on recover ts range test." -fi - -# record a=3 should be deleted -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=3;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "0" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on key not deleted." -fi - - -for i in $(seq $DB_COUNT); do - if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then - fail=true - echo "TEST: [$TEST_NAME] fail on database $DB${i}" - fi - echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" -done - -if $fail; then - echo "TEST: [$TEST_NAME] failed!" 
-    exit 1
-fi
-
-for i in $(seq $DB_COUNT); do
-    run_sql "DROP DATABASE $DB${i};"
-done
-
-run_sql "DROP DATABASE ${DB}_DDL1"
-run_sql "DROP DATABASE ${DB}_DDL2"
diff --git a/br/tests/br_log_restore/workload b/br/tests/br_log_restore/workload
deleted file mode 100644
index 664fe7ee88228..0000000000000
--- a/br/tests/br_log_restore/workload
+++ /dev/null
@@ -1,12 +0,0 @@
-recordcount=1000
-operationcount=0
-workload=core
-
-readallfields=true
-
-readproportion=0
-updateproportion=0
-scanproportion=0
-insertproportion=0
-
-requestdistribution=uniform
diff --git a/br/tests/br_other/run.sh b/br/tests/br_other/run.sh
index 313f2c5e273c0..79ffb9d2732e8 100644
--- a/br/tests/br_other/run.sh
+++ b/br/tests/br_other/run.sh
@@ -95,6 +95,8 @@ run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-siz
 run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep -E "^0$"
 
 backup_fail=0
+# generate 1.sst to make another backup failed.
+touch "$TEST_DIR/$DB/lock/1.sst"
 echo "another backup start expect to fail due to last backup add a lockfile"
 run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1
 if [ "$backup_fail" -ne "1" ];then
diff --git a/ddl/ddl.go b/ddl/ddl.go
index 1835eb9bfb28e..ebec94019105b 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -545,6 +545,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	d.limitJobCh <- task
 	// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
 	err := <-task.err
+	if err != nil {
+		// The transaction of enqueuing job is failed.
+		return errors.Trace(err)
+	}
 
 	ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 6cc3a77461c56..617c42c639d6c 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -301,6 +301,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
 				return errors.Trace(err)
 			}
 		}
+		failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
+			if val.(bool) {
+				failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
+			}
+		})
 		return nil
 	})
 	var jobs string
@@ -310,7 +315,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
 	}
-	logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err))
+	} else {
+		logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
+	}
 }
 
 // getHistoryDDLJob gets a DDL job with job's ID from history queue.
diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go
index d42c6f888a689..5321d12671b40 100644
--- a/ddl/ddl_worker_test.go
+++ b/ddl/ddl_worker_test.go
@@ -494,6 +494,32 @@ func (s *testDDLSuite) TestColumnError(c *C) {
 	doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionDropColumns, []interface{}{[]model.CIStr{model.NewCIStr("c5"), model.NewCIStr("c6")}, make([]bool, 2)}, ctx, d)
 }
 
+func (s *testDDLSerialSuite) TestAddBatchJobError(c *C) {
+	store := testCreateStore(c, "test_add_batch_job_error")
+	defer func() {
+		err := store.Close()
+		c.Assert(err, IsNil)
+	}()
+	d, err := testNewDDLAndStart(
+		context.Background(),
+		WithStore(store),
+		WithLease(testLease),
+	)
+	c.Assert(err, IsNil)
+	defer func() {
+		err := d.Stop()
+		c.Assert(err, IsNil)
+	}()
+	ctx := testNewContext(d)
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`), IsNil)
+	// Test the job runner should not hang forever.
+	job := &model.Job{SchemaID: 1, TableID: 1}
+	err = d.doDDLJob(ctx, job)
+	c.Assert(err, NotNil)
+	c.Assert(err.Error(), Equals, "mockAddBatchDDLJobsErr")
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr"), IsNil)
+}
+
 func testCheckOwner(c *C, d *ddl, expectedVal bool) {
 	c.Assert(d.isOwner(), Equals, expectedVal)
 }