Skip to content

Commit

Permalink
lightning: fix forget to set lastRetryableErr when ingest RPC fail (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 29, 2024
1 parent 631b63d commit 41f1b9e
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 10 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"//util/codec",
"//util/engine",
"//util/hack",
"//util/intest",
"//util/mathutil",
"//util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
25 changes: 20 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
Expand All @@ -76,9 +77,6 @@ const (
dialTimeout = 5 * time.Minute
maxRetryTimes = 5
defaultRetryBackoffTime = 3 * time.Second
// maxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
maxWriteAndIngestRetryTimes = 30

gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
Expand Down Expand Up @@ -111,6 +109,10 @@ var (

errorEngineClosed = errors.New("engine is closed")
maxRetryBackoffSecond = 30

// MaxWriteAndIngestRetryTimes is the maximum number of retries for write and ingest.
// A large retry count is used to tolerate TiKV cluster failures.
MaxWriteAndIngestRetryTimes = 30
)

// ImportClientFactory is factory to create new import client for specific store.
Expand Down Expand Up @@ -1523,8 +1525,21 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
switch job.stage {
case regionScanned, wrote:
job.retryCount++
if job.retryCount > maxWriteAndIngestRetryTimes {
firstErr.Set(job.lastRetryableErr)
if job.retryCount > MaxWriteAndIngestRetryTimes {
lastErr := job.lastRetryableErr
if lastErr == nil {
if intest.InTest {
panic("lastRetryableErr should not be nil")
}
lastErr = errors.New("retry limit exceeded")
log.FromContext(ctx).Error(
"lastRetryableErr should not be nil",
logutil.Key("startKey", job.keyRange.start),
logutil.Key("endKey", job.keyRange.end),
zap.Stringer("stage", job.stage),
zap.Error(lastErr))
}
firstErr.Set(lastErr)
workerCancel()
jobWg.Done()
continue
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -1931,7 +1931,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'b'}, end: []byte{'c'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -1941,7 +1941,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 2,
retryCount: MaxWriteAndIngestRetryTimes - 2,
injected: []injectedBehaviour{
{
write: injectedWriteBehaviour{
Expand Down Expand Up @@ -1992,13 +1992,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
engine: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ func (local *Backend) ingest(ctx context.Context, j *regionJob) (err error) {
log.FromContext(ctx).Warn("meet underlying error, will retry ingest",
log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta),
logutil.Region(j.region.Region), logutil.Leader(j.region.Leader))
j.lastRetryableErr = err
continue
}
canContinue, err := j.convertStageOnIngestError(resp)
Expand Down Expand Up @@ -501,6 +502,9 @@ func (local *Backend) checkWriteStall(
// doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta.
// When meet error, it will remove finished sstMetas before return.
func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) {
failpoint.Inject("doIngestFailed", func() {
failpoint.Return(nil, errors.New("injected error"))
})
clientFactory := local.importClientFactory
supportMultiIngest := local.supportMultiIngest
shouldCheckWriteStall := local.ShouldCheckWriteStall
Expand Down
26 changes: 26 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,29 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) {
tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');")
tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry)
}

// TestIssue55808 is a regression test for ingest RPC failures during
// fast-reorg ADD INDEX. It lowers local.MaxWriteAndIngestRetryTimes to 1,
// forces doIngest to fail through the "doIngestFailed" failpoint, and checks
// that the error returned by ALTER TABLE carries the injected ingest error.
func TestIssue55808(t *testing.T) {
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set global tidb_enable_dist_task = off;")
	tk.MustExec("set global tidb_ddl_error_count_limit = 0")

	// Shrink the retry budget so the job gives up after the first failed
	// attempt; restore the package-level value when the test finishes.
	backup := local.MaxWriteAndIngestRetryTimes
	local.MaxWriteAndIngestRetryTimes = 1
	t.Cleanup(func() {
		local.MaxWriteAndIngestRetryTimes = backup
	})

	tk.MustExec("create table t (a int primary key, b int);")
	for i := 0; i < 4; i++ {
		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
	}

	const ingestFailpoint = "github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed"
	require.NoError(t, failpoint.Enable(ingestFailpoint, "return()"))
	// Register the disable right after enabling so the failpoint is removed
	// even if an assertion below aborts the test; otherwise it would stay
	// active for any test that runs afterwards in the same process.
	t.Cleanup(func() {
		if err := failpoint.Disable(ingestFailpoint); err != nil {
			t.Errorf("disable failpoint %s: %v", ingestFailpoint, err)
		}
	})

	err := tk.ExecToErr("alter table t add index idx(a);")
	require.ErrorContains(t, err, "injected error")
}

0 comments on commit 41f1b9e

Please sign in to comment.