From dd84529c0f51e88ad18d2292ec014a9430652a93 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 1 Aug 2024 20:38:32 +0800 Subject: [PATCH 1/5] fix missed slice during retry for region error Signed-off-by: Ping Yu --- txnkv/transaction/2pc.go | 4 +- txnkv/transaction/txn_file.go | 73 ++++++++++++++++++++++++----------- 2 files changed, 52 insertions(+), 25 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index f1394fa9f..4aa2cceb7 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -744,7 +744,7 @@ func (c *twoPhaseCommitter) primary() []byte { if c.mutations != nil { return c.mutations.GetKey(0) } - if c.txnFileCtx.slice.len() > 0 { + if c.txnFileCtx.slice.Len() > 0 { return c.txnFileCtx.slice.chunkRanges[0].smallest } return nil @@ -1228,7 +1228,7 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat", zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL)) startTime := time.Now() - _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL, c.txnFileCtx.slice.len() > 0) + _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL, c.txnFileCtx.slice.Len() > 0) if err != nil { keepFail++ metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index ace2943ca..a851d7ad5 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -81,12 +81,12 @@ func (b chunkBatch) String() string { // chunkBatch.txnChunkSlice should be sorted by smallest. func (b *chunkBatch) getSampleKeys() [][]byte { keys := make([][]byte, 0) - start := sort.Search(b.txnChunkSlice.len(), func(i int) bool { + start := sort.Search(b.txnChunkSlice.Len(), func(i int) bool { return bytes.Compare(b.region.StartKey, b.txnChunkSlice.chunkRanges[i].smallest) <= 0 }) - end := b.txnChunkSlice.len() + end := b.txnChunkSlice.Len() if len(b.region.EndKey) > 0 { - end = sort.Search(b.txnChunkSlice.len(), func(i int) bool { + end = sort.Search(b.txnChunkSlice.Len(), func(i int) bool { return bytes.Compare(b.txnChunkSlice.chunkRanges[i].smallest, b.region.EndKey) >= 0 }) } @@ -125,19 +125,8 @@ func (s *txnChunkSlice) Biggest() []byte { } func (cs *txnChunkSlice) appendSlice(other *txnChunkSlice) { - if cs.len() == 0 { - cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs...) - cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges...) - return - } - lastChunkRange := cs.chunkRanges[len(cs.chunkIDs)-1] - for i, chunkRange := range other.chunkRanges { - if bytes.Compare(lastChunkRange.smallest, chunkRange.smallest) < 0 { - cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs[i:]...) - cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges[i:]...) - break - } - } + cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs...) + cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges...) } func (cs *txnChunkSlice) append(chunkID uint64, chunkRange txnChunkRange) { @@ -145,10 +134,41 @@ func (cs *txnChunkSlice) append(chunkID uint64, chunkRange txnChunkRange) { cs.chunkRanges = append(cs.chunkRanges, chunkRange) } -func (cs *txnChunkSlice) len() int { +func (cs *txnChunkSlice) Len() int { return len(cs.chunkIDs) } +func (cs *txnChunkSlice) Swap(i, j int) { + cs.chunkIDs[i], cs.chunkIDs[j] = cs.chunkIDs[j], cs.chunkIDs[i] + cs.chunkRanges[i], cs.chunkRanges[j] = cs.chunkRanges[j], cs.chunkRanges[i] +} + +func (cs *txnChunkSlice) Less(i, j int) bool { + return bytes.Compare(cs.chunkRanges[i].smallest, cs.chunkRanges[j].smallest) < 0 +} + +func (cs *txnChunkSlice) dedup() { + if len(cs.chunkIDs) == 0 { + return + } + + sort.Sort(cs) + + newIDs := make([]uint64, 0, len(cs.chunkIDs)) + newRanges := make([]txnChunkRange, 0, len(cs.chunkRanges)) + newIDs = append(newIDs, cs.chunkIDs[0]) + newRanges = append(newRanges, cs.chunkRanges[0]) + for i := 1; i < len(cs.chunkIDs); i++ { + if cs.chunkIDs[i] == cs.chunkIDs[i-1] { + continue + } + newIDs = append(newIDs, cs.chunkIDs[i]) + newRanges = append(newRanges, cs.chunkRanges[i]) + } + cs.chunkIDs = newIDs + cs.chunkRanges = newRanges +} + // []chunkBatch is sorted by region.StartKey. // Note: regions may be overlapping. func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer, mutations CommitterMutations) ([]chunkBatch, error) { @@ -190,7 +210,7 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff return cmp < 0 }) - logutil.Logger(bo.GetCtx()).Debug("txn file group to batches", zap.Stringers("batches", batches)) + logutil.Logger(bo.GetCtx()).Info("txn file group to batches", zap.Stringers("batches", batches)) return batches, nil } @@ -527,7 +547,7 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { undetermined := c.mu.undeterminedErr != nil c.mu.RUnlock() if !committed && !undetermined { - if c.txnFileCtx.slice.len() > 0 { + if c.txnFileCtx.slice.Len() > 0 { err1 := c.executeTxnFileAction(retry.NewBackofferWithVars(ctx, int(CommitMaxBackoff), c.txn.vars), c.txnFileCtx.slice, txnFileRollbackAction{}) if err1 != nil { logutil.Logger(ctx).Error("txn file: rollback on error failed", zap.Error(err1)) @@ -655,15 +675,17 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice regionErrChunks.appendSlice(r.regionErrSlice) } } + regionErrChunks.dedup() return regionErrChunks, nil } func (c *twoPhaseCommitter) executeTxnFileSliceSingleBatch(bo *retry.Backoffer, batch chunkBatch, action txnFileAction) (*txnChunkSlice, error) { resp, err1 := action.executeBatch(c, bo, batch) - logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch finished", + logutil.Logger(bo.GetCtx()).Info("txn file: execute batch finished", zap.Uint64("startTS", c.startTS), zap.Any("batch", batch), zap.Stringer("action", action), + zap.Any("resp", resp), zap.Error(err1)) if err1 != nil { return nil, err1 @@ -692,6 +714,11 @@ func (c *twoPhaseCommitter) executeTxnFileSliceSingleBatch(bo *retry.Backoffer, return nil, err1 } if regionErr != nil { + logutil.Logger(bo.GetCtx()).Info("txn file: execute batch failed, region error", + zap.Uint64("startTS", c.startTS), + zap.Stringer("action", action), + zap.Any("batch", batch), + zap.Stringer("regionErr", regionErr)) return &batch.txnChunkSlice, nil } return nil, nil @@ -706,7 +733,7 @@ func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, ch if err != nil { return errors.WithStack(err) } - if regionErrChunks.len() == 0 { + if regionErrChunks.Len() == 0 { return nil } currentChunks = regionErrChunks @@ -916,8 +943,8 @@ func (c *twoPhaseCommitter) preSplitTxnFileRegions(bo *retry.Backoffer) error { } var splitKeys [][]byte for _, batch := range batches { - if batch.len() > PreSplitRegionChunks { - for i := PreSplitRegionChunks; i < batch.len(); i += PreSplitRegionChunks { + if batch.Len() > PreSplitRegionChunks { + for i := PreSplitRegionChunks; i < batch.Len(); i += PreSplitRegionChunks { splitKeys = append(splitKeys, batch.chunkRanges[i].smallest) } } From f9e5a9d17a25809523eb1e797e660ed540486b35 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 2 Aug 2024 10:28:28 +0800 Subject: [PATCH 2/5] fix commit error Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index a851d7ad5..734db3367 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -606,6 +606,9 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { } err = c.executeTxnFileAction(commitBo, c.txnFileCtx.slice, txnFileCommitAction{commitTS: c.commitTS}) stepDone("commit") + if err != nil { + return + } err = c.afterExecuteTxnFile(rcInterceptor, reqInfo, ruDetails) return @@ -757,11 +760,12 @@ func (c *twoPhaseCommitter) executeTxnFilePrimaryBatch(bo *retry.Backoffer, firs firstBatch.isPrimary = true resp, err := action.executeBatch(c, bo, firstBatch) - logutil.Logger(bo.GetCtx()).Debug("txn file: execute primary batch finished", + logutil.Logger(bo.GetCtx()).Info("txn file: execute primary batch finished", zap.Uint64("startTS", c.startTS), zap.String("primary", kv.StrKey(c.primary())), zap.Stringer("action", action), zap.Stringer("batch", firstBatch), + zap.Any("resp", resp), zap.Error(err)) if err != nil { return nil, errors.WithStack(err) From 5f8d4c55e3c0e3896dde09ae9e4f533bfba0b8a8 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 2 Aug 2024 16:24:21 +0800 Subject: [PATCH 3/5] polish Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index 734db3367..6f990a1e8 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -147,8 +147,8 @@ func (cs *txnChunkSlice) Less(i, j int) bool { return bytes.Compare(cs.chunkRanges[i].smallest, cs.chunkRanges[j].smallest) < 0 } -func (cs *txnChunkSlice) dedup() { - if len(cs.chunkIDs) == 0 { +func (cs *txnChunkSlice) sortAndDedup() { + if len(cs.chunkIDs) <= 1 { return } @@ -210,7 +210,7 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff return cmp < 0 }) - logutil.Logger(bo.GetCtx()).Info("txn file group to batches", zap.Stringers("batches", batches)) + logutil.Logger(bo.GetCtx()).Debug("txn file group to batches", zap.Stringers("batches", batches)) return batches, nil } @@ -678,17 +678,16 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice regionErrChunks.appendSlice(r.regionErrSlice) } } - regionErrChunks.dedup() + regionErrChunks.sortAndDedup() return regionErrChunks, nil } func (c *twoPhaseCommitter) executeTxnFileSliceSingleBatch(bo *retry.Backoffer, batch chunkBatch, action txnFileAction) (*txnChunkSlice, error) { resp, err1 := action.executeBatch(c, bo, batch) - logutil.Logger(bo.GetCtx()).Info("txn file: execute batch finished", + logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch finished", zap.Uint64("startTS", c.startTS), zap.Any("batch", batch), zap.Stringer("action", action), - zap.Any("resp", resp), zap.Error(err1)) if err1 != nil { return nil, err1 @@ -717,7 +716,7 @@ func (c *twoPhaseCommitter) executeTxnFileSliceSingleBatch(bo *retry.Backoffer, return nil, err1 } if regionErr != nil { - logutil.Logger(bo.GetCtx()).Info("txn file: execute batch failed, region error", + logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch failed, region error", zap.Uint64("startTS", c.startTS), zap.Stringer("action", action), zap.Any("batch", batch), @@ -760,12 +759,11 @@ func (c *twoPhaseCommitter) executeTxnFilePrimaryBatch(bo *retry.Backoffer, firs firstBatch.isPrimary = true resp, err := action.executeBatch(c, bo, firstBatch) - logutil.Logger(bo.GetCtx()).Info("txn file: execute primary batch finished", + logutil.Logger(bo.GetCtx()).Debug("txn file: execute primary batch finished", zap.Uint64("startTS", c.startTS), zap.String("primary", kv.StrKey(c.primary())), zap.Stringer("action", action), zap.Stringer("batch", firstBatch), - zap.Any("resp", resp), zap.Error(err)) if err != nil { return nil, errors.WithStack(err) From 39325d9bfe3bf7497d315b756c3b49536ffa697c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 2 Aug 2024 19:25:26 +0800 Subject: [PATCH 4/5] improve dedup Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 13 ++++------ txnkv/transaction/txn_file_test.go | 40 ++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index 6f990a1e8..7a9fee2fb 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -154,16 +154,13 @@ func (cs *txnChunkSlice) sortAndDedup() { sort.Sort(cs) - newIDs := make([]uint64, 0, len(cs.chunkIDs)) - newRanges := make([]txnChunkRange, 0, len(cs.chunkRanges)) - newIDs = append(newIDs, cs.chunkIDs[0]) - newRanges = append(newRanges, cs.chunkRanges[0]) + newIDs := cs.chunkIDs[:1] + newRanges := cs.chunkRanges[:1] for i := 1; i < len(cs.chunkIDs); i++ { - if cs.chunkIDs[i] == cs.chunkIDs[i-1] { - continue + if cs.chunkIDs[i] != newIDs[len(newIDs)-1] { + newIDs = append(newIDs, cs.chunkIDs[i]) + newRanges = append(newRanges, cs.chunkRanges[i]) } - newIDs = append(newIDs, cs.chunkIDs[i]) - newRanges = append(newRanges, cs.chunkRanges[i]) } cs.chunkIDs = newIDs cs.chunkRanges = newRanges diff --git a/txnkv/transaction/txn_file_test.go b/txnkv/transaction/txn_file_test.go index dc1c0ab67..a69d42d25 100644 --- a/txnkv/transaction/txn_file_test.go +++ b/txnkv/transaction/txn_file_test.go @@ -15,6 +15,9 @@ package transaction import ( + "fmt" + "math/rand" + "slices" "testing" "github.com/stretchr/testify/assert" @@ -79,3 +82,40 @@ func TestTxnFileChunkBatch(t *testing.T) { assert.Equal(c.expected, strKeys) } } + +func TestChunkSliceSortAndDedup(t *testing.T) { + assert := assert.New(t) + + genRndChunkIDs := func() []uint64 { + len := rand.Intn(10) + ids := make([]uint64, 0, len) + for i := 0; i < len; i++ { + ids = append(ids, uint64(rand.Intn(len+len/2))) + } + return ids + } + + for i := 0; i < 100; i++ { + ids := genRndChunkIDs() + t.Logf("ids: %v\n", ids) + + expected := make([]uint64, len(ids)) + copy(expected, ids) + slices.Sort(expected) + expected = slices.Compact(expected) + + chunkSlice := txnChunkSlice{ + chunkIDs: make([]uint64, 0, len(ids)), + chunkRanges: make([]txnChunkRange, 0, len(ids)), + } + for _, id := range ids { + chunkSlice.chunkIDs = append(chunkSlice.chunkIDs, id) + chunkSlice.chunkRanges = append(chunkSlice.chunkRanges, txnChunkRange{smallest: []byte(fmt.Sprintf("k%04d", id)), biggest: []byte{}}) + } + chunkSlice.sortAndDedup() + assert.Equal(expected, chunkSlice.chunkIDs) + for i, id := range expected { + assert.Equal(fmt.Sprintf("k%04d", id), string(chunkSlice.chunkRanges[i].smallest)) + } + } +} From b48a4b5ed6fc165c4113df92a10fb410edd4b36c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 2 Aug 2024 19:40:59 +0800 Subject: [PATCH 5/5] debug log Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 1 + 1 file changed, 1 insertion(+) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index 7a9fee2fb..ec402f6e6 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -735,6 +735,7 @@ func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, ch if regionErrChunks.Len() == 0 { return nil } + logutil.Logger(bo.GetCtx()).Debug("txn file meet region errors", zap.Stringer("regionErrChunks", regionErrChunks)) currentChunks = regionErrChunks currentBatches = nil err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("txn file: execute failed, region miss"))