Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix forget to set lastRetryableErr when ingest RPC fail (#56345) #56913

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"//util/codec",
"//util/engine",
"//util/hack",
"//util/intest",
"//util/mathutil",
"//util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
25 changes: 20 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
Expand All @@ -76,9 +77,6 @@
dialTimeout = 5 * time.Minute
maxRetryTimes = 5
defaultRetryBackoffTime = 3 * time.Second
// maxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
maxWriteAndIngestRetryTimes = 30

gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
Expand Down Expand Up @@ -111,6 +109,10 @@

errorEngineClosed = errors.New("engine is closed")
maxRetryBackoffSecond = 30

// MaxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
MaxWriteAndIngestRetryTimes = 30
)

// ImportClientFactory is factory to create new import client for specific store.
Expand Down Expand Up @@ -1523,8 +1525,21 @@
switch job.stage {
case regionScanned, wrote:
job.retryCount++
if job.retryCount > maxWriteAndIngestRetryTimes {
firstErr.Set(job.lastRetryableErr)
if job.retryCount > MaxWriteAndIngestRetryTimes {
lastErr := job.lastRetryableErr
if lastErr == nil {
if intest.InTest {

Check warning on line 1531 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1528-L1531

Added lines #L1528 - L1531 were not covered by tests
panic("lastRetryableErr should not be nil")
}
lastErr = errors.New("retry limit exceeded")
log.FromContext(ctx).Error(
"lastRetryableErr should not be nil",
logutil.Key("startKey", job.keyRange.start),
logutil.Key("endKey", job.keyRange.end),
zap.Stringer("stage", job.stage),

Check warning on line 1539 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1533-L1539

Added lines #L1533 - L1539 were not covered by tests
zap.Error(lastErr))
}
firstErr.Set(lastErr)

Check warning on line 1542 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1541-L1542

Added lines #L1541 - L1542 were not covered by tests
workerCancel()
jobWg.Done()
continue
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'a'}, end: []byte{'b'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -1931,7 +1931,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'b'}, end: []byte{'c'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 1,
retryCount: MaxWriteAndIngestRetryTimes - 1,
injected: getSuccessInjectedBehaviour(),
},
},
Expand All @@ -1941,7 +1941,7 @@ func TestDoImport(t *testing.T) {
{
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
engine: &Engine{},
retryCount: maxWriteAndIngestRetryTimes - 2,
retryCount: MaxWriteAndIngestRetryTimes - 2,
injected: []injectedBehaviour{
{
write: injectedWriteBehaviour{
Expand Down Expand Up @@ -1992,13 +1992,13 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
engine: &Engine{},
injected: getNeedRescanWhenIngestBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
{
keyRange: Range{start: []byte{'c', '2'}, end: []byte{'d'}},
engine: &Engine{},
injected: getSuccessInjectedBehaviour(),
retryCount: maxWriteAndIngestRetryTimes,
retryCount: MaxWriteAndIngestRetryTimes,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@
log.FromContext(ctx).Warn("meet underlying error, will retry ingest",
log.ShortError(err), logutil.SSTMetas(j.writeResult.sstMeta),
logutil.Region(j.region.Region), logutil.Leader(j.region.Leader))
j.lastRetryableErr = err

Check warning on line 454 in br/pkg/lightning/backend/local/region_job.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/region_job.go#L454

Added line #L454 was not covered by tests
continue
}
canContinue, err := j.convertStageOnIngestError(resp)
Expand Down Expand Up @@ -501,6 +502,9 @@
// doIngest send ingest commands to TiKV based on regionJob.writeResult.sstMeta.
// When meet error, it will remove finished sstMetas before return.
func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestResponse, error) {
failpoint.Inject("doIngestFailed", func() {
failpoint.Return(nil, errors.New("injected error"))
})

Check warning on line 507 in br/pkg/lightning/backend/local/region_job.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/region_job.go#L506-L507

Added lines #L506 - L507 were not covered by tests
clientFactory := local.importClientFactory
supportMultiIngest := local.supportMultiIngest
shouldCheckWriteStall := local.ShouldCheckWriteStall
Expand Down
26 changes: 26 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,29 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) {
tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');")
tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry)
}

func TestIssue55808(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set global tidb_enable_dist_task = off;")
tk.MustExec("set global tidb_ddl_error_count_limit = 0")

backup := local.MaxWriteAndIngestRetryTimes
local.MaxWriteAndIngestRetryTimes = 1
t.Cleanup(func() {
local.MaxWriteAndIngestRetryTimes = backup
})

tk.MustExec("create table t (a int primary key, b int);")
for i := 0; i < 4; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed", "return()"))
err := tk.ExecToErr("alter table t add index idx(a);")
require.ErrorContains(t, err, "injected error")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/doIngestFailed"))
}