
lightning: adapt new behaviour that "write" may return epoch error #47667

Merged · 14 commits · Oct 31, 2023
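What the change does, in one place: TiKV's write RPC may now reject a request with a region-epoch error. When the request's epoch is ahead of TiKV's ("RequestTooNew"), Lightning only needs to restart the write against the same region; every other retryable failure (including the stale-epoch "RequestTooOld" case) still forces a region rescan. Below is a minimal sketch of that classification, assuming only what the diffs further down show; the helper, the stage type, and the sample error strings are illustrative, not code from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// jobStage is an illustrative stand-in for the region job stages in
// br/pkg/lightning/backend/local: regionScanned retries the write against
// the same region; needRescan re-scans regions before retrying.
type jobStage string

const (
	regionScanned jobStage = "regionScanned"
	needRescan    jobStage = "needRescan"
)

// classifyRetryableWriteErr mirrors the branch added to writeToTiKV:
// only "RequestTooNew" restarts the write; any other retryable error
// falls back to a region rescan.
func classifyRetryableWriteErr(err error) jobStage {
	if strings.Contains(err.Error(), "RequestTooNew") {
		return regionScanned
	}
	return needRescan
}

func main() {
	staler := fmt.Errorf(`Engine("request region 26 is staler than local region, please rescan region later")`)
	fmt.Println(classifyRetryableWriteErr(staler)) // needRescan

	tooNew := fmt.Errorf("epoch not match: RequestTooNew") // illustrative message
	fmt.Println(classifyRetryableWriteErr(tooNew)) // regionScanned
}
```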
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -1431,7 +1431,6 @@ func (local *Backend) executeJob(
 			// if it's retryable error, we retry from scanning region
 			log.FromContext(ctx).Warn("meet retryable error when writing to TiKV",
 				log.ShortError(err), zap.Stringer("job stage", job.stage))
-			job.convertStageTo(needRescan)
 			job.lastRetryableErr = err
 			return nil
 		}
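The one deleted line is the point of this file: executeJob used to force every retryable write failure into needRescan, but the stage decision has moved into writeToTiKV (see region_job.go below). A sketch of the caller's retry branch after this change, simplified from the hunk above (a fragment, not standalone code):

```go
// Simplified from executeJob: writeToTiKV now picks the stage itself
// (regionScanned to redo the write, needRescan to re-scan regions),
// so the retryable-error branch only logs and records the error.
if err := local.writeToTiKV(ctx, job); err != nil {
	if !common.IsRetryableError(err) {
		return err
	}
	log.FromContext(ctx).Warn("meet retryable error when writing to TiKV",
		log.ShortError(err), zap.Stringer("job stage", job.stage))
	// job.convertStageTo(needRescan) is gone: the stage is already set.
	job.lastRetryableErr = err
	return nil
}
```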
57 changes: 48 additions & 9 deletions br/pkg/lightning/backend/local/region_job.go
@@ -18,6 +18,7 @@ import (
 	"container/heap"
 	"context"
 	"fmt"
+	"io"
 	"strings"
 	"sync"
 	"time"
@@ -180,12 +181,29 @@ func (j *regionJob) done(wg *sync.WaitGroup) {
 }
 
 // writeToTiKV writes the data to TiKV and mark this job as wrote stage.
-// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this
+// if any write logic has error, writeToTiKV will set job to a proper stage and return nil.
 // if any underlying logic has error, writeToTiKV will return an error.
 // we don't need to do cleanup for the pairs written to tikv if encounters an error,
 // tikv will take the responsibility to do so.
 // TODO: let client-go provide a high-level write interface.
 func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
+	err := local.doWrite(ctx, j)
+	if err == nil {
+		return nil
+	}
+	if !common.IsRetryableError(err) {
+		return err
+	}
+	// currently only one case will restart write
+	if strings.Contains(err.Error(), "RequestTooNew") {
+		j.convertStageTo(regionScanned)
+		return err
+	}
+	j.convertStageTo(needRescan)
+	return err
+}
+
+func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
 	if j.stage != regionScanned {
 		return nil
 	}
@@ -239,9 +257,25 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 		ApiVersion: apiVersion,
 	}
 
-	annotateErr := func(in error, peer *metapb.Peer) error {
+	failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
+		cloned := *meta.RegionEpoch
+		meta.RegionEpoch = &cloned
+		i := val.(int)
+		if i >= 0 {
+			meta.RegionEpoch.Version += uint64(i)
+		} else {
+			meta.RegionEpoch.ConfVer -= uint64(-i)
+		}
+	})
+
+	annotateErr := func(in error, peer *metapb.Peer, msg string) error {
 		// annotate the error with peer/store/region info to help debug.
-		return errors.Annotatef(in, "peer %d, store %d, region %d, epoch %s", peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String())
+		return errors.Annotatef(
+			in,
+			"peer %d, store %d, region %d, epoch %s, %s",
+			peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String(),
+			msg,
+		)
 	}
 
 	leaderID := j.region.Leader.GetId()
@@ -261,17 +295,17 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 	for _, peer := range region.GetPeers() {
 		cli, err := clientFactory.Create(ctx, peer.StoreId)
 		if err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when create client")
 		}
 
 		wstream, err := cli.Write(ctx)
 		if err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when open write stream")
 		}
 
 		// Bind uuid for this write request
 		if err = wstream.Send(req); err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when send meta")
 		}
 		clients = append(clients, wstream)
 		allPeers = append(allPeers, peer)
@@ -310,7 +344,12 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 			return errors.Trace(err)
 		}
 		if err := clients[i].SendMsg(preparedMsg); err != nil {
-			return annotateErr(err, allPeers[i])
+			if err == io.EOF {
+				// if it's EOF, need RecvMsg to get the error
+				dummy := &sst.WriteResponse{}
+				err = clients[i].RecvMsg(dummy)
+			}
+			return annotateErr(err, allPeers[i], "when send data")
 		}
 	}
 	failpoint.Inject("afterFlushKVs", func() {
@@ -384,10 +423,10 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 	for i, wStream := range clients {
 		resp, closeErr := wStream.CloseAndRecv()
 		if closeErr != nil {
-			return annotateErr(closeErr, allPeers[i])
+			return annotateErr(closeErr, allPeers[i], "when close write stream")
 		}
 		if resp.Error != nil {
-			return annotateErr(errors.New(resp.Error.Message), allPeers[i])
+			return annotateErr(errors.New("resp error: "+resp.Error.Message), allPeers[i], "when close write stream")
 		}
 		if leaderID == region.Peers[i].GetId() {
 			leaderPeerMetas = resp.Metas
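A subtlety in the "when send data" hunk above: grpc-go's stream SendMsg reports a broken stream as a bare io.EOF, and the actual server-side status is only returned by a subsequent RecvMsg, which is why the new code receives into a throwaway WriteResponse before annotating. A self-contained sketch of that contract, with emptypb.Empty standing in as a placeholder for sst.WriteResponse:

```go
package main

import (
	"io"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
)

// sendOnStream shows the io.EOF dance: when SendMsg fails with io.EOF the
// server has already closed the stream, and RecvMsg must be called to
// retrieve the real RPC error. Without this, the epoch error from TiKV
// would never be visible to the caller.
func sendOnStream(stream grpc.ClientStream, msg interface{}) error {
	if err := stream.SendMsg(msg); err != nil {
		if err == io.EOF {
			// placeholder response type; the real code receives *sst.WriteResponse
			dummy := &emptypb.Empty{}
			err = stream.RecvMsg(dummy)
		}
		return err
	}
	return nil
}

func main() {}
```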
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry.go
@@ -138,6 +138,8 @@ func isSingleRetryableError(err error) bool {
 		// 2. in write TiKV: rpc error: code = Unknown desc = EngineTraits(Engine(Status { code: IoError, sub_code:
 		// None, sev: NoError, state: \"IO error: No such file or directory: while stat a file for size:
 		// /...../63992d9c-fbc8-4708-b963-32495b299027_32279707_325_5280_write.sst: No such file or directory\"
+		// 3. in write TiKV: rpc error: code = Unknown desc = Engine("request region 26 is staler than local region,
+		// local epoch conf_ver: 5 version: 65, request epoch conf_ver: 5 version: 64, please rescan region later")
 		return true
 	default:
 		return false
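Case 3 extends the list of messages that justify the `return true` in isSingleRetryableError. A test-style sketch of what that buys, assuming (as the quoted `code = Unknown` examples suggest) that such gRPC statuses reach this branch; the test name and the status.Error construction are hypothetical:

```go
package common_test

import (
	"testing"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// The stale-epoch write error, as quoted in the comment above, arrives as a
// gRPC status with code Unknown; IsRetryableError should report it as
// retryable so the job is retried rather than failing the import.
func TestStaleEpochWriteErrorIsRetryable(t *testing.T) {
	err := status.Error(codes.Unknown,
		`Engine("request region 26 is staler than local region, `+
			`local epoch conf_ver: 5 version: 65, request epoch conf_ver: 5 version: 64, `+
			`please rescan region later")`)
	require.True(t, common.IsRetryableError(err))
}
```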
29 changes: 29 additions & 0 deletions br/tests/lightning_max_random/run.sh
@@ -44,6 +44,8 @@ cleanup() {
 
 cleanup
 
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(-1)"
+
 # auto_random_max = 2^{64-1-10}-1
 # db.test contains key auto_random_max - 1
 # db.test1 contains key auto_random_max
@@ -63,4 +65,31 @@ check_contains 'ERROR'
 run_sql 'INSERT INTO db.test2(b) VALUES(33);'
 run_sql 'INSERT INTO db.test2(b) VALUES(44);'
 run_sql 'INSERT INTO db.test2(b) VALUES(55);'
 
+grep 'RequestTooOld' "$TEST_DIR/lightning.log" | grep -q 'needRescan'
+cleanup
+
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(10)"
+
+# auto_random_max = 2^{64-1-10}-1
+# db.test contains key auto_random_max - 1
+# db.test1 contains key auto_random_max
+# db.test2 contains key auto_random_max + 1 (overflow)
+run_lightning --sorted-kv-dir "$TEST_DIR/sst" --config "$CUR/config.toml" --log-file "$TEST_DIR/lightning.log"
+check_result
+# successfully insert: d.test auto_random key has not reached maximum
+run_sql 'INSERT INTO db.test(b) VALUES(11);'
+# fail for further insertion
+run_sql 'INSERT INTO db.test(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
+check_contains 'ERROR'
+# fail: db.test1 has key auto_random_max
+run_sql 'INSERT INTO db.test1(b) VALUES(11);'
+run_sql 'INSERT INTO db.test1(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
+check_contains 'ERROR'
+# successfully insert for overflow key
+run_sql 'INSERT INTO db.test2(b) VALUES(33);'
+run_sql 'INSERT INTO db.test2(b) VALUES(44);'
+run_sql 'INSERT INTO db.test2(b) VALUES(55);'
+
+grep 'RequestTooNew' "$TEST_DIR/lightning.log" | grep -q 'regionScanned'
+cleanup
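The two GO_FAILPOINTS values exercise both directions of the epoch mismatch: `1*return(-1)` fires once and decrements ConfVer, so the request epoch is stale, TiKV answers RequestTooOld, and the log should pair it with needRescan; `1*return(10)` bumps Version by 10, so the request is too new, TiKV answers RequestTooNew, and the log should pair it with regionScanned, which is exactly what the two grep checks assert. For an in-process test the same failpoint can be armed with the pingcap/failpoint API; a sketch, with the path copied from the script:

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

const fpChangeEpoch = "github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion"

func main() {
	// "1*return(10)": fire once with value 10 -> Version += 10 -> RequestTooNew
	// -> writeToTiKV converts the job back to regionScanned and redoes the write.
	if err := failpoint.Enable(fpChangeEpoch, "1*return(10)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable(fpChangeEpoch)
	fmt.Println("failpoint armed; run the import under test here")
}
```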