diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go
index 9517ac72895..a98de2c51ef 100644
--- a/client/pd_service_discovery.go
+++ b/client/pd_service_discovery.go
@@ -188,6 +188,8 @@ func (c *pdServiceDiscovery) Init() error {
 
 func (c *pdServiceDiscovery) initRetry(f func() error) error {
 	var err error
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for i := 0; i < c.option.maxRetryTimes; i++ {
 		if err = f(); err == nil {
 			return nil
@@ -195,7 +197,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error {
 		select {
 		case <-c.ctx.Done():
 			return err
-		case <-time.After(time.Second):
+		case <-ticker.C:
 		}
 	}
 	return errors.WithStack(err)
diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index 84629090e04..04f2148c704 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -31,6 +31,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/client/timerutil"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -286,9 +287,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 				watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
 				if err != nil {
 					log.Warn("watch resource group meta failed", zap.Error(err))
-					watchRetryTimer.Reset(watchRetryInterval)
+					timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 					failpoint.Inject("watchStreamError", func() {
-						watchRetryTimer.Reset(20 * time.Millisecond)
+						timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 					})
 				}
 			}
@@ -296,7 +297,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 				watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
 				if err != nil {
 					log.Warn("watch resource group config failed", zap.Error(err))
-					watchRetryTimer.Reset(watchRetryInterval)
+					timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 				}
 			}
 		case <-emergencyTokenAcquisitionTicker.C:
@@ -330,9 +331,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 			})
 			if !ok {
 				watchMetaChannel = nil
-				watchRetryTimer.Reset(watchRetryInterval)
+				timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 				failpoint.Inject("watchStreamError", func() {
-					watchRetryTimer.Reset(20 * time.Millisecond)
+					timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 				})
 				continue
 			}
@@ -366,9 +367,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 		case resp, ok := <-watchConfigChannel:
 			if !ok {
 				watchConfigChannel = nil
-				watchRetryTimer.Reset(watchRetryInterval)
+				timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval)
 				failpoint.Inject("watchStreamError", func() {
-					watchRetryTimer.Reset(20 * time.Millisecond)
+					timerutil.SafeResetTimer(watchRetryTimer, 20*time.Millisecond)
 				})
 				continue
 			}
@@ -521,7 +522,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
 		ClientUniqueId: c.clientUniqueID,
 	}
 	if c.ruConfig.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil {
-		c.run.responseDeadline.Reset(c.ruConfig.DegradedModeWaitDuration)
+		timerutil.SafeResetTimer(c.run.responseDeadline, c.ruConfig.DegradedModeWaitDuration)
 		c.responseDeadlineCh = c.run.responseDeadline.C
 	}
 	go func() {
diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go
index fbf474cabc3..54880e80f0f 100644
--- a/client/resource_manager_client.go
+++ b/client/resource_manager_client.go
@@ -348,6 +348,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
 		err    error
 		stream rmpb.ResourceManager_AcquireTokenBucketsClient
 	)
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		cc, err := c.resourceManagerClient()
 		if err != nil {
@@ -365,7 +367,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
 		select {
 		case <-ctx.Done():
 			return err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
 	return err
diff --git a/client/retry/backoff.go b/client/retry/backoff.go
index e2ca9ab3972..39751734ff1 100644
--- a/client/retry/backoff.go
+++ b/client/retry/backoff.go
@@ -34,9 +34,11 @@ func (bo *BackOffer) Exec(
 	fn func() error,
 ) error {
 	if err := fn(); err != nil {
+		timer := time.NewTimer(bo.nextInterval())
+		defer timer.Stop()
 		select {
 		case <-ctx.Done():
-		case <-time.After(bo.nextInterval()):
+		case <-timer.C:
 			failpoint.Inject("backOffExecute", func() {
 				testBackOffExecuteFlag = true
 			})
diff --git a/client/timerutil/pool.go b/client/timerutil/pool.go
new file mode 100644
index 00000000000..2d608b09053
--- /dev/null
+++ b/client/timerutil/pool.go
@@ -0,0 +1,43 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
+
+package timerutil
+
+import (
+	"sync"
+	"time"
+)
+
+// GlobalTimerPool is a global pool for reusing *time.Timer.
+var GlobalTimerPool TimerPool
+
+// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
+type TimerPool struct {
+	pool sync.Pool
+}
+
+// Get returns a timer with a given duration.
+func (tp *TimerPool) Get(d time.Duration) *time.Timer {
+	if v := tp.pool.Get(); v != nil {
+		timer := v.(*time.Timer)
+		timer.Reset(d)
+		return timer
+	}
+	return time.NewTimer(d)
+}
+
+// Put tries to call timer.Stop() before putting it back into pool,
+// if the timer.Stop() returns false (it has either already expired or been stopped),
+// have a shot at draining the channel with residual time if there is one.
+func (tp *TimerPool) Put(timer *time.Timer) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+	tp.pool.Put(timer)
+}
diff --git a/client/timerutil/pool_test.go b/client/timerutil/pool_test.go
new file mode 100644
index 00000000000..f90a305d99f
--- /dev/null
+++ b/client/timerutil/pool_test.go
@@ -0,0 +1,70 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
+
+package timerutil
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTimerPool(t *testing.T) {
+	var tp TimerPool
+
+	for i := 0; i < 100; i++ {
+		timer := tp.Get(20 * time.Millisecond)
+
+		select {
+		case <-timer.C:
+			t.Errorf("timer expired too early")
+			continue
+		default:
+		}
+
+		select {
+		case <-time.After(100 * time.Millisecond):
+			t.Errorf("timer didn't expire on time")
+		case <-timer.C:
+		}
+
+		tp.Put(timer)
+	}
+}
+
+const timeout = 10 * time.Millisecond
+
+func BenchmarkTimerUtilization(b *testing.B) {
+	b.Run("TimerWithPool", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			t := GlobalTimerPool.Get(timeout)
+			GlobalTimerPool.Put(t)
+		}
+	})
+	b.Run("TimerWithoutPool", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			t := time.NewTimer(timeout)
+			t.Stop()
+		}
+	})
+}
+
+func BenchmarkTimerPoolParallel(b *testing.B) {
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			t := GlobalTimerPool.Get(timeout)
+			GlobalTimerPool.Put(t)
+		}
+	})
+}
+
+func BenchmarkTimerNativeParallel(b *testing.B) {
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			t := time.NewTimer(timeout)
+			t.Stop()
+		}
+	})
+}
diff --git a/client/timerutil/util.go b/client/timerutil/util.go
new file mode 100644
index 00000000000..7e24671a09e
--- /dev/null
+++ b/client/timerutil/util.go
@@ -0,0 +1,32 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package timerutil
+
+import "time"
+
+// SafeResetTimer is used to reset timer safely.
+// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first.
+// We need to be careful here, see more details in the comments of Timer.Reset.
+// https://pkg.go.dev/time@master#Timer.Reset
+func SafeResetTimer(t *time.Timer, d time.Duration) {
+	// Stop the timer if it's not stopped.
+	if !t.Stop() {
+		select {
+		case <-t.C: // try to drain from the channel
+		default:
+		}
+	}
+	t.Reset(d)
+}
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index 1aa9543bfa2..97fb1b70887 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -28,6 +28,7 @@ import (
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/grpcutil"
 	"github.com/tikv/pd/client/retry"
+	"github.com/tikv/pd/client/timerutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -135,11 +136,24 @@ func (c *tsoClient) updateTSODispatcher() {
 }
 
 type deadline struct {
-	timer  <-chan time.Time
+	timer  *time.Timer
 	done   chan struct{}
 	cancel context.CancelFunc
 }
 
+func newTSDeadline(
+	timeout time.Duration,
+	done chan struct{},
+	cancel context.CancelFunc,
+) *deadline {
+	timer := timerutil.GlobalTimerPool.Get(timeout)
+	return &deadline{
+		timer:  timer,
+		done:   done,
+		cancel: cancel,
+	}
+}
+
 func (c *tsoClient) tsCancelLoop() {
 	defer c.wg.Done()
 
@@ -168,19 +182,21 @@ func (c *tsoClient) tsCancelLoop() {
 
 func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
 	if _, exist := c.tsDeadline.Load(dcLocation); !exist {
-		tsDeadlineCh := make(chan deadline, 1)
+		tsDeadlineCh := make(chan *deadline, 1)
 		c.tsDeadline.Store(dcLocation, tsDeadlineCh)
-		go func(dc string, tsDeadlineCh <-chan deadline) {
+		go func(dc string, tsDeadlineCh <-chan *deadline) {
 			for {
 				select {
 				case d := <-tsDeadlineCh:
 					select {
-					case <-d.timer:
+					case <-d.timer.C:
 						log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
 						d.cancel()
+						timerutil.GlobalTimerPool.Put(d.timer)
 					case <-d.done:
-						continue
+						timerutil.GlobalTimerPool.Put(d.timer)
 					case <-ctx.Done():
+						timerutil.GlobalTimerPool.Put(d.timer)
 						return
 					}
 				case <-ctx.Done():
@@ -230,6 +246,8 @@ func (c *tsoClient) checkAllocator(
 	}()
 	cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
 	var healthCli healthpb.HealthClient
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for {
 		// the pd/allocator leader change, we need to re-establish the stream
 		if u != url {
@@ -260,7 +278,7 @@ func (c *tsoClient) checkAllocator(
 		select {
 		case <-dispatcherCtx.Done():
 			return
-		case <-time.After(time.Second):
+		case <-ticker.C:
 			// To ensure we can get the latest allocator leader
 			// and once the leader is changed, we can exit this function.
 			cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -367,6 +385,7 @@ func (c *tsoClient) handleDispatcher(
 
 	// Loop through each batch of TSO requests and send them for processing.
 	streamLoopTimer := time.NewTimer(c.option.timeout)
+	defer streamLoopTimer.Stop()
 	bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
 tsoBatchLoop:
 	for {
@@ -390,7 +409,7 @@ tsoBatchLoop:
 		if maxBatchWaitInterval >= 0 {
 			tbc.adjustBestBatchSize()
 		}
-		streamLoopTimer.Reset(c.option.timeout)
+		timerutil.SafeResetTimer(streamLoopTimer, c.option.timeout)
 		// Choose a stream to send the TSO gRPC request.
 	streamChoosingLoop:
 		for {
@@ -404,16 +423,20 @@ tsoBatchLoop:
 				if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
 					continue streamChoosingLoop
 				}
+				timer := time.NewTimer(retryInterval)
 				select {
 				case <-dispatcherCtx.Done():
+					timer.Stop()
 					return
 				case <-streamLoopTimer.C:
 					err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
 					log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
 					c.svcDiscovery.ScheduleCheckMemberChanged()
 					c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
+					timer.Stop()
 					continue tsoBatchLoop
-				case <-time.After(retryInterval):
+				case <-timer.C:
+					timer.Stop()
 					continue streamChoosingLoop
 				}
 			}
@@ -430,11 +453,7 @@ tsoBatchLoop:
 			}
 		}
 		done := make(chan struct{})
-		dl := deadline{
-			timer:  time.After(c.option.timeout),
-			done:   done,
-			cancel: cancel,
-		}
+		dl := newTSDeadline(c.option.timeout, done, cancel)
 		tsDeadlineCh, ok := c.tsDeadline.Load(dc)
 		for !ok || tsDeadlineCh == nil {
 			c.scheduleCheckTSDeadline()
@@ -444,7 +463,7 @@ tsoBatchLoop:
 		select {
 		case <-dispatcherCtx.Done():
 			return
-		case tsDeadlineCh.(chan deadline) <- dl:
+		case tsDeadlineCh.(chan *deadline) <- dl:
 		}
 		opts = extractSpanReference(tbc, opts[:0])
 		err = c.processRequests(stream, dc, tbc, opts)
@@ -556,6 +575,8 @@ func (c *tsoClient) tryConnectToTSO(
 	}
 
 	// retry several times before falling back to the follower when the network problem happens
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		c.svcDiscovery.ScheduleCheckMemberChanged()
 		cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -588,7 +609,7 @@ func (c *tsoClient) tryConnectToTSO(
 		select {
 		case <-dispatcherCtx.Done():
 			return err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
index c9b021e5242..85657316f98 100644
--- a/client/tso_service_discovery.go
+++ b/client/tso_service_discovery.go
@@ -115,6 +115,8 @@ func (c *tsoServiceDiscovery) Init() error {
 
 func (c *tsoServiceDiscovery) initRetry(f func() error) error {
 	var err error
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < c.option.maxRetryTimes; i++ {
 		if err = f(); err == nil {
 			return nil
@@ -122,7 +124,7 @@ func (c *tsoServiceDiscovery) initRetry(f func() error) error {
 		select {
 		case <-c.ctx.Done():
 			return err
-		case <-time.After(time.Second):
+		case <-ticker.C:
 		}
 	}
 	return errors.WithStack(err)
@@ -151,11 +153,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 
+	ticker := time.NewTicker(memberUpdateInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-c.checkMembershipCh:
-		case <-time.After(memberUpdateInterval):
+		case <-ticker.C:
 		case <-ctx.Done():
 			log.Info("[tso] exit check member loop")
 			return
diff --git a/client/tso_stream.go b/client/tso_stream.go
index baa764dffb2..ede57fd2c81 100644
--- a/client/tso_stream.go
+++ b/client/tso_stream.go
@@ -83,10 +83,12 @@ func (b *tsoTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFu
 }
 
 func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(timeout):
+	case <-timer.C:
 		cancel()
 	case <-ctx.Done():
 	}
diff --git a/pkg/election/lease.go b/pkg/election/lease.go
index bb1e023b5b7..00d4ed56303 100644
--- a/pkg/election/lease.go
+++ b/pkg/election/lease.go
@@ -23,6 +23,7 @@ import (
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/timerutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
@@ -105,6 +106,8 @@ func (l *lease) KeepAlive(ctx context.Context) {
 	timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
 
 	var maxExpire time.Time
+	timer := time.NewTimer(l.leaseTimeout)
+	defer timer.Stop()
 	for {
 		select {
 		case t := <-timeCh:
@@ -118,7 +121,8 @@ func (l *lease) KeepAlive(ctx context.Context) {
 					l.expireTime.Store(t)
 				}
 			}
-		case <-time.After(l.leaseTimeout):
+			timerutil.SafeResetTimer(timer, l.leaseTimeout)
+		case <-timer.C:
 			log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose))
 			return
 		case <-ctx.Done():
diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index f18b80ed305..4d8f60b6723 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -153,11 +153,13 @@ func (m *GroupManager) startWatchLoop() {
 		revision int64
 		err      error
 	)
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		select {
 		case <-ctx.Done():
 			return
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 		resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
 		if err == nil {
diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go
index 617c1520b8d..49a186138c7 100644
--- a/pkg/mcs/discovery/register.go
+++ b/pkg/mcs/discovery/register.go
@@ -86,6 +86,7 @@ func (sr *ServiceRegister) Register() error {
 			select {
 			case <-sr.ctx.Done():
 				log.Info("exit register process", zap.String("key", sr.key))
+				t.Stop()
 				return
 			default:
 			}
@@ -94,11 +95,13 @@ func (sr *ServiceRegister) Register() error {
 				resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
 				if err != nil {
 					log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
+					t.Stop()
 					continue
 				}
 				if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
 					log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
+					t.Stop()
 					continue
 				}
 			}
diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go
index 17e86a3c02b..e3fe64f8a33 100644
--- a/pkg/mcs/resource_manager/server/server.go
+++ b/pkg/mcs/resource_manager/server/server.go
@@ -286,9 +286,11 @@ func (s *Server) startGRPCServer(l net.Listener) {
 		gs.GracefulStop()
 		close(done)
 	}()
+	timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout)
+	defer timer.Stop()
 	select {
 	case <-done:
-	case <-time.After(utils.DefaultGRPCGracefulStopTimeout):
+	case <-timer.C:
 		log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout))
 		gs.Stop()
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 2a3172d6592..8a82248651e 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -362,9 +362,11 @@ func (s *Server) startGRPCServer(l net.Listener) {
 		gs.GracefulStop()
 		close(done)
 	}()
+	timer := time.NewTimer(mcsutils.DefaultGRPCGracefulStopTimeout)
+	defer timer.Stop()
 	select {
 	case <-done:
-	case <-time.After(mcsutils.DefaultGRPCGracefulStopTimeout):
+	case <-timer.C:
 		log.Info("stopping grpc gracefully is taking longer than expected and force stopping now")
 		gs.Stop()
 	}
diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go
index 68a0efc31fa..0c9d7610749 100644
--- a/pkg/mcs/utils/util.go
+++ b/pkg/mcs/utils/util.go
@@ -36,6 +36,8 @@ const (
 
 // InitClusterID initializes the cluster ID.
 func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) {
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
 		if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
 			return clusterID, nil
@@ -43,7 +45,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
 		select {
 		case <-ctx.Done():
 			return 0, err
-		case <-time.After(retryInterval):
+		case <-ticker.C:
 		}
 	}
 	return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index 8aba66c5412..2a04424d0da 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -284,10 +284,12 @@ func (kgm *KeyspaceGroupManager) Close() {
 
 func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) {
 	defer logutil.LogPanic()
+	timer := time.NewTimer(kgm.loadKeyspaceGroupsTimeout)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(kgm.loadKeyspaceGroupsTimeout):
+	case <-timer.C:
 		log.Error("failed to initialize keyspace group manager",
 			zap.Any("timeout-setting", kgm.loadKeyspaceGroupsTimeout),
 			errs.ZapError(errs.ErrLoadKeyspaceGroupsTimeout))
@@ -374,6 +376,8 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
 		i    int
 		resp *clientv3.GetResponse
 	)
+	ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
+	defer ticker.Stop()
 	for ; i < kgm.loadFromEtcdMaxRetryTimes; i++ {
 		resp, err = etcdutil.EtcdKVGet(kgm.etcdClient, startKey, opOption...)
 
@@ -397,7 +401,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
 		select {
 		case <-ctx.Done():
 			return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsTerminated
-		case <-time.After(defaultLoadFromEtcdRetryInterval):
+		case <-ticker.C:
 		}
 	}
diff --git a/pkg/utils/timerutil/pool.go b/pkg/utils/timerutil/pool.go
new file mode 100644
index 00000000000..2d608b09053
--- /dev/null
+++ b/pkg/utils/timerutil/pool.go
@@ -0,0 +1,43 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
+
+package timerutil
+
+import (
+	"sync"
+	"time"
+)
+
+// GlobalTimerPool is a global pool for reusing *time.Timer.
+var GlobalTimerPool TimerPool
+
+// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
+type TimerPool struct {
+	pool sync.Pool
+}
+
+// Get returns a timer with a given duration.
+func (tp *TimerPool) Get(d time.Duration) *time.Timer {
+	if v := tp.pool.Get(); v != nil {
+		timer := v.(*time.Timer)
+		timer.Reset(d)
+		return timer
+	}
+	return time.NewTimer(d)
+}
+
+// Put tries to call timer.Stop() before putting it back into pool,
+// if the timer.Stop() returns false (it has either already expired or been stopped),
+// have a shot at draining the channel with residual time if there is one.
+func (tp *TimerPool) Put(timer *time.Timer) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+	tp.pool.Put(timer)
+}
diff --git a/pkg/utils/timerutil/pool_test.go b/pkg/utils/timerutil/pool_test.go
new file mode 100644
index 00000000000..f90a305d99f
--- /dev/null
+++ b/pkg/utils/timerutil/pool_test.go
@@ -0,0 +1,70 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
+
+package timerutil
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTimerPool(t *testing.T) {
+	var tp TimerPool
+
+	for i := 0; i < 100; i++ {
+		timer := tp.Get(20 * time.Millisecond)
+
+		select {
+		case <-timer.C:
+			t.Errorf("timer expired too early")
+			continue
+		default:
+		}
+
+		select {
+		case <-time.After(100 * time.Millisecond):
+			t.Errorf("timer didn't expire on time")
+		case <-timer.C:
+		}
+
+		tp.Put(timer)
+	}
+}
+
+const timeout = 10 * time.Millisecond
+
+func BenchmarkTimerUtilization(b *testing.B) {
+	b.Run("TimerWithPool", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			t := GlobalTimerPool.Get(timeout)
+			GlobalTimerPool.Put(t)
+		}
+	})
+	b.Run("TimerWithoutPool", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			t := time.NewTimer(timeout)
+			t.Stop()
+		}
+	})
+}
+
+func BenchmarkTimerPoolParallel(b *testing.B) {
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			t := GlobalTimerPool.Get(timeout)
+			GlobalTimerPool.Put(t)
+		}
+	})
+}
+
+func BenchmarkTimerNativeParallel(b *testing.B) {
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			t := time.NewTimer(timeout)
+			t.Stop()
+		}
+	})
+}
diff --git a/pkg/utils/timerutil/util.go b/pkg/utils/timerutil/util.go
new file mode 100644
index 00000000000..7e24671a09e
--- /dev/null
+++ b/pkg/utils/timerutil/util.go
@@ -0,0 +1,32 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package timerutil
+
+import "time"
+
+// SafeResetTimer is used to reset timer safely.
+// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first.
+// We need to be careful here, see more details in the comments of Timer.Reset.
+// https://pkg.go.dev/time@master#Timer.Reset
+func SafeResetTimer(t *time.Timer, d time.Duration) {
+	// Stop the timer if it's not stopped.
+	if !t.Stop() {
+		select {
+		case <-t.C: // try to drain from the channel
+		default:
+		}
+	}
+	t.Reset(d)
+}
diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go
index 0d18c5a3e6e..6b51bcc02c5 100644
--- a/pkg/utils/tsoutil/tso_dispatcher.go
+++ b/pkg/utils/tsoutil/tso_dispatcher.go
@@ -25,6 +25,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/timerutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
@@ -68,7 +69,7 @@ func (s *TSODispatcher) DispatchRequest(
 	val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
 	reqCh := val.(chan Request)
 	if !loaded {
-		tsDeadlineCh := make(chan deadline, 1)
+		tsDeadlineCh := make(chan *deadline, 1)
 		go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...)
 		go watchTSDeadline(ctx, tsDeadlineCh)
 	}
@@ -81,7 +82,7 @@ func (s *TSODispatcher) dispatch(
 	forwardedHost string,
 	clientConn *grpc.ClientConn,
 	tsoRequestCh <-chan Request,
-	tsDeadlineCh chan<- deadline,
+	tsDeadlineCh chan<- *deadline,
 	doneCh <-chan struct{},
 	errCh chan<- error,
 	updateServicePrimaryAddrChs ...chan<- struct{}) {
@@ -121,11 +122,7 @@ func (s *TSODispatcher) dispatch(
 				requests[i] = <-tsoRequestCh
 			}
 			done := make(chan struct{})
-			dl := deadline{
-				timer:  time.After(DefaultTSOProxyTimeout),
-				done:   done,
-				cancel: cancel,
-			}
+			dl := newTSDeadline(DefaultTSOProxyTimeout, done, cancel)
 			select {
 			case tsDeadlineCh <- dl:
 			case <-dispatcherCtx.Done():
@@ -205,12 +202,25 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical
 }
 
 type deadline struct {
-	timer  <-chan time.Time
+	timer  *time.Timer
 	done   chan struct{}
 	cancel context.CancelFunc
 }
 
-func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
+func newTSDeadline(
+	timeout time.Duration,
+	done chan struct{},
+	cancel context.CancelFunc,
+) *deadline {
+	timer := timerutil.GlobalTimerPool.Get(timeout)
+	return &deadline{
+		timer:  timer,
+		done:   done,
+		cancel: cancel,
+	}
+}
+
+func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *deadline) {
 	defer logutil.LogPanic()
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -218,13 +228,15 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
 		select {
 		case d := <-tsDeadlineCh:
 			select {
-			case <-d.timer:
+			case <-d.timer.C:
 				log.Error("tso proxy request processing is canceled due to timeout",
 					errs.ZapError(errs.ErrProxyTSOTimeout))
 				d.cancel()
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-d.done:
-				continue
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-ctx.Done():
+				timerutil.GlobalTimerPool.Put(d.timer)
 				return
 			}
 		case <-ctx.Done():
@@ -235,11 +247,12 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
 
 func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
 	defer logutil.LogPanic()
-
+	timer := time.NewTimer(3 * time.Second)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(3 * time.Second):
+	case <-timer.C:
 		cancel()
 	case <-streamCtx.Done():
 	}
diff --git a/server/api/pprof.go b/server/api/pprof.go
index b64278a21b7..900c48f8368 100644
--- a/server/api/pprof.go
+++ b/server/api/pprof.go
@@ -209,8 +209,10 @@ func (h *pprofHandler) PProfThreadcreate(w http.ResponseWriter, r *http.Request)
 }
 
 func sleepWithCtx(ctx context.Context, d time.Duration) {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
 	select {
-	case <-time.After(d):
+	case <-timer.C:
 	case <-ctx.Done():
 	}
 }
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 2f8ebcb3a45..f17dc1c2052 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -136,8 +136,8 @@ func (c *coordinator) patrolRegions() {
 	defer logutil.LogPanic()
 	defer c.wg.Done()
 
-	timer := time.NewTimer(c.cluster.GetOpts().GetPatrolRegionInterval())
-	defer timer.Stop()
+	ticker := time.NewTicker(c.cluster.GetOpts().GetPatrolRegionInterval())
+	defer ticker.Stop()
 
 	log.Info("coordinator starts patrol regions")
 	start := time.Now()
@@ -147,8 +147,9 @@ func (c *coordinator) patrolRegions() {
 	)
 	for {
 		select {
-		case <-timer.C:
-			timer.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
+		case <-ticker.C:
+			// Note: we reset the ticker here to support updating configuration dynamically.
+			ticker.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
 		case <-c.ctx.Done():
 			c.setPatrolRegionsDuration(0)
 			log.Info("patrol regions has been stopped")
@@ -838,12 +839,11 @@ func (c *coordinator) runScheduler(s *scheduleController) {
 	defer c.wg.Done()
 	defer s.Cleanup(c.cluster)
 
-	timer := time.NewTimer(s.GetInterval())
-	defer timer.Stop()
+	ticker := time.NewTicker(s.GetInterval())
+	defer ticker.Stop()
 	for {
 		select {
-		case <-timer.C:
-			timer.Reset(s.GetInterval())
+		case <-ticker.C:
 			diagnosable := s.diagnosticRecorder.isAllowed()
 			if !s.AllowSchedule(diagnosable) {
 				continue
@@ -852,7 +852,8 @@ func (c *coordinator) runScheduler(s *scheduleController) {
 				added := c.opController.AddWaitingOperator(op...)
 				log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.GetName()))
 			}
-
+			// Note: we reset the ticker here to support updating configuration dynamically.
+			ticker.Reset(s.GetInterval())
 		case <-s.Ctx().Done():
 			log.Info("scheduler has been stopped",
 				zap.String("scheduler-name", s.GetName()),
diff --git a/server/grpc_service.go b/server/grpc_service.go
index c858a1f6161..b1fd840e6f7 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -564,13 +564,15 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
 		defer logutil.LogPanic()
 		done <- b.stream.SendAndClose(bucket)
 	}()
+	timer := time.NewTimer(heartbeatSendTimeout)
+	defer timer.Stop()
 	select {
 	case err := <-done:
 		if err != nil {
 			atomic.StoreInt32(&b.closed, 1)
 		}
 		return err
-	case <-time.After(heartbeatSendTimeout):
+	case <-timer.C:
 		atomic.StoreInt32(&b.closed, 1)
 		return ErrSendHeartbeatTimeout
 	}
@@ -604,13 +606,15 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
 		defer logutil.LogPanic()
 		done <- s.stream.Send(m)
 	}()
+	timer := time.NewTimer(heartbeatSendTimeout)
+	defer timer.Stop()
 	select {
 	case err := <-done:
 		if err != nil {
 			atomic.StoreInt32(&s.closed, 1)
 		}
 		return errors.WithStack(err)
-	case <-time.After(heartbeatSendTimeout):
+	case <-timer.C:
 		atomic.StoreInt32(&s.closed, 1)
 		return ErrSendHeartbeatTimeout
 	}
@@ -1761,10 +1765,12 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient
 // TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly.
 func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
 	defer logutil.LogPanic()
+	timer := time.NewTimer(3 * time.Second)
+	defer timer.Stop()
 	select {
 	case <-done:
 		return
-	case <-time.After(3 * time.Second):
+	case <-timer.C:
 		cancel()
 	case <-streamCtx.Done():
 	}
diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go
index 35fd80ec3b5..da2bf177af3 100644
--- a/server/region_syncer/client.go
+++ b/server/region_syncer/client.go
@@ -164,11 +164,14 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 				}
 			}
 			log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
+			timer := time.NewTimer(retryInterval)
 			select {
 			case <-ctx.Done():
 				log.Info("stop synchronizing with leader due to context canceled")
+				timer.Stop()
 				return
-			case <-time.After(retryInterval):
+			case <-timer.C:
+				timer.Stop()
 			}
 			continue
 		}
@@ -181,11 +184,14 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 			if err = stream.CloseSend(); err != nil {
 				log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
 			}
+			timer := time.NewTimer(retryInterval)
 			select {
 			case <-ctx.Done():
 				log.Info("stop synchronizing with leader due to context canceled")
+				timer.Stop()
 				return
-			case <-time.After(retryInterval):
+			case <-timer.C:
+				timer.Stop()
 			}
 			break
 		}
diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go
index 708270ee43f..453ce276afa 100644
--- a/server/replication/replication_mode.go
+++ b/server/replication/replication_mode.go
@@ -340,8 +340,10 @@ const (
 // Run starts the background job.
 func (m *ModeManager) Run(ctx context.Context) {
 	// Wait for a while when just start, in case tikv do not connect in time.
+	timer := time.NewTimer(idleTimeout)
+	defer timer.Stop()
 	select {
-	case <-time.After(idleTimeout):
+	case <-timer.C:
 	case <-ctx.Done():
 		return
 	}
@@ -351,9 +353,11 @@ func (m *ModeManager) Run(ctx context.Context) {
 
 	go func() {
 		defer wg.Done()
+		ticker := time.NewTicker(tickInterval)
+		defer ticker.Stop()
 		for {
 			select {
-			case <-time.After(tickInterval):
+			case <-ticker.C:
 			case <-ctx.Done():
 				return
 			}
@@ -363,9 +367,11 @@ func (m *ModeManager) Run(ctx context.Context) {
 
 	go func() {
 		defer wg.Done()
+		ticker := time.NewTicker(replicateStateInterval)
+		defer ticker.Stop()
 		for {
 			select {
-			case <-time.After(replicateStateInterval):
+			case <-ticker.C:
 			case <-ctx.Done():
 				return
 			}
diff --git a/server/server.go b/server/server.go
index 2943640eefd..e21501bc61e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -612,9 +612,11 @@ func (s *Server) serverMetricsLoop() {
 	ctx, cancel := context.WithCancel(s.serverLoopCtx)
 	defer cancel()
 
+	ticker := time.NewTicker(serverMetricsInterval)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-time.After(serverMetricsInterval):
+		case <-ticker.C:
 			s.collectEtcdStateMetrics()
 		case <-ctx.Done():
 			log.Info("server is closed, exit metrics loop")
@@ -1652,10 +1654,14 @@ func (s *Server) etcdLeaderLoop() {
 	ctx, cancel := context.WithCancel(s.serverLoopCtx)
 	defer cancel()
 
+	ticker := time.NewTicker(s.cfg.LeaderPriorityCheckInterval.Duration)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):
+		case <-ticker.C:
 			s.member.CheckPriority(ctx)
+			// Note: we reset the ticker here to support updating configuration dynamically.
+			ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration)
 		case <-ctx.Done():
 			log.Info("server is closed, exit etcd leader loop")
 			return
@@ -1796,6 +1802,8 @@ func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error {
 // GetServicePrimaryAddr returns the primary address for a given service.
 // Note: This function will only return primary address without judging if it's alive.
 func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
+	ticker := time.NewTicker(retryIntervalGetServicePrimary)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
 		if v, ok := s.servicePrimaryMap.Load(serviceName); ok {
 			return v.(string), true
@@ -1805,7 +1813,7 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string)
 			return "", false
 		case <-ctx.Done():
 			return "", false
-		case <-time.After(retryIntervalGetServicePrimary):
+		case <-ticker.C:
 		}
 	}
 	return "", false
@@ -1823,6 +1831,8 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
 		revision int64
 		err      error
 	)
+	ticker := time.NewTicker(retryIntervalGetServicePrimary)
+	defer ticker.Stop()
 	for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
 		revision, err = s.updateServicePrimaryAddr(serviceName)
 		if revision != 0 && err == nil { // update success
@@ -1831,7 +1841,7 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
 		select {
 		case <-ctx.Done():
 			return
-		case <-time.After(retryIntervalGetServicePrimary):
+		case <-ticker.C:
 		}
 	}
 	if err != nil {
diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go
index 8cf60aa99c6..a0232e7c0a4 100644
--- a/tools/pd-simulator/simulator/client.go
+++ b/tools/pd-simulator/simulator/client.go
@@ -150,21 +150,22 @@ func (c *client) createHeartbeatStream() (pdpb.PD_RegionHeartbeatClient, context
 		cancel context.CancelFunc
 		ctx    context.Context
 	)
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
 	for {
 		ctx, cancel = context.WithCancel(c.ctx)
 		stream, err = c.pdClient().RegionHeartbeat(ctx)
-		if err != nil {
-			simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err))
-			cancel()
-			select {
-			case <-time.After(time.Second):
-				continue
-			case <-c.ctx.Done():
-				simutil.Logger.Info("cancel create stream loop")
-				return nil, ctx, cancel
-			}
+		if err == nil {
+			break
+		}
+		simutil.Logger.Error("create region heartbeat stream error", zap.String("tag", c.tag), zap.Error(err))
+		cancel()
+		select {
+		case <-c.ctx.Done():
+			simutil.Logger.Info("cancel create stream loop")
+			return nil, ctx, cancel
+		case <-ticker.C:
+		}
 	}
 	return stream, ctx, cancel
 }
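
Taken in isolation, the retry-loop conversion applied across these files follows one pattern. The sketch below is not code from the repository (the function names are illustrative); it only restates the shape of the change: time.After creates a timer the caller can never stop, so each failed retry in a loop pins a timer until it fires, whereas a single ticker created before the loop and stopped by defer is reused on every iteration.

package example

import (
	"context"
	"time"
)

// retryWithTimeAfter shows the old shape: a new runtime timer is allocated on
// every iteration by time.After and is only released once it expires.
func retryWithTimeAfter(ctx context.Context, attempts int, f func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-time.After(time.Second): // leaks one timer per failed attempt until it fires
		}
	}
	return err
}

// retryWithTicker shows the converted shape used throughout this diff: one
// ticker for the whole loop, released by the deferred Stop when the function returns.
func retryWithTicker(ctx context.Context, attempts int, f func() error) error {
	var err error
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return err
}

Single-shot waits in the diff keep a plain time.NewTimer plus deferred Stop instead of a ticker, and long-lived timers whose deadline moves (the lease keep-alive and the TSO stream loop) go through timerutil.SafeResetTimer, which stops and drains a possibly-fired timer before calling Reset.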