diff --git a/client/client.go b/client/client.go index d5cf7cf28d0..ba4741e4ed9 100644 --- a/client/client.go +++ b/client/client.go @@ -797,6 +797,12 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") } c.option.setEnableFollowerHandle(enable) + case TSOClientRPCConcurrency: + value, ok := value.(int) + if !ok { + return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int") + } + c.option.setTSOClientRPCConcurrency(value) default: return errors.New("[pd] unsupported client option") } diff --git a/client/metrics.go b/client/metrics.go index a83b4a36407..d1b375aea8a 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -47,6 +47,7 @@ var ( tsoBatchSendLatency prometheus.Histogram requestForwarded *prometheus.GaugeVec ongoingRequestCountGauge *prometheus.GaugeVec + estimateTSOLatencyGauge *prometheus.GaugeVec ) func initMetrics(constLabels prometheus.Labels) { @@ -127,6 +128,14 @@ func initMetrics(constLabels prometheus.Labels) { Help: "Current count of ongoing batch tso requests", ConstLabels: constLabels, }, []string{"stream"}) + estimateTSOLatencyGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd_client", + Subsystem: "request", + Name: "estimate_tso_latency", + Help: "Estimated latency of an RTT of getting TSO", + ConstLabels: constLabels, + }, []string{"stream"}) } var ( @@ -236,4 +245,5 @@ func registerMetrics() { prometheus.MustRegister(tsoBatchSize) prometheus.MustRegister(tsoBatchSendLatency) prometheus.MustRegister(requestForwarded) + prometheus.MustRegister(estimateTSOLatencyGauge) } diff --git a/client/option.go b/client/option.go index 0109bfc4ed0..3f2b7119b52 100644 --- a/client/option.go +++ b/client/option.go @@ -29,6 +29,7 @@ const ( defaultMaxTSOBatchWaitInterval time.Duration = 0 defaultEnableTSOFollowerProxy = false defaultEnableFollowerHandle = false + defaultTSOClientRPCConcurrency = 1 ) // DynamicOption is used to distinguish the dynamic option type. @@ -43,6 +44,8 @@ const ( EnableTSOFollowerProxy // EnableFollowerHandle is the follower handle option. EnableFollowerHandle + // TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client. + TSOClientRPCConcurrency dynamicOptionCount ) @@ -77,6 +80,7 @@ func newOption() *option { co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy) co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle) + co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency) return co } @@ -127,3 +131,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) { func (o *option) getEnableTSOFollowerProxy() bool { return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool) } + +func (o *option) setTSOClientRPCConcurrency(value int) { + old := o.getTSOClientRPCConcurrency() + if value != old { + o.dynamicOptions[TSOClientRPCConcurrency].Store(value) + } +} + +func (o *option) getTSOClientRPCConcurrency() int { + return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) +} diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index 32191889160..b810e108667 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -64,6 +64,17 @@ func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequ // TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here. tbc.collectedRequestCount = 0 for { + // If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more + // requests, and return when token is ready. + if tbc.collectedRequestCount >= tbc.maxBatchSize && !tokenAcquired { + select { + case <-ctx.Done(): + return ctx.Err() + case <-tokenCh: + return nil + } + } + select { case <-ctx.Done(): return ctx.Err() @@ -146,6 +157,37 @@ fetchPendingRequestsLoop: return nil } +// fetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly +// before calling this function. +func (tbc *tsoBatchController) fetchRequestsWithTimer(ctx context.Context, tsoRequestCh <-chan *tsoRequest, timer *time.Timer) error { +batchingLoop: + for tbc.collectedRequestCount < tbc.maxBatchSize { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-tsoRequestCh: + tbc.pushRequest(req) + case <-timer.C: + break batchingLoop + } + } + + // Try to collect more requests in non-blocking way. +nonWaitingBatchLoop: + for tbc.collectedRequestCount < tbc.maxBatchSize { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-tsoRequestCh: + tbc.pushRequest(req) + default: + break nonWaitingBatchLoop + } + } + + return nil +} + func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) { tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq tbc.collectedRequestCount++ diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index a1e0b03a1fa..7febf194f3c 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -17,9 +17,11 @@ package pd import ( "context" "fmt" + "math" "math/rand" "runtime/trace" "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -61,6 +63,7 @@ type tsoInfo struct { respReceivedAt time.Time physical int64 logical int64 + sourceStreamID string } type tsoServiceProvider interface { @@ -69,6 +72,8 @@ type tsoServiceProvider interface { updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool } +const dispatcherCheckRPCConcurrencyInterval = time.Second * 5 + type tsoDispatcher struct { ctx context.Context cancel context.CancelFunc @@ -79,7 +84,7 @@ type tsoDispatcher struct { connectionCtxs *sync.Map tsoRequestCh chan *tsoRequest tsDeadlineCh chan *deadline - lastTSOInfo *tsoInfo + latestTSOInfo atomic.Pointer[tsoInfo] // For reusing tsoBatchController objects batchBufferPool *sync.Pool @@ -87,7 +92,10 @@ type tsoDispatcher struct { // A token must be acquired here before sending an RPC request, and the token must be put back after finishing the // RPC. This is used like a semaphore, but we don't use semaphore directly here as it cannot be selected with // other channels. - tokenCh chan struct{} + tokenCh chan struct{} + lastCheckConcurrencyTime time.Time + tokenCount int + rpcConcurrency int updateConnectionCtxsCh chan struct{} } @@ -115,7 +123,7 @@ func newTSODispatcher( provider: provider, connectionCtxs: &sync.Map{}, tsoRequestCh: tsoRequestCh, - tsDeadlineCh: make(chan *deadline, 1), + tsDeadlineCh: make(chan *deadline, tokenChCapacity), batchBufferPool: &sync.Pool{ New: func() any { return newTSOBatchController(maxBatchSize * 2) @@ -187,9 +195,6 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { batchController *tsoBatchController ) - // Currently only 1 concurrency is supported. Put one token in. - td.tokenCh <- struct{}{} - log.Info("[tso] tso dispatcher created", zap.String("dc-location", dc)) // Clean up the connectionCtxs when the dispatcher exits. defer func() { @@ -200,6 +205,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { return true }) if batchController != nil && batchController.collectedRequestCount != 0 { + // If you encounter this failure, please check the stack in the logs to see if it's a panic. log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop", zap.Any("panic", recover())) } tsoErr := errors.WithStack(errClosing) @@ -219,6 +225,12 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(option.timeout) defer streamLoopTimer.Stop() + + // Create a not-started-timer to be used for collecting batches for concurrent RPC. + batchingTimer := time.NewTimer(0) + <-batchingTimer.C + defer batchingTimer.Stop() + bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime) tsoBatchLoop: for { @@ -233,8 +245,18 @@ tsoBatchLoop: batchController = td.batchBufferPool.Get().(*tsoBatchController) } - // Start to collect the TSO requests. maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval() + + currentBatchStartTime := time.Now() + // Update concurrency settings if needed. + if err = td.checkTSORPCConcurrency(ctx, maxBatchWaitInterval, currentBatchStartTime); err != nil { + // checkTSORPCConcurrency can only fail due to `ctx` being invalidated. + log.Info("[tso] stop checking tso rpc concurrency configurations due to context canceled", + zap.String("dc-location", dc), zap.Error(err)) + return + } + + // Start to collect the TSO requests. // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, // otherwise the upper caller may get blocked on waiting for the results. if err = batchController.fetchPendingRequests(ctx, td.tsoRequestCh, td.tokenCh, maxBatchWaitInterval); err != nil { @@ -318,6 +340,57 @@ tsoBatchLoop: break streamChoosingLoop } + + noDelay := false + failpoint.Inject("tsoDispatcherConcurrentModeNoDelay", func() { + noDelay = true + }) + + // If concurrent RPC is enabled, the time for collecting each request batch is expected to be + // estimatedRPCDuration / concurrency. Note the time mentioned here is counted from starting trying to collect + // the batch, instead of the time when the first request arrives. + // Here, if the elapsed time since starting collecting this batch didn't reach the expected batch time, then + // continue collecting. + if td.isConcurrentRPCEnabled() { + estimatedLatency := stream.EstimatedRPCLatency() + goalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency) + + failpoint.Inject("tsoDispatcherConcurrentModeAssertDelayDuration", func(val failpoint.Value) { + if s, ok := val.(string); ok { + expected, err := time.ParseDuration(s) + if err != nil { + panic(err) + } + if math.Abs(expected.Seconds()-goalBatchTime.Seconds()) > 1e-6 { + log.Fatal("tsoDispatcher: trying to delay for unexpected duration for the batch", zap.Duration("goalBatchTime", goalBatchTime), zap.Duration("expectedBatchTime", expected)) + } + } else { + panic("invalid value for failpoint tsoDispatcherConcurrentModeAssertDelayDuration: expected string") + } + }) + + waitTimerStart := time.Now() + remainingBatchTime := goalBatchTime - waitTimerStart.Sub(currentBatchStartTime) + if remainingBatchTime > 0 && !noDelay { + if !batchingTimer.Stop() { + select { + case <-batchingTimer.C: + default: + } + } + batchingTimer.Reset(remainingBatchTime) + + err = batchController.fetchRequestsWithTimer(ctx, td.tsoRequestCh, batchingTimer) + if err != nil { + // There should not be other kinds of errors. + log.Info("[tso] stop fetching the pending tso requests due to context canceled", + zap.String("dc-location", dc), zap.Error(err)) + td.cancelCollectedRequests(batchController, invalidStreamID, errors.WithStack(ctx.Err())) + return + } + } + } + done := make(chan struct{}) dl := newTSDeadline(option.timeout, done, cancel) select { @@ -495,6 +568,9 @@ func (td *tsoDispatcher) processRequests( reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID() ) + // Load latest allocated ts for monotonicity assertion. + tsoInfoBeforeReq := td.latestTSOInfo.Load() + cb := func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { // As golang doesn't allow double-closing a channel, here is implicitly a check that the callback // is never called twice or called while it's also being cancelled elsewhere. @@ -513,10 +589,12 @@ func (td *tsoDispatcher) processRequests( respReceivedAt: time.Now(), physical: result.physical, logical: result.logical, + sourceStreamID: stream.streamID, } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits) - td.compareAndSwapTS(curTSOInfo, firstLogical) + // Do the check before releasing the token. + td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical) td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID) } @@ -542,32 +620,35 @@ func (td *tsoDispatcher) doneCollectedRequests(tbc *tsoBatchController, physical tbc.finishCollectedRequests(physical, firstLogical, suffixBits, streamID, nil) } -func (td *tsoDispatcher) compareAndSwapTS( - curTSOInfo *tsoInfo, firstLogical int64, +// checkMonotonicity checks whether the monotonicity of the TSO allocation is violated. +// It asserts (curTSOInfo, firstLogical) must be larger than lastTSOInfo, and updates td.latestTSOInfo if it grows. +// +// Note that when concurrent RPC is enabled, the lastTSOInfo may not be the latest value stored in td.latestTSOInfo +// field. Instead, it's the value that was loaded just before the current RPC request's beginning. The reason is, +// if two requests processing time has overlap, they don't have a strong order, and the later-finished one may be +// allocated later (with larger value) than another. We only need to guarantee request A returns larger ts than B +// if request A *starts* after request B *finishes*. +func (td *tsoDispatcher) checkMonotonicity( + lastTSOInfo *tsoInfo, curTSOInfo *tsoInfo, firstLogical int64, ) { - if td.lastTSOInfo != nil { - var ( - lastTSOInfo = td.lastTSOInfo - dc = td.dc - physical = curTSOInfo.physical - keyspaceID = td.provider.getServiceDiscovery().GetKeyspaceID() - ) - if td.lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + keyspaceID := td.provider.getServiceDiscovery().GetKeyspaceID() + if lastTSOInfo != nil { + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) } // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // all TSOs we get will be [6, 7, 8, 9, 10]. latestTSOInfo.logical stores the logical part of the largest ts returned // last time. - if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + if tsoutil.TSLessEqual(curTSOInfo.physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { log.Panic("[tso] timestamp fallback", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("keyspace", keyspaceID), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), - zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", curTSOInfo.physical, firstLogical)), zap.String("last-tso-server", lastTSOInfo.tsoServer), zap.String("cur-tso-server", curTSOInfo.tsoServer), zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), @@ -575,8 +656,103 @@ func (td *tsoDispatcher) compareAndSwapTS( zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), + zap.String("last-stream-id", lastTSOInfo.sourceStreamID), + zap.String("cur-stream-id", curTSOInfo.sourceStreamID)) + } + } + + if td.latestTSOInfo.CompareAndSwap(nil, curTSOInfo) { + // If latestTSOInfo is missing, simply store it and exit. + return + } + + // Replace if we are holding a larger ts than that has been recorded. + for { + old := td.latestTSOInfo.Load() + if tsoutil.TSLessEqual(curTSOInfo.physical, curTSOInfo.logical, old.physical, old.logical) { + // The current one is large enough. Skip. + break + } + if td.latestTSOInfo.CompareAndSwap(old, curTSOInfo) { + // Successfully replaced. + break + } + } +} + +// checkTSORPCConcurrency checks configurations about TSO RPC concurrency, and adjust the token count if needed. +// Some other options (EnableTSOFollowerProxy and MaxTSOBatchWaitInterval) may affect the availability of concurrent +// RPC requests. As the dispatcher loop loads MaxTSOBatchWaitInterval in each single circle, pass it directly to this +// function. Other configurations will be loaded within this function when needed. +// +// Behavior of the function: +// - As concurrent TSO RPC requests is an optimization aiming on the opposite purpose to that of EnableTSOFollowerProxy +// and MaxTSOBatchWaitInterval, so once either EnableTSOFollowerProxy and MaxTSOBatchWaitInterval is enabled, the +// concurrency will always be set to 1 no matter how the user configured it. +// - Normally, this function takes effect in a limited frequency controlled by dispatcherCheckRPCConcurrencyInterval. +// However, if the RPC concurrency is set to more than 1, and MaxTSOBatchWaitInterval is changed from disabled into +// enabled (0 -> positive), this function takes effect immediately to disable concurrent RPC requests. +// - After this function takes effect, the final decision of concurrency and token count will be set to +// td.rpcConcurrency and td.tokenCount; and tokens available in td.tokenCh will also be adjusted. +func (td *tsoDispatcher) checkTSORPCConcurrency(ctx context.Context, maxBatchWaitInterval time.Duration, now time.Time) error { + // If we currently enabled concurrent TSO RPC requests, but `maxBatchWaitInterval` is a positive value, it must + // because that MaxTSOBatchWaitInterval is just enabled. In this case, disable concurrent TSO RPC requests + // immediately, because MaxTSOBatchWaitInterval and concurrent RPC requests has opposite purpose. + immediatelyUpdate := td.rpcConcurrency > 1 && maxBatchWaitInterval > 0 + + // Allow always updating for test purpose. + failpoint.Inject("tsoDispatcherAlwaysCheckConcurrency", func() { + immediatelyUpdate = true + }) + + if !immediatelyUpdate && now.Sub(td.lastCheckConcurrencyTime) < dispatcherCheckRPCConcurrencyInterval { + return nil + } + td.lastCheckConcurrencyTime = now + + newConcurrency := td.provider.getOption().getTSOClientRPCConcurrency() + if maxBatchWaitInterval > 0 || td.provider.getOption().getEnableTSOFollowerProxy() { + newConcurrency = 1 + } + + if newConcurrency == td.rpcConcurrency { + return nil + } + + log.Info("[tso] switching tso rpc concurrency", zap.Int("old", td.rpcConcurrency), zap.Int("new", newConcurrency)) + td.rpcConcurrency = newConcurrency + + // Find a proper token count. + // When the concurrency is set to 1, there's only 1 token, which means only 1 RPC request can run at the same + // time. + // When the concurrency is set to more than 1, the time interval between sending two batches of requests is + // controlled by an estimation of an average RPC duration. But as the duration of an RPC may jitter in the network, + // and an RPC request may finish earlier or later. So we allow there to be the actual number of concurrent ongoing + // request to be fluctuating. So in this case, the token count will be set to 2 times the expected concurrency. + newTokenCount := newConcurrency + if newConcurrency > 1 { + newTokenCount = newConcurrency * 2 + } + + if newTokenCount > td.tokenCount { + for td.tokenCount < newTokenCount { + td.tokenCh <- struct{}{} + td.tokenCount++ + } + } else if newTokenCount < td.tokenCount { + for td.tokenCount > newTokenCount { + select { + case <-ctx.Done(): + return ctx.Err() + case <-td.tokenCh: + } + td.tokenCount-- } } - td.lastTSOInfo = curTSOInfo + return nil +} + +func (td *tsoDispatcher) isConcurrentRPCEnabled() bool { + return td.rpcConcurrency > 1 } diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index b8f0fcef208..bf038e7b7f3 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -18,20 +18,27 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/zap/zapcore" ) type mockTSOServiceProvider struct { - option *option + option *option + createStream func(ctx context.Context) *tsoStream + updateConnMu sync.Mutex } -func newMockTSOServiceProvider(option *option) *mockTSOServiceProvider { +func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider { return &mockTSOServiceProvider{ - option: option, + option: option, + createStream: createStream, } } @@ -43,17 +50,279 @@ func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) } -func (*mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { +func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { + // Avoid concurrent updating in the background updating goroutine and active updating in the dispatcher loop when + // stream is missing. + m.updateConnMu.Lock() + defer m.updateConnMu.Unlock() + _, ok := connectionCtxs.Load(mockStreamURL) if ok { return true } ctx, cancel := context.WithCancel(ctx) - stream := newTSOStream(ctx, mockStreamURL, newMockTSOStreamImpl(ctx, true)) + var stream *tsoStream + if m.createStream == nil { + stream = newTSOStream(ctx, mockStreamURL, newMockTSOStreamImpl(ctx, resultModeGenerated)) + } else { + stream = m.createStream(ctx) + } connectionCtxs.LoadOrStore(mockStreamURL, &tsoConnectionContext{ctx, cancel, mockStreamURL, stream}) return true } +type testTSODispatcherSuite struct { + suite.Suite + re *require.Assertions + + streamInner *mockTSOStreamImpl + stream *tsoStream + dispatcher *tsoDispatcher + dispatcherWg sync.WaitGroup + option *option + + reqPool *sync.Pool +} + +func (s *testTSODispatcherSuite) SetupTest() { + s.re = require.New(s.T()) + s.option = newOption() + s.option.timeout = time.Hour + // As the internal logic of the tsoDispatcher allows it to create streams multiple times, but our tests needs + // single stable access to the inner stream, we do not allow it to create it more than once in these tests. + creating := new(atomic.Bool) + // To avoid data race on reading `stream` and `streamInner` fields. + created := new(atomic.Bool) + createStream := func(ctx context.Context) *tsoStream { + if !creating.CompareAndSwap(false, true) { + s.re.FailNow("testTSODispatcherSuite: trying to create stream more than once, which is unsupported in this tests") + } + s.streamInner = newMockTSOStreamImpl(ctx, resultModeGenerateOnSignal) + s.stream = newTSOStream(ctx, mockStreamURL, s.streamInner) + created.Store(true) + return s.stream + } + s.dispatcher = newTSODispatcher(context.Background(), globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(s.option, createStream)) + s.reqPool = &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + dcLocation: globalDCLocation, + } + }, + } + + s.dispatcherWg.Add(1) + go s.dispatcher.handleDispatcher(&s.dispatcherWg) + + // Perform a request to ensure the stream must be created. + + { + ctx := context.Background() + req := s.sendReq(ctx) + s.reqMustNotReady(req) + // Wait until created + for !created.Load() { + time.Sleep(time.Millisecond) + } + s.streamInner.generateNext() + s.reqMustReady(req) + } + s.re.True(created.Load()) + s.re.NotNil(s.stream) +} + +func (s *testTSODispatcherSuite) TearDownTest() { + s.dispatcher.close() + s.streamInner.stop() + s.dispatcherWg.Wait() + s.stream.WaitForClosed() + s.streamInner = nil + s.stream = nil + s.dispatcher = nil + s.reqPool = nil +} + +func (s *testTSODispatcherSuite) getReq(ctx context.Context) *tsoRequest { + req := s.reqPool.Get().(*tsoRequest) + req.clientCtx = context.Background() + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 + req.start = time.Now() + req.pool = s.reqPool + return req +} + +func (s *testTSODispatcherSuite) sendReq(ctx context.Context) *tsoRequest { + req := s.getReq(ctx) + s.dispatcher.push(req) + return req +} + +func (s *testTSODispatcherSuite) reqMustNotReady(req *tsoRequest) { + _, _, err := req.waitTimeout(time.Millisecond * 50) + s.re.Error(err) + s.re.ErrorIs(err, context.DeadlineExceeded) +} + +func (s *testTSODispatcherSuite) reqMustReady(req *tsoRequest) (physical int64, logical int64) { + physical, logical, err := req.waitTimeout(time.Second) + s.re.NoError(err) + return physical, logical +} + +func TestTSODispatcherTestSuite(t *testing.T) { + suite.Run(t, new(testTSODispatcherSuite)) +} + +func (s *testTSODispatcherSuite) TestBasic() { + ctx := context.Background() + req := s.sendReq(ctx) + s.reqMustNotReady(req) + s.streamInner.generateNext() + s.reqMustReady(req) +} + +func (s *testTSODispatcherSuite) checkIdleTokenCount(expectedTotal int) { + // When the tsoDispatcher is idle, the dispatcher loop will acquire a token and wait for requests. Therefore + // there should be N-1 free tokens remaining. + spinStart := time.Now() + for time.Since(spinStart) < time.Second { + if s.dispatcher.tokenCount != expectedTotal { + continue + } + if len(s.dispatcher.tokenCh) == expectedTotal-1 { + break + } + } + s.re.Equal(expectedTotal, s.dispatcher.tokenCount) + s.re.Len(s.dispatcher.tokenCh, expectedTotal-1) +} + +func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) { + ctx := context.Background() + s.option.setTSOClientRPCConcurrency(concurrency) + + // Make sure the state of the mock stream is clear. Unexpected batching may make the requests sent to the stream + // less than expected, causing there are more `generateNext` signals or generated results. + s.re.Empty(s.streamInner.resultCh) + + // The dispatcher may block on fetching requests, which is after checking concurrency option. Perform a request + // to make sure the concurrency setting takes effect. + req := s.sendReq(ctx) + s.reqMustNotReady(req) + s.streamInner.generateNext() + s.reqMustReady(req) + + // For concurrent mode, the actual token count is twice the concurrency. + // Note that the concurrency is a hint, and it's allowed to have more than `concurrency` requests running. + tokenCount := concurrency + if concurrency > 1 { + tokenCount = concurrency * 2 + } + s.checkIdleTokenCount(tokenCount) + + // As the failpoint `tsoDispatcherConcurrentModeNoDelay` is set, tsoDispatcher won't collect requests in blocking + // way. And as `reqMustNotReady` delays for a while, requests shouldn't be batched as long as there are free tokens. + // The first N requests (N=tokenCount) will each be a single batch, occupying a token. The last 3 are blocked, + // and will be batched together once there is a free token. + reqs := make([]*tsoRequest, 0, tokenCount+3) + + for i := 0; i < tokenCount+3; i++ { + req := s.sendReq(ctx) + s.reqMustNotReady(req) + reqs = append(reqs, req) + } + + // The dispatcher won't process more request batches if tokens are used up. + // Note that `reqMustNotReady` contains a delay, which makes it nearly impossible that dispatcher is processing the + // second batch but not finished yet. + // Also note that in current implementation, the tsoStream tries to receive the next result before checking + // the `tsoStream.pendingRequests` queue. Changing this behavior may need to update this test. + for i := 0; i < tokenCount+3; i++ { + expectedPending := tokenCount + 1 - i + if expectedPending > tokenCount { + expectedPending = tokenCount + } + if expectedPending < 0 { + expectedPending = 0 + } + + // Spin for a while as the dispatcher loop may have not finished sending next batch to pendingRequests + spinStart := time.Now() + for time.Since(spinStart) < time.Second { + if expectedPending == len(s.stream.pendingRequests) { + break + } + } + s.re.Len(s.stream.pendingRequests, expectedPending) + + req := reqs[i] + // The last 3 requests should be in a single batch. Don't need to generate new results for the last 2. + if i <= tokenCount { + s.reqMustNotReady(req) + s.streamInner.generateNext() + } + s.reqMustReady(req) + } +} + +func (s *testTSODispatcherSuite) TestConcurrentRPC() { + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency", "return")) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay")) + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency")) + }() + + s.testStaticConcurrencyImpl(1) + s.testStaticConcurrencyImpl(2) + s.testStaticConcurrencyImpl(4) + s.testStaticConcurrencyImpl(16) +} + +func (s *testTSODispatcherSuite) TestBatchDelaying() { + ctx := context.Background() + s.option.setTSOClientRPCConcurrency(2) + + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay")) + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency")) + }() + + // Make sure concurrency option takes effect. + req := s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Trigger the check. + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration")) + }() + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Try other concurrency. + s.option.setTSOClientRPCConcurrency(3) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + s.option.setTSOClientRPCConcurrency(4) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) +} + func BenchmarkTSODispatcherHandleRequests(b *testing.B) { log.SetLevel(zapcore.FatalLevel) @@ -80,7 +349,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) { return req } - dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption())) + dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil)) var wg sync.WaitGroup wg.Add(1) diff --git a/client/tso_request.go b/client/tso_request.go index fb2ae2bb92e..5c959673a8b 100644 --- a/client/tso_request.go +++ b/client/tso_request.go @@ -60,6 +60,11 @@ func (req *tsoRequest) tryDone(err error) { // Wait will block until the TSO result is ready. func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { + return req.waitCtx(req.requestCtx) +} + +// waitCtx waits for the TSO result with specified ctx, while not using req.requestCtx. +func (req *tsoRequest) waitCtx(ctx context.Context) (physical int64, logical int64, err error) { // If tso command duration is observed very high, the reason could be it // takes too long for Wait() be called. start := time.Now() @@ -78,13 +83,20 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { cmdDurationWait.Observe(now.Sub(start).Seconds()) cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) return - case <-req.requestCtx.Done(): - return 0, 0, errors.WithStack(req.requestCtx.Err()) + case <-ctx.Done(): + return 0, 0, errors.WithStack(ctx.Err()) case <-req.clientCtx.Done(): return 0, 0, errors.WithStack(req.clientCtx.Err()) } } +// waitTimeout waits for the TSO result for limited time. Currently only for test purposes. +func (req *tsoRequest) waitTimeout(timeout time.Duration) (physical int64, logical int64, err error) { + ctx, cancel := context.WithTimeout(req.requestCtx, timeout) + defer cancel() + return req.waitCtx(ctx) +} + type tsoRequestFastFail struct { err error } diff --git a/client/tso_stream.go b/client/tso_stream.go index 479beff2c6a..142ad71c6b9 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -18,11 +18,13 @@ import ( "context" "fmt" "io" + "math" "sync" "sync/atomic" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" @@ -214,6 +216,8 @@ type tsoStream struct { state atomic.Int32 stoppedWithErr atomic.Pointer[error] + estimatedLatencyMicros atomic.Uint64 + ongoingRequestCountGauge prometheus.Gauge ongoingRequests atomic.Int32 } @@ -226,7 +230,10 @@ const ( var streamIDAlloc atomic.Int32 -const invalidStreamID = "" +const ( + invalidStreamID = "" + maxPendingRequestsInTSOStream = 64 +) func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream { streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1)) @@ -238,7 +245,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda stream: stream, streamID: streamID, - pendingRequests: make(chan batchedRequests, 64), + pendingRequests: make(chan batchedRequests, maxPendingRequestsInTSOStream), cancel: cancel, @@ -363,6 +370,27 @@ func (s *tsoStream) recvLoop(ctx context.Context) { s.ongoingRequestCountGauge.Set(0) }() + // For calculating the estimated RPC latency. + const ( + filterCutoffFreq float64 = 1.0 + filterNewSampleWeightUpperbound float64 = 0.2 + ) + // The filter applies on logarithm of the latency of each TSO RPC in microseconds. + filter := newRCFilter(filterCutoffFreq, filterNewSampleWeightUpperbound) + + updateEstimatedLatency := func(sampleTime time.Time, latency time.Duration) { + if latency < 0 { + // Unreachable + return + } + currentSample := math.Log(float64(latency.Microseconds())) + filteredValue := filter.update(sampleTime, currentSample) + micros := math.Exp(filteredValue) + s.estimatedLatencyMicros.Store(uint64(micros)) + // Update the metrics in seconds. + estimateTSOLatencyGauge.WithLabelValues(s.streamID).Set(micros * 1e-6) + } + recvLoop: for { select { @@ -383,14 +411,15 @@ recvLoop: hasReq = false } - durationSeconds := time.Since(currentReq.startTime).Seconds() + latency := time.Since(currentReq.startTime) + latencySeconds := latency.Seconds() if err != nil { // If a request is pending and error occurs, observe the duration it has cost. // Note that it's also possible that the stream is broken due to network without being requested. In this // case, `Recv` may return an error while no request is pending. if hasReq { - requestFailedDurationTSO.Observe(durationSeconds) + requestFailedDurationTSO.Observe(latencySeconds) } if err == io.EOF { finishWithErr = errors.WithStack(errs.ErrClientTSOStreamClosed) @@ -403,9 +432,9 @@ recvLoop: break recvLoop } - latencySeconds := durationSeconds requestDurationTSO.Observe(latencySeconds) tsoBatchSize.Observe(float64(res.count)) + updateEstimatedLatency(currentReq.startTime, latency) if res.count != uint32(currentReq.count) { finishWithErr = errors.WithStack(errTSOLength) @@ -421,6 +450,28 @@ recvLoop: } } +// EstimatedRPCLatency returns an estimation of the duration of each TSO RPC. If the stream has never handled any RPC, +// this function returns 0. +func (s *tsoStream) EstimatedRPCLatency() time.Duration { + failpoint.Inject("tsoStreamSimulateEstimatedRPCLatency", func(val failpoint.Value) { + if s, ok := val.(string); ok { + duration, err := time.ParseDuration(s) + if err != nil { + panic(err) + } + failpoint.Return(duration) + } else { + panic("invalid failpoint value for `tsoStreamSimulateEstimatedRPCLatency`: expected string") + } + }) + latencyUs := s.estimatedLatencyMicros.Load() + // Limit it at least 100us + if latencyUs < 100 { + latencyUs = 100 + } + return time.Microsecond * time.Duration(latencyUs) +} + // GetRecvError returns the error (if any) that has been encountered when receiving response asynchronously. func (s *tsoStream) GetRecvError() error { perr := s.stoppedWithErr.Load() @@ -434,3 +485,48 @@ func (s *tsoStream) GetRecvError() error { func (s *tsoStream) WaitForClosed() { s.wg.Wait() } + +// rcFilter is a simple implementation of a discrete-time low-pass filter. +// Ref: https://en.wikipedia.org/wiki/Low-pass_filter#Simple_infinite_impulse_response_filter +// There are some differences between this implementation and the wikipedia one: +// - Time-interval between each two samples is not necessarily a constant. We allow non-even sample interval by simply +// calculating the alpha (which is calculated by `dt / (rc + dt)`) dynamically for each sample, at the expense of +// losing some mathematical strictness. +// - Support specifying the upperbound of the new sample when updating. This can be an approach to avoid the output +// jumps drastically when the samples come in a low frequency. +type rcFilter struct { + rc float64 + newSampleWeightUpperBound float64 + value float64 + lastSampleTime time.Time + firstSampleArrived bool +} + +// newRCFilter initializes an rcFilter. `cutoff` is the cutoff frequency in Hertz. `newSampleWeightUpperbound` controls +// the upper limit of the weight of each incoming sample (pass 1 for unlimited). +func newRCFilter(cutoff float64, newSampleWeightUpperBound float64) rcFilter { + rc := 1.0 / (2.0 * math.Pi * cutoff) + return rcFilter{ + rc: rc, + newSampleWeightUpperBound: newSampleWeightUpperBound, + } +} + +func (f *rcFilter) update(sampleTime time.Time, newSample float64) float64 { + // Handle the first sample + if !f.firstSampleArrived { + f.firstSampleArrived = true + f.lastSampleTime = sampleTime + f.value = newSample + return newSample + } + + // Delta time. + dt := sampleTime.Sub(f.lastSampleTime).Seconds() + // `alpha` is the weight of the new sample, limited with `newSampleWeightUpperBound`. + alpha := math.Min(dt/(f.rc+dt), f.newSampleWeightUpperBound) + f.value = (1-alpha)*f.value + alpha*newSample + + f.lastSampleTime = sampleTime + return f.value +} diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index b09c54baf3a..ab6f2786ff3 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -17,6 +17,7 @@ package pd import ( "context" "io" + "math" "testing" "time" @@ -42,6 +43,14 @@ type resultMsg struct { breakStream bool } +type resultMode int + +const ( + resultModeManual resultMode = iota + resultModeGenerated + resultModeGenerateOnSignal +) + type mockTSOStreamImpl struct { ctx context.Context requestCh chan requestMsg @@ -49,21 +58,21 @@ type mockTSOStreamImpl struct { keyspaceID uint32 errorState error - autoGenerateResult bool + resultMode resultMode // Current progress of generating TSO results resGenPhysical, resGenLogical int64 } -func newMockTSOStreamImpl(ctx context.Context, autoGenerateResult bool) *mockTSOStreamImpl { +func newMockTSOStreamImpl(ctx context.Context, resultMode resultMode) *mockTSOStreamImpl { return &mockTSOStreamImpl{ ctx: ctx, requestCh: make(chan requestMsg, 64), resultCh: make(chan resultMsg, 64), keyspaceID: 0, - autoGenerateResult: autoGenerateResult, - resGenPhysical: 10000, - resGenLogical: 0, + resultMode: resultMode, + resGenPhysical: 10000, + resGenLogical: 0, } } @@ -82,6 +91,17 @@ func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID } func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { + var needGenerateResult, needResultSignal bool + switch s.resultMode { + case resultModeManual: + needResultSignal = true + case resultModeGenerated: + needGenerateResult = true + case resultModeGenerateOnSignal: + needResultSignal = true + needGenerateResult = true + } + // This stream have ever receive an error, it returns the error forever. if s.errorState != nil { return tsoRequestResult{}, s.errorState @@ -130,12 +150,12 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } s.errorState = res.err return tsoRequestResult{}, s.errorState - } else if s.autoGenerateResult { + } else if !needResultSignal { // Do not allow manually assigning result. panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") } - } else if s.autoGenerateResult { - res = s.autoGenResult(req.count) + } else if !needResultSignal { + // Mark hasRes as true to skip receiving from resultCh. The actual value of the result will be generated later. hasRes = true } @@ -160,6 +180,10 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } } + if needGenerateResult { + res = s.autoGenResult(req.count) + } + // Both res and req should be ready here. if res.err != nil { s.errorState = res.err @@ -168,11 +192,14 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { + if count >= (1 << 18) { + panic("requested count too large") + } physical := s.resGenPhysical logical := s.resGenLogical + count if logical >= (1 << 18) { - physical += logical >> 18 - logical &= (1 << 18) - 1 + physical += 1 + logical = count } s.resGenPhysical = physical @@ -190,6 +217,9 @@ func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { } func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { + if s.resultMode != resultModeManual { + panic("trying to manually specifying tso result on generating mode") + } s.resultCh <- resultMsg{ r: tsoRequestResult{ physical: physical, @@ -201,6 +231,13 @@ func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count ui } } +func (s *mockTSOStreamImpl) generateNext() { + if s.resultMode != resultModeGenerateOnSignal { + panic("trying to signal generation when the stream is not generate-on-signal mode") + } + s.resultCh <- resultMsg{} +} + func (s *mockTSOStreamImpl) returnError(err error) { s.resultCh <- resultMsg{ err: err, @@ -233,7 +270,7 @@ type testTSOStreamSuite struct { func (s *testTSOStreamSuite) SetupTest() { s.re = require.New(s.T()) - s.inner = newMockTSOStreamImpl(context.Background(), false) + s.inner = newMockTSOStreamImpl(context.Background(), resultModeManual) s.stream = newTSOStream(context.Background(), mockStreamURL, s.inner) } @@ -454,10 +491,125 @@ func (s *testTSOStreamSuite) TestTSOStreamConcurrentRunning() { } } +func (s *testTSOStreamSuite) TestEstimatedLatency() { + s.inner.returnResult(100, 0, 1) + res := s.getResult(s.mustProcessRequestWithResultCh(1)) + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(0), res.result.logical) + estimation := s.stream.EstimatedRPCLatency().Seconds() + s.re.Greater(estimation, 0.0) + s.re.InDelta(0.0, estimation, 0.01) + + // For each began request, record its startTime and send it to the result returning goroutine. + reqStartTimeCh := make(chan time.Time, maxPendingRequestsInTSOStream) + // Limit concurrent requests to be less than the capacity of tsoStream.pendingRequests. + tokenCh := make(chan struct{}, maxPendingRequestsInTSOStream-1) + for i := 0; i < 40; i++ { + tokenCh <- struct{}{} + } + // Return a result after 50ms delay for each requests + const delay = time.Millisecond * 50 + // The goroutine to delay and return the result. + go func() { + allocated := int64(1) + for reqStartTime := range reqStartTimeCh { + now := time.Now() + elapsed := now.Sub(reqStartTime) + if elapsed < delay { + time.Sleep(delay - elapsed) + } + s.inner.returnResult(100, allocated, 1) + allocated++ + } + }() + + // Limit the test time within 1s + startTime := time.Now() + resCh := make(chan (<-chan callbackInvocation), 100) + // The sending goroutine + go func() { + for time.Since(startTime) < time.Second { + <-tokenCh + reqStartTimeCh <- time.Now() + r := s.mustProcessRequestWithResultCh(1) + resCh <- r + } + close(reqStartTimeCh) + close(resCh) + }() + // Check the result + index := 0 + for r := range resCh { + // The first is 1 + index++ + res := s.getResult(r) + tokenCh <- struct{}{} + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(index), res.result.logical) + } + + s.re.Greater(s.stream.EstimatedRPCLatency(), time.Duration(int64(0.9*float64(delay)))) + s.re.Less(s.stream.EstimatedRPCLatency(), time.Duration(math.Floor(1.1*float64(delay)))) +} + +func TestRCFilter(t *testing.T) { + re := require.New(t) + // Test basic calculation with frequency 1 + f := newRCFilter(1, 1) + now := time.Now() + // The first sample initializes the value. + re.Equal(10.0, f.update(now, 10)) + now = now.Add(time.Second) + expectedValue := 10 / (2*math.Pi + 1) + re.InEpsilon(expectedValue, f.update(now, 0), 1e-8) + expectedValue = expectedValue*(1/(2*math.Pi))/(1/(2*math.Pi)+2) + 100*2/(1/(2*math.Pi)+2) + now = now.Add(time.Second * 2) + re.InEpsilon(expectedValue, f.update(now, 100), 1e-8) + + // Test newSampleWeightUpperBound + f = newRCFilter(10, 0.5) + now = time.Now() + re.Equal(0.0, f.update(now, 0)) + now = now.Add(time.Second) + re.InEpsilon(1.0, f.update(now, 2), 1e-8) + now = now.Add(time.Second * 2) + re.InEpsilon(3.0, f.update(now, 5), 1e-8) + + // Test another cutoff frequency and weight upperbound. + f = newRCFilter(1/(2*math.Pi), 0.9) + now = time.Now() + re.Equal(1.0, f.update(now, 1)) + now = now.Add(time.Second) + re.InEpsilon(2.0, f.update(now, 3), 1e-8) + now = now.Add(time.Second * 2) + re.InEpsilon(6.0, f.update(now, 8), 1e-8) + now = now.Add(time.Minute) + re.InEpsilon(15.0, f.update(now, 16), 1e-8) + + // Test with dense samples + f = newRCFilter(1/(2*math.Pi), 0.9) + now = time.Now() + re.Equal(0.0, f.update(now, 0)) + lastOutput := 0.0 + // 10000 even samples in 1 second. + for i := 0; i < 10000; i++ { + now = now.Add(time.Microsecond * 100) + output := f.update(now, 1.0) + re.Greater(output, lastOutput) + re.Less(output, 1.0) + lastOutput = output + } + // Regarding the above samples as being close enough to a continuous function, the output after 1 second + // should be 1 - exp(-RC*t) = 1 - exp(-t). Here RC = 1/(2*pi*cutoff) = 1. + re.InDelta(0.63, lastOutput, 0.02) +} + func BenchmarkTSOStreamSendRecv(b *testing.B) { log.SetLevel(zapcore.FatalLevel) - streamInner := newMockTSOStreamImpl(context.Background(), true) + streamInner := newMockTSOStreamImpl(context.Background(), resultModeGenerated) stream := newTSOStream(context.Background(), mockStreamURL, streamInner) defer func() { streamInner.stop()