Skip to content

Commit

Permalink
Force the stress test to run within limited time to avoid ci-test-job…
Browse files Browse the repository at this point in the history
… to timeout.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 9, 2023
1 parent b3ce3a3 commit f2a05e6
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() {
}
}()
for i := 0; i < 20; i++ {
s.verifyTSOProxy(s.streams, 100, true)
s.verifyTSOProxy(s.ctx, s.streams, 100, true)
}
}()
wg.Wait()
Expand All @@ -138,23 +138,29 @@ func (s *tsoProxyTestSuite) TestTSOProxyStress() {
streams := make([]pdpb.PD_TsoClient, 0)
cancelFuncs := make([]context.CancelFunc, 0)

// Start stress test for 90 seconds to avoid ci-test-job to timeout.
ctxTimeout, cancel := context.WithTimeout(s.ctx, 90*time.Second)
defer cancel()

// Push load from many concurrent clients in multiple rounds and increase the #client each round.
for i := 0; i < totalRounds; i++ {
fmt.Printf("Start the %dth round of stress test with %d concurrent clients.\n", i, len(streams)+clientsIncr)
fmt.Printf("start the %dth round of stress test with %d concurrent clients.\n", i, len(streams)+clientsIncr)
grpcClientConnsTemp, streamsTemp, cancelFuncsTemp :=
createTSOStreams(re, s.ctx, s.backendEndpoints, clientsIncr, false)
grpcClientConns = append(grpcClientConns, grpcClientConnsTemp...)
streams = append(streams, streamsTemp...)
cancelFuncs = append(cancelFuncs, cancelFuncsTemp...)
s.verifyTSOProxy(streams, 50, false)
s.verifyTSOProxy(ctxTimeout, streams, 50, false)
}
s.cleanupGRPCStreams(grpcClientConns, streams, cancelFuncs)
fmt.Println("the stress test completed.")

// Wait for the TSO Proxy to recover from the stress.
time.Sleep(recoverySLA)

// Verify the TSO Proxy can still work correctly after the stress.
s.verifyTSOProxy(s.streams, 100, true)
s.verifyTSOProxy(s.ctx, s.streams, 100, true)
fmt.Println("verified that the TSO Proxy can still work correctly after the stress.")
}

func (s *tsoProxyTestSuite) cleanupGRPCStreams(
Expand All @@ -180,7 +186,7 @@ func (s *tsoProxyTestSuite) cleanupGRPCStreams(
// gPRC and TSO failures are allowed, but the TSO Proxy should not panic, blocked or deadlocked.
// If it returns a timestamp, it should be a valid timestamp monotonic increasing.
func (s *tsoProxyTestSuite) verifyTSOProxy(
streams []pdpb.PD_TsoClient, requestsPerClient int, mustReliable bool,
ctx context.Context, streams []pdpb.PD_TsoClient, requestsPerClient int, mustReliable bool,
) {
re := s.Require()
reqs := s.generateRequests(requestsPerClient)
Expand All @@ -193,6 +199,12 @@ func (s *tsoProxyTestSuite) verifyTSOProxy(
defer wg.Done()
lastPhysical, lastLogical := int64(0), int64(0)
for i := 0; i < requestsPerClient; i++ {
select {
case <-ctx.Done():
return
default:
}

req := reqs[rand.Intn(requestsPerClient)]
err := streamCopy.Send(req)
if err != nil && !mustReliable {
Expand Down

0 comments on commit f2a05e6

Please sign in to comment.