diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index f1394fa9f..4aa2cceb7 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -744,7 +744,7 @@ func (c *twoPhaseCommitter) primary() []byte { if c.mutations != nil { return c.mutations.GetKey(0) } - if c.txnFileCtx.slice.len() > 0 { + if c.txnFileCtx.slice.Len() > 0 { return c.txnFileCtx.slice.chunkRanges[0].smallest } return nil @@ -1228,7 +1228,7 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat", zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL)) startTime := time.Now() - _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL, c.txnFileCtx.slice.len() > 0) + _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL, c.txnFileCtx.slice.Len() > 0) if err != nil { keepFail++ metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index ace2943ca..ec402f6e6 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -81,12 +81,12 @@ func (b chunkBatch) String() string { // chunkBatch.txnChunkSlice should be sorted by smallest. func (b *chunkBatch) getSampleKeys() [][]byte { keys := make([][]byte, 0) - start := sort.Search(b.txnChunkSlice.len(), func(i int) bool { + start := sort.Search(b.txnChunkSlice.Len(), func(i int) bool { return bytes.Compare(b.region.StartKey, b.txnChunkSlice.chunkRanges[i].smallest) <= 0 }) - end := b.txnChunkSlice.len() + end := b.txnChunkSlice.Len() if len(b.region.EndKey) > 0 { - end = sort.Search(b.txnChunkSlice.len(), func(i int) bool { + end = sort.Search(b.txnChunkSlice.Len(), func(i int) bool { return bytes.Compare(b.txnChunkSlice.chunkRanges[i].smallest, b.region.EndKey) >= 0 }) } @@ -125,19 +125,8 @@ func (s *txnChunkSlice) Biggest() []byte { } func (cs *txnChunkSlice) appendSlice(other *txnChunkSlice) { - if cs.len() == 0 { - cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs...) - cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges...) - return - } - lastChunkRange := cs.chunkRanges[len(cs.chunkIDs)-1] - for i, chunkRange := range other.chunkRanges { - if bytes.Compare(lastChunkRange.smallest, chunkRange.smallest) < 0 { - cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs[i:]...) - cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges[i:]...) - break - } - } + cs.chunkIDs = append(cs.chunkIDs, other.chunkIDs...) + cs.chunkRanges = append(cs.chunkRanges, other.chunkRanges...) } func (cs *txnChunkSlice) append(chunkID uint64, chunkRange txnChunkRange) { @@ -145,10 +134,38 @@ func (cs *txnChunkSlice) append(chunkID uint64, chunkRange txnChunkRange) { cs.chunkRanges = append(cs.chunkRanges, chunkRange) } -func (cs *txnChunkSlice) len() int { +func (cs *txnChunkSlice) Len() int { return len(cs.chunkIDs) } +func (cs *txnChunkSlice) Swap(i, j int) { + cs.chunkIDs[i], cs.chunkIDs[j] = cs.chunkIDs[j], cs.chunkIDs[i] + cs.chunkRanges[i], cs.chunkRanges[j] = cs.chunkRanges[j], cs.chunkRanges[i] +} + +func (cs *txnChunkSlice) Less(i, j int) bool { + return bytes.Compare(cs.chunkRanges[i].smallest, cs.chunkRanges[j].smallest) < 0 +} + +func (cs *txnChunkSlice) sortAndDedup() { + if len(cs.chunkIDs) <= 1 { + return + } + + sort.Sort(cs) + + newIDs := cs.chunkIDs[:1] + newRanges := cs.chunkRanges[:1] + for i := 1; i < len(cs.chunkIDs); i++ { + if cs.chunkIDs[i] != newIDs[len(newIDs)-1] { + newIDs = append(newIDs, cs.chunkIDs[i]) + newRanges = append(newRanges, cs.chunkRanges[i]) + } + } + cs.chunkIDs = newIDs + cs.chunkRanges = newRanges +} + // []chunkBatch is sorted by region.StartKey. // Note: regions may be overlapping. func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer, mutations CommitterMutations) ([]chunkBatch, error) { @@ -527,7 +544,7 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { undetermined := c.mu.undeterminedErr != nil c.mu.RUnlock() if !committed && !undetermined { - if c.txnFileCtx.slice.len() > 0 { + if c.txnFileCtx.slice.Len() > 0 { err1 := c.executeTxnFileAction(retry.NewBackofferWithVars(ctx, int(CommitMaxBackoff), c.txn.vars), c.txnFileCtx.slice, txnFileRollbackAction{}) if err1 != nil { logutil.Logger(ctx).Error("txn file: rollback on error failed", zap.Error(err1)) @@ -586,6 +603,9 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { } err = c.executeTxnFileAction(commitBo, c.txnFileCtx.slice, txnFileCommitAction{commitTS: c.commitTS}) stepDone("commit") + if err != nil { + return + } err = c.afterExecuteTxnFile(rcInterceptor, reqInfo, ruDetails) return @@ -655,6 +675,7 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice regionErrChunks.appendSlice(r.regionErrSlice) } } + regionErrChunks.sortAndDedup() return regionErrChunks, nil } @@ -692,6 +713,11 @@ func (c *twoPhaseCommitter) executeTxnFileSliceSingleBatch(bo *retry.Backoffer, return nil, err1 } if regionErr != nil { + logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch failed, region error", + zap.Uint64("startTS", c.startTS), + zap.Stringer("action", action), + zap.Any("batch", batch), + zap.Stringer("regionErr", regionErr)) return &batch.txnChunkSlice, nil } return nil, nil @@ -706,9 +732,10 @@ func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, ch if err != nil { return errors.WithStack(err) } - if regionErrChunks.len() == 0 { + if regionErrChunks.Len() == 0 { return nil } + logutil.Logger(bo.GetCtx()).Debug("txn file meet region errors", zap.Stringer("regionErrChunks", regionErrChunks)) currentChunks = regionErrChunks currentBatches = nil err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("txn file: execute failed, region miss")) @@ -916,8 +943,8 @@ func (c *twoPhaseCommitter) preSplitTxnFileRegions(bo *retry.Backoffer) error { } var splitKeys [][]byte for _, batch := range batches { - if batch.len() > PreSplitRegionChunks { - for i := PreSplitRegionChunks; i < batch.len(); i += PreSplitRegionChunks { + if batch.Len() > PreSplitRegionChunks { + for i := PreSplitRegionChunks; i < batch.Len(); i += PreSplitRegionChunks { splitKeys = append(splitKeys, batch.chunkRanges[i].smallest) } } diff --git a/txnkv/transaction/txn_file_test.go b/txnkv/transaction/txn_file_test.go index dc1c0ab67..a69d42d25 100644 --- a/txnkv/transaction/txn_file_test.go +++ b/txnkv/transaction/txn_file_test.go @@ -15,6 +15,9 @@ package transaction import ( + "fmt" + "math/rand" + "slices" "testing" "github.com/stretchr/testify/assert" @@ -79,3 +82,40 @@ func TestTxnFileChunkBatch(t *testing.T) { assert.Equal(c.expected, strKeys) } } + +func TestChunkSliceSortAndDedup(t *testing.T) { + assert := assert.New(t) + + genRndChunkIDs := func() []uint64 { + len := rand.Intn(10) + ids := make([]uint64, 0, len) + for i := 0; i < len; i++ { + ids = append(ids, uint64(rand.Intn(len+len/2))) + } + return ids + } + + for i := 0; i < 100; i++ { + ids := genRndChunkIDs() + t.Logf("ids: %v\n", ids) + + expected := make([]uint64, len(ids)) + copy(expected, ids) + slices.Sort(expected) + expected = slices.Compact(expected) + + chunkSlice := txnChunkSlice{ + chunkIDs: make([]uint64, 0, len(ids)), + chunkRanges: make([]txnChunkRange, 0, len(ids)), + } + for _, id := range ids { + chunkSlice.chunkIDs = append(chunkSlice.chunkIDs, id) + chunkSlice.chunkRanges = append(chunkSlice.chunkRanges, txnChunkRange{smallest: []byte(fmt.Sprintf("k%04d", id)), biggest: []byte{}}) + } + chunkSlice.sortAndDedup() + assert.Equal(expected, chunkSlice.chunkIDs) + for i, id := range expected { + assert.Equal(fmt.Sprintf("k%04d", id), string(chunkSlice.chunkRanges[i].smallest)) + } + } +}