diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index 130d4908578c2..a1f202ceb6863 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -407,12 +407,21 @@ func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint return safePoint } - if globalMinStartTS < safePoint { + // If the lock.ts <= max_ts(safePoint), it will be collected and resolved by the gc worker, + // the locks of ongoing pessimistic transactions could be resolved by the gc worker and then + // the transaction is aborted, decrement the value by 1 to avoid this. + globalMinStartAllowedTS := globalMinStartTS + if globalMinStartTS > 0 { + globalMinStartAllowedTS = globalMinStartTS - 1 + } + + if globalMinStartAllowedTS < safePoint { logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session", zap.String("uuid", w.uuid), zap.Uint64("globalMinStartTS", globalMinStartTS), + zap.Uint64("globalMinStartAllowedTS", globalMinStartAllowedTS), zap.Uint64("safePoint", safePoint)) - safePoint = globalMinStartTS + safePoint = globalMinStartAllowedTS } return safePoint } diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 8273801102a29..68dd66a6cd893 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -261,7 +261,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) { strconv.FormatUint(now-oracle.EncodeTSO(20000), 10)) c.Assert(err, IsNil) sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000)) - c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)) + c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)-1) } func (s *testGCWorkerSuite) TestPrepareGC(c *C) { @@ -1588,3 +1588,52 @@ func (s *testGCWorkerSuite) TestGCPlacementRules(c *C) { c.Assert(pid, Equals, int64(1)) c.Assert(err, IsNil) } + +func (s *testGCWorkerSuite) TestGCWithPendingTxn(c *C) { + ctx := context.Background() + gcSafePointCacheInterval = 0 + err := s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) + c.Assert(err, IsNil) + + k1 := []byte("tk1") + v1 := []byte("v1") + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.SetOption(kv.Pessimistic, true) + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + + // Lock the key. + err = txn.Set(k1, v1) + c.Assert(err, IsNil) + err = txn.LockKeys(ctx, lockCtx, k1) + c.Assert(err, IsNil) + + // Prepare to run gc with txn's startTS as the safepoint ts. + spkv := s.tikvStore.GetSafePointKV() + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(txn.StartTS(), 10)) + c.Assert(err, IsNil) + s.mustSetTiDBServiceSafePoint(c, txn.StartTS(), txn.StartTS()) + veryLong := gcDefaultLifeTime * 100 + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) + c.Assert(err, IsNil) + s.gcWorker.lastFinish = time.Now().Add(-veryLong) + s.oracle.AddOffset(time.Minute * 10) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) + c.Assert(err, IsNil) + + // Trigger the tick let the gc job start. + err = s.gcWorker.leaderTick(ctx) + c.Assert(err, IsNil) + // Wait for GC finish + select { + case err = <-s.gcWorker.done: + s.gcWorker.gcIsRunning = false + break + case <-time.After(time.Second * 10): + err = errors.New("receive from s.gcWorker.done timeout") + } + c.Assert(err, IsNil) + + err = txn.Commit(ctx) + c.Assert(err, IsNil) +}