Skip to content

Commit

Permalink
test for retry when disconnected, with logging
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Feb 7, 2024
1 parent d41b01d commit 14e0aa4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
14 changes: 13 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,14 @@ func (cs *clientStream) commitAttempt() {
// retried, the bool indicates whether it is being retried transparently.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
cs := a.cs

logger.Error("Should retry?", err.Error())
if cs.finished || cs.committed || a.drop {
// RPC is finished or committed or was dropped by the picker; cannot retry.
logger.Error(cs.finished, cs.committed, a.drop)
return false, err
}
if a.s == nil && a.allowTransparentRetry {
logger.Error("transparent")
return true, nil
}
// Wait for the trailers.
Expand All @@ -636,16 +638,19 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
logger.Error("first attempt unprocessed")
return true, nil
}
if cs.cc.dopts.disableRetry {
logger.Error("retry disabled")
return false, err
}

pushback := 0
hasPushback := false
if a.s != nil {
if !a.s.TrailersOnly() {
logger.Error("not trailers only")
return false, err
}

Expand Down Expand Up @@ -676,15 +681,18 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {

rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
logger.Error("not retryable")
return false, err
}

// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
logger.Error("throttled")
return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
logger.Error("max attempts")
return false, err
}

Expand All @@ -698,19 +706,23 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
if max := float64(rp.MaxBackoff); cur > max {
cur = max
}
logger.Errorf("RP=%+v; fact=%v, cur=%v", rp, fact, cur)
dur = time.Duration(grpcrand.Int63n(int64(cur)))
cs.numRetriesSincePushback++
}

// TODO(dfawley): we could eagerly fail here if dur puts us past the
// deadline, but unsure if it is worth doing.
logger.Error("Timer with duration", dur)
t := time.NewTimer(dur)
select {
case <-t.C:
cs.numRetries++
logger.Error("time to retry")
return false, nil
case <-cs.ctx.Done():
t.Stop()
logger.Error("rpc deadline reached")
return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
Expand Down
31 changes: 31 additions & 0 deletions test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,37 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)

func (s) TestRetryConnectionErrors(t *testing.T) {
// Nothing's running on localhost:9090!
cc, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{

Check failure on line 51 in test/retry_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.21)

unnecessary use of fmt.Sprintf (S1039)
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"retryPolicy": {
"MaxAttempts": 5,
"InitialBackoff": "1s",
"MaxBackoff": "1s",
"BackoffMultiplier": 1,
"RetryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`)),
)
defer cc.Close()

Check failure on line 64 in test/retry_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.21)

should check returned error before deferring cc.Close() (SA5001)
if err != nil {
t.Fatal("Error creating client:", err)
}
client := testpb.NewTestServiceClient(cc)
start := time.Now()
_, err = client.EmptyCall(context.Background(), &testpb.Empty{})
// Based on the config I would expect the elapsed time to be at least 5 seconds
// But in reality it's around 1s
fmt.Println("Elapsed: ", time.Since(start))
fmt.Println("Error: ", err)
fmt.Println("Status: ", cc.GetState().String())
}

func (s) TestRetryUnary(t *testing.T) {
i := -1
ss := &stubserver.StubServer{
Expand Down

0 comments on commit 14e0aa4

Please sign in to comment.