From 41f1b9e80acf6686490857db4d13c381bf5e1ef4 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 29 Oct 2024 14:58:06 +0800 Subject: [PATCH] lightning: fix forget to set lastRetryableErr when ingest RPC fail (#56345) (#56913) close pingcap/tidb#55808 --- br/pkg/lightning/backend/local/BUILD.bazel | 1 + br/pkg/lightning/backend/local/local.go | 25 ++++++++++++++---- br/pkg/lightning/backend/local/local_test.go | 10 +++---- br/pkg/lightning/backend/local/region_job.go | 4 +++ .../addindextest/integration_test.go | 26 +++++++++++++++++++ 5 files changed, 56 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index af57eeb6f7003..ccc91c1163708 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "//util/codec", "//util/engine", "//util/hack", + "//util/intest", "//util/mathutil", "//util/ranger", "@com_github_cockroachdb_pebble//:pebble", diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 5e2c6ddd72831..7b2b3bb96e980 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/engine" + "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/mathutil" "github.com/tikv/client-go/v2/oracle" tikvclient "github.com/tikv/client-go/v2/tikv" @@ -76,9 +77,6 @@ const ( dialTimeout = 5 * time.Minute maxRetryTimes = 5 defaultRetryBackoffTime = 3 * time.Second - // maxWriteAndIngestRetryTimes is the max retry times for write and ingest. - // A large retry times is for tolerating tikv cluster failures. - maxWriteAndIngestRetryTimes = 30 gRPCKeepAliveTime = 10 * time.Minute gRPCKeepAliveTimeout = 5 * time.Minute @@ -111,6 +109,10 @@ var ( errorEngineClosed = errors.New("engine is closed") maxRetryBackoffSecond = 30 + + // MaxWriteAndIngestRetryTimes is the max retry times for write and ingest. + // A large retry times is for tolerating tikv cluster failures. + MaxWriteAndIngestRetryTimes = 30 ) // ImportClientFactory is factory to create new import client for specific store. @@ -1523,8 +1525,21 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges switch job.stage { case regionScanned, wrote: job.retryCount++ - if job.retryCount > maxWriteAndIngestRetryTimes { - firstErr.Set(job.lastRetryableErr) + if job.retryCount > MaxWriteAndIngestRetryTimes { + lastErr := job.lastRetryableErr + if lastErr == nil { + if intest.InTest { + panic("lastRetryableErr should not be nil") + } + lastErr = errors.New("retry limit exceeded") + log.FromContext(ctx).Error( + "lastRetryableErr should not be nil", + logutil.Key("startKey", job.keyRange.start), + logutil.Key("endKey", job.keyRange.end), + zap.Stringer("stage", job.stage), + zap.Error(lastErr)) + } + firstErr.Set(lastErr) workerCancel() jobWg.Done() continue diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index c54ceac27ef99..64bdf94dcd96d 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1921,7 +1921,7 @@ func TestDoImport(t *testing.T) { { keyRange: Range{start: []byte{'a'}, end: []byte{'b'}}, engine: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 1, + retryCount: MaxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, }, @@ -1931,7 +1931,7 @@ func TestDoImport(t *testing.T) { { keyRange: Range{start: []byte{'b'}, end: []byte{'c'}}, engine: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 1, + retryCount: MaxWriteAndIngestRetryTimes - 1, injected: getSuccessInjectedBehaviour(), }, }, @@ -1941,7 +1941,7 @@ func TestDoImport(t *testing.T) { { keyRange: Range{start: []byte{'c'}, end: []byte{'d'}}, engine: &Engine{}, - retryCount: maxWriteAndIngestRetryTimes - 2, + retryCount: MaxWriteAndIngestRetryTimes - 2, injected: []injectedBehaviour{ { write: injectedWriteBehaviour{ @@ -1992,13 +1992,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) { keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}}, engine: &Engine{}, injected: getNeedRescanWhenIngestBehaviour(), - retryCount: maxWriteAndIngestRetryTimes, + retryCount: MaxWriteAndIngestRetryTimes, }, { keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}}, engine: &Engine{}, injected: getSuccessInjectedBehaviour(), - retryCount: maxWriteAndIngestRetryTimes, + retryCount: MaxWriteAndIngestRetryTimes, }, }, }, diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index a021937bdcad9..e332a2d3e59cd 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -451,6 +451,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) { log.FromContext(ctx).Warn("meet underlying error, will retry ingest", log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta), logutil.Region(j.region.Region), logutil.Leader(j.region.Leader)) + j.lastRetryableErr = err continue } canContinue, err := j.convertStageOnIngestError(resp) @@ -501,6 +502,9 @@ func (local *Backend) checkWriteStall( // doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta. // When meet error, it will remove finished sstMetas before return. func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) { + failpoint.Inject("doIngestFailed", func() { + failpoint.Return(nil, errors.New("injected error")) + }) clientFactory := local.importClientFactory supportMultiIngest := local.supportMultiIngest shouldCheckWriteStall := local.ShouldCheckWriteStall diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index bb6af7e925673..d28dcd61d0043 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -565,3 +565,29 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) { tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');") tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry) } + +func TestIssue55808(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set global tidb_enable_dist_task = off;") + tk.MustExec("set global tidb_ddl_error_count_limit = 0") + + backup := local.MaxWriteAndIngestRetryTimes + local.MaxWriteAndIngestRetryTimes = 1 + t.Cleanup(func() { + local.MaxWriteAndIngestRetryTimes = backup + }) + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 4; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed", "return()")) + err := tk.ExecToErr("alter table t add index idx(a);") + require.ErrorContains(t, err, "injected error") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed")) +}