lightning: adapt to the new behaviour that "write" may return an epoch error (p…
lance6716 committed Oct 31, 2023
1 parent 99a4f35 commit 926a1e5
Showing 4 changed files with 79 additions and 10 deletions.
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -1434,7 +1434,6 @@ func (local *Backend) executeJob(
 			// if it's retryable error, we retry from scanning region
 			log.FromContext(ctx).Warn("meet retryable error when writing to TiKV",
 				log.ShortError(err), zap.Stringer("job stage", job.stage))
-			job.convertStageTo(needRescan)
 			job.lastRetryableErr = err
 			return nil
 		}
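Note: the needRescan transition is dropped here because writeToTiKV (region_job.go below) now chooses the retry stage itself, picking regionScanned when the request epoch is too new and needRescan otherwise; executeJob only logs the retryable error and records it in lastRetryableErr.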
57 changes: 48 additions & 9 deletions br/pkg/lightning/backend/local/region_job.go
@@ -18,6 +18,7 @@ import (
 	"container/heap"
 	"context"
 	"fmt"
+	"io"
 	"strings"
 	"sync"
 	"time"
@@ -179,12 +180,29 @@ func (j *regionJob) done(wg *sync.WaitGroup) {
 }
 
 // writeToTiKV writes the data to TiKV and mark this job as wrote stage.
-// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this
+// if any write logic has error, writeToTiKV will set job to a proper stage and return nil.
 // if any underlying logic has error, writeToTiKV will return an error.
 // we don't need to do cleanup for the pairs written to tikv if encounters an error,
 // tikv will take the responsibility to do so.
 // TODO: let client-go provide a high-level write interface.
 func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
+	err := local.doWrite(ctx, j)
+	if err == nil {
+		return nil
+	}
+	if !common.IsRetryableError(err) {
+		return err
+	}
+	// currently only one case will restart write
+	if strings.Contains(err.Error(), "RequestTooNew") {
+		j.convertStageTo(regionScanned)
+		return err
+	}
+	j.convertStageTo(needRescan)
+	return err
+}
+
+func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
 	if j.stage != regionScanned {
 		return nil
 	}
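The new wrapper above separates error classification from the write itself. A minimal standalone sketch of that classification, with the stage transitions replaced by return values so it runs on its own; classifyWriteErr and isRetryable are hypothetical names (isRetryable stands in for common.IsRetryableError):

package main

import (
	"errors"
	"fmt"
	"strings"
)

// isRetryable stands in for common.IsRetryableError in this sketch.
func isRetryable(err error) bool { return true }

func classifyWriteErr(err error) string {
	if err == nil {
		return "wrote" // the write succeeded and the job advances
	}
	if !isRetryable(err) {
		return "fatal" // surfaced to the caller unchanged
	}
	// Only an epoch-too-new error restarts the write against the same
	// region; any other retryable error forces a fresh region scan.
	if strings.Contains(err.Error(), "RequestTooNew") {
		return "regionScanned"
	}
	return "needRescan"
}

func main() {
	fmt.Println(classifyWriteErr(errors.New("rpc error: ... RequestTooNew ..."))) // regionScanned
	fmt.Println(classifyWriteErr(errors.New("rpc error: ... RequestTooOld ..."))) // needRescan
}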
@@ -238,9 +256,25 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 		ApiVersion: apiVersion,
 	}
 
-	annotateErr := func(in error, peer *metapb.Peer) error {
+	failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
+		cloned := *meta.RegionEpoch
+		meta.RegionEpoch = &cloned
+		i := val.(int)
+		if i >= 0 {
+			meta.RegionEpoch.Version += uint64(i)
+		} else {
+			meta.RegionEpoch.ConfVer -= uint64(-i)
+		}
+	})
+
+	annotateErr := func(in error, peer *metapb.Peer, msg string) error {
 		// annotate the error with peer/store/region info to help debug.
-		return errors.Annotatef(in, "peer %d, store %d, region %d, epoch %s", peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String())
+		return errors.Annotatef(
+			in,
+			"peer %d, store %d, region %d, epoch %s, %s",
+			peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String(),
+			msg,
+		)
 	}
 
 	leaderID := j.region.Leader.GetId()
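The changeEpochVersion failpoint above is a test-only hook: failpoint.Inject is a no-op unless the binary is built with failpoints enabled and the failpoint is armed, for example through the GO_FAILPOINTS environment variable as in the test script below. In failpoint term syntax, 1*return(10) fires once and injects 10, so Version += 10 and the request epoch looks newer than the store's local one, while 1*return(-1) injects -1, so ConfVer -= 1 and the request epoch looks stale.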
@@ -260,17 +294,17 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 	for _, peer := range region.GetPeers() {
 		cli, err := clientFactory.Create(ctx, peer.StoreId)
 		if err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when create client")
 		}
 
 		wstream, err := cli.Write(ctx)
 		if err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when open write stream")
 		}
 
 		// Bind uuid for this write request
 		if err = wstream.Send(req); err != nil {
-			return annotateErr(err, peer)
+			return annotateErr(err, peer, "when send meta")
 		}
 		clients = append(clients, wstream)
 		allPeers = append(allPeers, peer)
@@ -309,7 +343,12 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 				return errors.Trace(err)
 			}
 			if err := clients[i].SendMsg(preparedMsg); err != nil {
-				return annotateErr(err, allPeers[i])
+				if err == io.EOF {
+					// if it's EOF, need RecvMsg to get the error
+					dummy := &sst.WriteResponse{}
+					err = clients[i].RecvMsg(dummy)
+				}
+				return annotateErr(err, allPeers[i], "when send data")
 			}
 		}
 		failpoint.Inject("afterFlushKVs", func() {
@@ -383,10 +422,10 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
 	for i, wStream := range clients {
 		resp, closeErr := wStream.CloseAndRecv()
 		if closeErr != nil {
-			return annotateErr(closeErr, allPeers[i])
+			return annotateErr(closeErr, allPeers[i], "when close write stream")
 		}
 		if resp.Error != nil {
-			return annotateErr(errors.New(resp.Error.Message), allPeers[i])
+			return annotateErr(errors.New("resp error: "+resp.Error.Message), allPeers[i], "when close write stream")
 		}
 		if leaderID == region.Peers[i].GetId() {
 			leaderPeerMetas = resp.Metas
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry.go
@@ -140,6 +140,8 @@ func isSingleRetryableError(err error) bool {
 		// 2. in write TiKV: rpc error: code = Unknown desc = EngineTraits(Engine(Status { code: IoError, sub_code:
 		// None, sev: NoError, state: \"IO error: No such file or directory: while stat a file for size:
 		// /...../63992d9c-fbc8-4708-b963-32495b299027_32279707_325_5280_write.sst: No such file or directory\"
+		// 3. in write TiKV: rpc error: code = Unknown desc = Engine("request region 26 is staler than local region,
+		// local epoch conf_ver: 5 version: 65, request epoch conf_ver: 5 version: 64, please rescan region later")
 		return true
 	default:
 		return false
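A hedged check of the classification documented above: an RPC error with code Unknown carrying the new case-3 message is expected to be retryable. IsRetryableError is the exported wrapper the local backend calls; the message below is abbreviated from the comment:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Mirrors case 3 above (abbreviated): an epoch mismatch reported by TiKV.
	err := status.Error(codes.Unknown,
		`Engine("request region 26 is staler than local region, please rescan region later")`)
	fmt.Println(common.IsRetryableError(err)) // expected: true
}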
29 changes: 29 additions & 0 deletions br/tests/lightning_max_random/run.sh
@@ -44,6 +44,8 @@ cleanup() {
 
 cleanup
 
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(-1)"
+
 # auto_random_max = 2^{64-1-10}-1
 # db.test contains key auto_random_max - 1
 # db.test1 contains key auto_random_max
@@ -63,4 +65,31 @@ check_contains 'ERROR'
 run_sql 'INSERT INTO db.test2(b) VALUES(33);'
 run_sql 'INSERT INTO db.test2(b) VALUES(44);'
 run_sql 'INSERT INTO db.test2(b) VALUES(55);'
+
+grep 'RequestTooOld' "$TEST_DIR/lightning.log" | grep -q 'needRescan'
 cleanup
+
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(10)"
+
+# auto_random_max = 2^{64-1-10}-1
+# db.test contains key auto_random_max - 1
+# db.test1 contains key auto_random_max
+# db.test2 contains key auto_random_max + 1 (overflow)
+run_lightning --sorted-kv-dir "$TEST_DIR/sst" --config "$CUR/config.toml" --log-file "$TEST_DIR/lightning.log"
+check_result
+# successfully insert: db.test auto_random key has not reached maximum
+run_sql 'INSERT INTO db.test(b) VALUES(11);'
+# fail for further insertion
+run_sql 'INSERT INTO db.test(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
+check_contains 'ERROR'
+# fail: db.test1 has key auto_random_max
+run_sql 'INSERT INTO db.test1(b) VALUES(11);'
+run_sql 'INSERT INTO db.test1(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
+check_contains 'ERROR'
+# successfully insert for overflow key
+run_sql 'INSERT INTO db.test2(b) VALUES(33);'
+run_sql 'INSERT INTO db.test2(b) VALUES(44);'
+run_sql 'INSERT INTO db.test2(b) VALUES(55);'
+
+grep 'RequestTooNew' "$TEST_DIR/lightning.log" | grep -q 'regionScanned'
+cleanup
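The two grep assertions check the end-to-end mapping this commit establishes: a stale request epoch (RequestTooOld in the TiKV error) must send the job back to needRescan, while a too-new request epoch (RequestTooNew) must restart the write from regionScanned.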
