Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@
// When set, the function will be called before the stream enters
// the blocking state.
NewStreamWaitingForResolver = func() {}

ActiveStreamTracker = func(created, deleted int) {}

Check failure on line 248 in internal/internal.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.23)

exported var ActiveStreamTracker should have comment or be unexported https://revive.run/r#exported

Check failure on line 248 in internal/internal.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.23)

parameter 'deleted' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter

Check failure on line 248 in internal/internal.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.23)

parameter 'created' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
)

// HealthChecker defines the signature of the client-side LB channel health
Expand Down
2 changes: 2 additions & 0 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}

t.activeStreams[streamID] = s
internal.ActiveStreamTracker(1, 0)
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
Expand Down Expand Up @@ -1310,6 +1311,7 @@ func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
internal.ActiveStreamTracker(0, 1)
}
t.mu.Unlock()

Expand Down
71 changes: 71 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3946,6 +3946,77 @@
}
}

func (s) TestServerStreaming_ClientTimeoutWithoutContextCancellation(t *testing.T) {
activeStreams := atomic.Int64{}

internal.ActiveStreamTracker = func(created, deleted int) {
activeStreams.Add(int64(created))
activeStreams.Add(-int64(deleted))
}

ss := &stubserver.StubServer{
StreamingOutputCallF: func(req *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {

Check failure on line 3958 in test/end2end_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.23)

parameter 'req' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
// lets keep this busy until error
for {
if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
return err
}
}
},
}

if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(10)}); err != nil {
t.Fatalf("Starting stubServer: %v", err)
}
defer ss.Stop()

const numStreams = 100

wg := sync.WaitGroup{}
wg.Add(numStreams)
for j := 0; j < numStreams; j++ {
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

stream, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
if err != nil {
return
}
defer stream.CloseSend()

//let keep receiving the streams until timeout
for {
_, _ = stream.Recv()
select {
case <-ctx.Done():
return
default:
time.Sleep(time.Second)
}
}
}()
}
wg.Wait()

const (
sleepEachLoop = 100 * time.Millisecond
loopCount = int(5 * time.Second / sleepEachLoop)
)

for i := 0; i < loopCount; i++ {
time.Sleep(sleepEachLoop)
if activeStreams.Load() == 0 {
break
}
}

if activeStreams.Load() != 0 {
t.Fatalf("leak streams: %d", activeStreams.Load())
}
}

// Tests the behavior of client for server-side streaming RPC when client sends zero request messages.
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
t.Skip("blocked on i/7286")
Expand Down
Loading