From 83e9e4715f2377ed67f54245b6e35c793d0d63f6 Mon Sep 17 00:00:00 2001 From: weedge Date: Thu, 8 Jun 2023 14:59:57 +0800 Subject: [PATCH 1/3] add gc options Signed-off-by: weedge --- examples/gcworker/gcworker.go | 8 +++++++- tikv/gc.go | 25 +++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go index 39da00df0..1191adcd2 100644 --- a/examples/gcworker/gcworker.go +++ b/examples/gcworker/gcworker.go @@ -21,6 +21,7 @@ import ( "time" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" ) @@ -36,10 +37,15 @@ func main() { panic(err) } - sysSafepoint, err := client.GC(context.Background(), *safepoint) + + sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10)) if err != nil { panic(err) } fmt.Printf("Finished GC, expect safepoint:%d(%+v),real safepoint:%d(+%v)\n", *safepoint, oracle.GetTimeFromTS(*safepoint), sysSafepoint, oracle.GetTimeFromTS(sysSafepoint)) + err = client.Close() + if err != nil { + panic(err) + } } diff --git a/tikv/gc.go b/tikv/gc.go index d448815ed..2b47e6bca 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -50,8 +50,15 @@ import ( // // GC is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). // We skip the second step "delete ranges" which is an optimization for TiDB. -func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64, err error) { - err = s.resolveLocks(ctx, safepoint, 8) +func (s *KVStore) GC(ctx context.Context, safepoint uint64, opts ...GCOpt) (newSafePoint uint64, err error) { + // default concurrency 8 + opt := &gcOption{concurrency: 8} + // Apply gc options. + for _, o := range opts { + o(opt) + } + + err = s.resolveLocks(ctx, safepoint, opt.concurrency) if err != nil { return } @@ -59,6 +66,20 @@ func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64 return s.pdClient.UpdateGCSafePoint(ctx, safepoint) } +type gcOption struct { + concurrency int +} + +// GCOpt gc options +type GCOpt func(*gcOption) + +// WithConcurrency is used to set gc RangeTaskRunner concurrency. +func WithConcurrency(concurrency int) GCOpt { + return func(opt *gcOption) { + opt.concurrency = concurrency + } +} + func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error { handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) { return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey) From 75849a82146c554a90fcd4d01605ea9528a1f5b7 Mon Sep 17 00:00:00 2001 From: weedge Date: Tue, 20 Jun 2023 10:14:18 +0800 Subject: [PATCH 2/3] fix txn get Signed-off-by: weedge --- txnkv/transaction/txn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index e5067ce8a..77a9140bb 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -206,7 +206,7 @@ func (txn *KVTxn) GetVars() *tikv.Variables { func (txn *KVTxn) Get(ctx context.Context, k []byte) ([]byte, error) { ret, err := txn.us.Get(ctx, k) if tikverr.IsErrNotFound(err) { - return nil, err + return nil, nil } if err != nil { return nil, err From 85c65998e04e2ff54bcc17aef1bc20b1f4ed45ba Mon Sep 17 00:00:00 2001 From: weedge Date: Tue, 20 Jun 2023 12:22:33 +0800 Subject: [PATCH 3/3] change back get not found key Signed-off-by: weedge --- txnkv/transaction/txn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 77a9140bb..e5067ce8a 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -206,7 +206,7 @@ func (txn *KVTxn) GetVars() *tikv.Variables { func (txn *KVTxn) Get(ctx context.Context, k []byte) ([]byte, error) { ret, err := txn.us.Get(ctx, k) if tikverr.IsErrNotFound(err) { - return nil, nil + return nil, err } if err != nil { return nil, err