diff --git a/http2/server.go b/http2/server.go index de60fa88f..02c88b6b3 100644 --- a/http2/server.go +++ b/http2/server.go @@ -581,9 +581,11 @@ type serverConn struct { advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client curClientStreams uint32 // number of open streams initiated by the client curPushedStreams uint32 // number of open streams initiated by server push + curHandlers uint32 // number of running handler goroutines maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes streams map[uint32]*stream + unstartedHandlers []unstartedHandler initialStreamSendWindowSize int32 maxFrameSize int32 peerMaxHeaderListSize uint32 // zero means unknown (default) @@ -981,6 +983,8 @@ func (sc *serverConn) serve() { return case gracefulShutdownMsg: sc.startGracefulShutdownInternal() + case handlerDoneMsg: + sc.handlerDone() default: panic("unknown timer") } @@ -1020,6 +1024,7 @@ var ( idleTimerMsg = new(serverMessage) shutdownTimerMsg = new(serverMessage) gracefulShutdownMsg = new(serverMessage) + handlerDoneMsg = new(serverMessage) ) func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } @@ -2017,8 +2022,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout) } - go sc.runHandler(rw, req, handler) - return nil + return sc.scheduleHandler(id, rw, req, handler) } func (sc *serverConn) upgradeRequest(req *http.Request) { @@ -2038,6 +2042,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) { sc.conn.SetReadDeadline(time.Time{}) } + // This is the first request on the connection, + // so start the handler directly rather than going + // through scheduleHandler. + sc.curHandlers++ go sc.runHandler(rw, req, sc.handler.ServeHTTP) } @@ -2278,8 +2286,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response return &responseWriter{rws: rws} } +type unstartedHandler struct { + streamID uint32 + rw *responseWriter + req *http.Request + handler func(http.ResponseWriter, *http.Request) +} + +// scheduleHandler starts a handler goroutine, +// or schedules one to start as soon as an existing handler finishes. +func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error { + sc.serveG.check() + maxHandlers := sc.advMaxStreams + if sc.curHandlers < maxHandlers { + sc.curHandlers++ + go sc.runHandler(rw, req, handler) + return nil + } + if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) { + return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm)) + } + sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{ + streamID: streamID, + rw: rw, + req: req, + handler: handler, + }) + return nil +} + +func (sc *serverConn) handlerDone() { + sc.serveG.check() + sc.curHandlers-- + i := 0 + maxHandlers := sc.advMaxStreams + for ; i < len(sc.unstartedHandlers); i++ { + u := sc.unstartedHandlers[i] + if sc.streams[u.streamID] == nil { + // This stream was reset before its goroutine had a chance to start. + continue + } + if sc.curHandlers >= maxHandlers { + break + } + sc.curHandlers++ + go sc.runHandler(u.rw, u.req, u.handler) + sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references + } + sc.unstartedHandlers = sc.unstartedHandlers[i:] + if len(sc.unstartedHandlers) == 0 { + sc.unstartedHandlers = nil + } +} + // Run on its own goroutine. func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { + defer sc.sendServeMsg(handlerDoneMsg) didPanic := true defer func() { rw.rws.stream.cancelCtx() diff --git a/http2/server_test.go b/http2/server_test.go index b99c5af54..22657cbfe 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -4664,3 +4664,116 @@ func TestServerWriteDoesNotRetainBufferAfterServerClose(t *testing.T) { st.ts.Config.Close() <-donec } + +func TestServerMaxHandlerGoroutines(t *testing.T) { + const maxHandlers = 10 + handlerc := make(chan chan bool) + donec := make(chan struct{}) + defer close(donec) + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) { + stopc := make(chan bool, 1) + select { + case handlerc <- stopc: + case <-donec: + } + select { + case shouldPanic := <-stopc: + if shouldPanic { + panic(http.ErrAbortHandler) + } + case <-donec: + } + }, func(s *Server) { + s.MaxConcurrentStreams = maxHandlers + }) + defer st.Close() + + st.writePreface() + st.writeInitialSettings() + st.writeSettingsAck() + + // Make maxHandlers concurrent requests. + // Reset them all, but only after the handler goroutines have started. + var stops []chan bool + streamID := uint32(1) + for i := 0; i < maxHandlers; i++ { + st.writeHeaders(HeadersFrameParam{ + StreamID: streamID, + BlockFragment: st.encodeHeader(), + EndStream: true, + EndHeaders: true, + }) + stops = append(stops, <-handlerc) + st.fr.WriteRSTStream(streamID, ErrCodeCancel) + streamID += 2 + } + + // Start another request, and immediately reset it. + st.writeHeaders(HeadersFrameParam{ + StreamID: streamID, + BlockFragment: st.encodeHeader(), + EndStream: true, + EndHeaders: true, + }) + st.fr.WriteRSTStream(streamID, ErrCodeCancel) + streamID += 2 + + // Start another two requests. Don't reset these. + for i := 0; i < 2; i++ { + st.writeHeaders(HeadersFrameParam{ + StreamID: streamID, + BlockFragment: st.encodeHeader(), + EndStream: true, + EndHeaders: true, + }) + streamID += 2 + } + + // The initial maxHandlers handlers are still executing, + // so the last two requests don't start any new handlers. + select { + case <-handlerc: + t.Errorf("handler unexpectedly started while maxHandlers are already running") + case <-time.After(1 * time.Millisecond): + } + + // Tell two handlers to exit. + // The pending requests which weren't reset start handlers. + stops[0] <- false // normal exit + stops[1] <- true // panic + stops = stops[2:] + stops = append(stops, <-handlerc) + stops = append(stops, <-handlerc) + + // Make a bunch more requests. + // Eventually, the server tells us to go away. + for i := 0; i < 5*maxHandlers; i++ { + st.writeHeaders(HeadersFrameParam{ + StreamID: streamID, + BlockFragment: st.encodeHeader(), + EndStream: true, + EndHeaders: true, + }) + st.fr.WriteRSTStream(streamID, ErrCodeCancel) + streamID += 2 + } +Frames: + for { + f, err := st.readFrame() + if err != nil { + st.t.Fatal(err) + } + switch f := f.(type) { + case *GoAwayFrame: + if f.ErrCode != ErrCodeEnhanceYourCalm { + t.Errorf("err code = %v; want %v", f.ErrCode, ErrCodeEnhanceYourCalm) + } + break Frames + default: + } + } + + for _, s := range stops { + close(s) + } +}