From 06c5c424e1b5530c5c77c8b8d197dd20277b3bdc Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 11 Dec 2024 16:57:36 +0800 Subject: [PATCH] This is an automated cherry-pick of #8877 close tikv/pd#8876 Signed-off-by: ti-chi-bot --- .../resource_group/controller/controller.go | 9 +-- client/timerutil/pool.go | 43 ++++++++++++ client/timerutil/pool_test.go | 70 +++++++++++++++++++ client/timerutil/util.go | 32 +++++++++ client/tso_dispatcher.go | 33 +++++++++ pkg/election/lease.go | 7 ++ pkg/utils/timerutil/pool.go | 43 ++++++++++++ pkg/utils/timerutil/pool_test.go | 70 +++++++++++++++++++ pkg/utils/timerutil/util.go | 32 +++++++++ pkg/utils/tsoutil/tso_dispatcher.go | 32 +++++++++ 10 files changed, 367 insertions(+), 4 deletions(-) create mode 100644 client/timerutil/pool.go create mode 100644 client/timerutil/pool_test.go create mode 100644 client/timerutil/util.go create mode 100644 pkg/utils/timerutil/pool.go create mode 100644 pkg/utils/timerutil/pool_test.go create mode 100644 pkg/utils/timerutil/util.go diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 84629090e04..3d8c0b89d91 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/timerutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -286,7 +287,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -296,7 +297,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) } } case <-emergencyTokenAcquisitionTicker.C: @@ -330,7 +331,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { }) if !ok { watchMetaChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -366,7 +367,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case resp, ok := <-watchConfigChannel: if !ok { watchConfigChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) diff --git a/client/timerutil/pool.go b/client/timerutil/pool.go new file mode 100644 index 00000000000..2d608b09053 --- /dev/null +++ b/client/timerutil/pool.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerutil + +import ( + "sync" + "time" +) + +// GlobalTimerPool is a global pool for reusing *time.Timer. +var GlobalTimerPool TimerPool + +// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse. +type TimerPool struct { + pool sync.Pool +} + +// Get returns a timer with a given duration. +func (tp *TimerPool) Get(d time.Duration) *time.Timer { + if v := tp.pool.Get(); v != nil { + timer := v.(*time.Timer) + timer.Reset(d) + return timer + } + return time.NewTimer(d) +} + +// Put tries to call timer.Stop() before putting it back into pool, +// if the timer.Stop() returns false (it has either already expired or been stopped), +// have a shot at draining the channel with residual time if there is one. +func (tp *TimerPool) Put(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + tp.pool.Put(timer) +} diff --git a/client/timerutil/pool_test.go b/client/timerutil/pool_test.go new file mode 100644 index 00000000000..f90a305d99f --- /dev/null +++ b/client/timerutil/pool_test.go @@ -0,0 +1,70 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerutil + +import ( + "testing" + "time" +) + +func TestTimerPool(t *testing.T) { + var tp TimerPool + + for i := 0; i < 100; i++ { + timer := tp.Get(20 * time.Millisecond) + + select { + case <-timer.C: + t.Errorf("timer expired too early") + continue + default: + } + + select { + case <-time.After(100 * time.Millisecond): + t.Errorf("timer didn't expire on time") + case <-timer.C: + } + + tp.Put(timer) + } +} + +const timeout = 10 * time.Millisecond + +func BenchmarkTimerUtilization(b *testing.B) { + b.Run("TimerWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) + b.Run("TimerWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} + +func BenchmarkTimerPoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) +} + +func BenchmarkTimerNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} diff --git a/client/timerutil/util.go b/client/timerutil/util.go new file mode 100644 index 00000000000..7e24671a09e --- /dev/null +++ b/client/timerutil/util.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timerutil + +import "time" + +// SafeResetTimer is used to reset timer safely. +// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first. +// We need be careful here, see more details in the comments of Timer.Reset. +// https://pkg.go.dev/time@master#Timer.Reset +func SafeResetTimer(t *time.Timer, d time.Duration) { + // Stop the timer if it's not stopped. + if !t.Stop() { + select { + case <-t.C: // try to drain from the channel + default: + } + } + t.Reset(d) +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 1aa9543bfa2..0834e745838 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -28,6 +28,11 @@ import ( "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/retry" +<<<<<<< HEAD +======= + "github.com/tikv/pd/client/timerutil" + "github.com/tikv/pd/client/tsoutil" +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -140,6 +145,22 @@ type deadline struct { cancel context.CancelFunc } +<<<<<<< HEAD +======= +func newTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *deadline { + timer := timerutil.GlobalTimerPool.Get(timeout) + return &deadline{ + timer: timer, + done: done, + cancel: cancel, + } +} + +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) func (c *tsoClient) tsCancelLoop() { defer c.wg.Done() @@ -178,9 +199,17 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { case <-d.timer: log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() +<<<<<<< HEAD case <-d.done: continue case <-ctx.Done(): +======= + timerutil.GlobalTimerPool.Put(d.timer) + case <-d.done: + timerutil.GlobalTimerPool.Put(d.timer) + case <-ctx.Done(): + timerutil.GlobalTimerPool.Put(d.timer) +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) return } case <-ctx.Done(): @@ -390,7 +419,11 @@ tsoBatchLoop: if maxBatchWaitInterval >= 0 { tbc.adjustBestBatchSize() } +<<<<<<< HEAD streamLoopTimer.Reset(c.option.timeout) +======= + timerutil.SafeResetTimer(streamLoopTimer, c.option.timeout) +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) // Choose a stream to send the TSO gRPC request. streamChoosingLoop: for { diff --git a/pkg/election/lease.go b/pkg/election/lease.go index bb1e023b5b7..1af58ab792e 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/timerutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -118,8 +119,14 @@ func (l *lease) KeepAlive(ctx context.Context) { l.expireTime.Store(t) } } +<<<<<<< HEAD case <-time.After(l.leaseTimeout): log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) +======= + timerutil.SafeResetTimer(timer, l.leaseTimeout) + case <-timer.C: + log.Info("keep alive lease too slow", zap.Duration("timeout-duration", l.leaseTimeout), zap.Time("actual-expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) return case <-ctx.Done(): return diff --git a/pkg/utils/timerutil/pool.go b/pkg/utils/timerutil/pool.go new file mode 100644 index 00000000000..2d608b09053 --- /dev/null +++ b/pkg/utils/timerutil/pool.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerutil + +import ( + "sync" + "time" +) + +// GlobalTimerPool is a global pool for reusing *time.Timer. +var GlobalTimerPool TimerPool + +// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse. +type TimerPool struct { + pool sync.Pool +} + +// Get returns a timer with a given duration. +func (tp *TimerPool) Get(d time.Duration) *time.Timer { + if v := tp.pool.Get(); v != nil { + timer := v.(*time.Timer) + timer.Reset(d) + return timer + } + return time.NewTimer(d) +} + +// Put tries to call timer.Stop() before putting it back into pool, +// if the timer.Stop() returns false (it has either already expired or been stopped), +// have a shot at draining the channel with residual time if there is one. +func (tp *TimerPool) Put(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + tp.pool.Put(timer) +} diff --git a/pkg/utils/timerutil/pool_test.go b/pkg/utils/timerutil/pool_test.go new file mode 100644 index 00000000000..f90a305d99f --- /dev/null +++ b/pkg/utils/timerutil/pool_test.go @@ -0,0 +1,70 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 + +package timerutil + +import ( + "testing" + "time" +) + +func TestTimerPool(t *testing.T) { + var tp TimerPool + + for i := 0; i < 100; i++ { + timer := tp.Get(20 * time.Millisecond) + + select { + case <-timer.C: + t.Errorf("timer expired too early") + continue + default: + } + + select { + case <-time.After(100 * time.Millisecond): + t.Errorf("timer didn't expire on time") + case <-timer.C: + } + + tp.Put(timer) + } +} + +const timeout = 10 * time.Millisecond + +func BenchmarkTimerUtilization(b *testing.B) { + b.Run("TimerWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) + b.Run("TimerWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} + +func BenchmarkTimerPoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := GlobalTimerPool.Get(timeout) + GlobalTimerPool.Put(t) + } + }) +} + +func BenchmarkTimerNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + t := time.NewTimer(timeout) + t.Stop() + } + }) +} diff --git a/pkg/utils/timerutil/util.go b/pkg/utils/timerutil/util.go new file mode 100644 index 00000000000..7e24671a09e --- /dev/null +++ b/pkg/utils/timerutil/util.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timerutil + +import "time" + +// SafeResetTimer is used to reset timer safely. +// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first. +// We need be careful here, see more details in the comments of Timer.Reset. +// https://pkg.go.dev/time@master#Timer.Reset +func SafeResetTimer(t *time.Timer, d time.Duration) { + // Stop the timer if it's not stopped. + if !t.Stop() { + select { + case <-t.C: // try to drain from the channel + default: + } + } + t.Reset(d) +} diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 0d18c5a3e6e..407fe9aab3d 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -24,7 +24,12 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" +<<<<<<< HEAD +======= + "github.com/tikv/pd/pkg/utils/etcdutil" +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/timerutil" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -210,7 +215,26 @@ type deadline struct { cancel context.CancelFunc } +<<<<<<< HEAD func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) { +======= +// NewTSDeadline creates a new TSDeadline. +func NewTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *TSDeadline { + timer := timerutil.GlobalTimerPool.Get(timeout) + return &TSDeadline{ + timer: timer, + done: done, + cancel: cancel, + } +} + +// WatchTSDeadline watches the deadline of each tso request. +func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) defer logutil.LogPanic() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -222,9 +246,17 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) { log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout)) d.cancel() +<<<<<<< HEAD case <-d.done: continue case <-ctx.Done(): +======= + timerutil.GlobalTimerPool.Put(d.timer) + case <-d.done: + timerutil.GlobalTimerPool.Put(d.timer) + case <-ctx.Done(): + timerutil.GlobalTimerPool.Put(d.timer) +>>>>>>> 86d4ad0f1 (resource control: fix unsafe usage of timer.Reset (#8877)) return } case <-ctx.Done():