Apply timeout in the queue proxy on 'time to first byte'. #2696

Merged
merged 10 commits into from
Dec 15, 2018
2 changes: 1 addition & 1 deletion cmd/queue/main.go
@@ -318,7 +318,7 @@ func main() {

 	server = h2c.NewServer(
 		fmt.Sprintf(":%d", queue.RequestQueuePort),
-		http.TimeoutHandler(http.HandlerFunc(handler), time.Duration(revisionTimeoutSeconds)*time.Second, "request timeout"))
+		queue.TimeToFirstByteTimeoutHandler(http.HandlerFunc(handler), time.Duration(revisionTimeoutSeconds)*time.Second, "request timeout"))

 	go server.ListenAndServe()
 	go setupAdminHandlers(adminServer)
166 changes: 166 additions & 0 deletions pkg/queue/timeout.go
@@ -0,0 +1,166 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
	"context"
	"io"
	"net/http"
	"sync"
	"time"
)

var defaultTimeoutBody = "<html><head><title>Timeout</title></head><body><h1>Timeout</h1></body></html>"

// TimeToFirstByteTimeoutHandler returns a Handler that runs h with the
// given time limit in which the first byte of the response must be written.
//
// The new Handler calls h.ServeHTTP to handle each request, but if h
// has not written anything to the response within the time limit, the
// handler responds with a 503 Service Unavailable error and the given
// message in its body. (If msg is empty, a suitable default message
// will be sent.) After such a timeout, writes by h to its
// ResponseWriter will return http.ErrHandlerTimeout.
//
// A panic from the underlying handler is propagated as-is to be able to
// make use of custom panic behavior by HTTP handlers. See
// https://golang.org/pkg/net/http/#Handler.
//
// The implementation is largely inspired by http.TimeoutHandler.
func TimeToFirstByteTimeoutHandler(h http.Handler, dt time.Duration, msg string) http.Handler {
	return &timeoutHandler{
		handler: h,
		body:    msg,
		dt:      dt,
	}
}

type timeoutHandler struct {
	handler http.Handler
	body    string
	dt      time.Duration
}

func (h *timeoutHandler) errorBody() string {
	if h.body != "" {
		return h.body
	}
	return defaultTimeoutBody
}

func (h *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx, cancelCtx := context.WithCancel(r.Context())
	defer cancelCtx()

	done := make(chan struct{})
	// The recovery value of a panic is written to this channel to be
	// propagated (panicked with) again.
	panicChan := make(chan interface{}, 1)

Member:
This still isn't closed?

Contributor Author:
https://stackoverflow.com/a/8593986/3269863 suggests that channels don't need to be closed (that is: if you don't need to use the close as a signal as well).

I still pushed a commit to close the channels appropriately.
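
To illustrate the point made in this thread: a channel that is never closed is simply garbage-collected once nothing references it, so `close` only matters when receivers rely on it as a signal. The sketch below is not part of this PR; `waitAll` and `sendOnly` are hypothetical names chosen for the illustration. It shows `close` used as a broadcast next to a buffered channel that is used once and never closed.

```go
package main

import (
	"fmt"
	"sync"
)

// waitAll starts n goroutines that block until done is closed and
// returns how many of them observed the close.
func waitAll(n int) int {
	done := make(chan struct{})
	var (
		mu       sync.Mutex
		observed int
		wg       sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // unblocks for every receiver once done is closed
			mu.Lock()
			observed++
			mu.Unlock()
		}()
	}
	close(done) // a single close wakes all receivers
	wg.Wait()
	return observed
}

// sendOnly uses a buffered channel purely to hand over one value.
// The channel is never closed; it is collected once unreachable.
func sendOnly() int {
	results := make(chan int, 1)
	go func() { results <- 42 }()
	return <-results
}

func main() {
	fmt.Println(waitAll(3), sendOnly())
}
```

One caveat worth keeping in mind when closing is added anyway: a send on an already closed channel panics, so the close should happen only after the last possible send.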

	defer close(panicChan)

	tw := &timeoutWriter{w: w}
	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()
		h.handler.ServeHTTP(tw, r.WithContext(ctx))

		// Closing the channel is not deferred to give the panic recovery
		// precedence and not successfully complete the request by accident.
		close(done)
	}()

	timeout := time.After(h.dt)
	for {
		select {
		case p := <-panicChan:
			close(done)
			panic(p)

Member:
I believe this leaks done on this and the timeout path.

Contributor Author:
I think it only leaks here. The timeout path cancels the context. That doesn't make it panic so the handler's close(done) call is properly reached. To be sure, I tried adding a close call to the timeout path and that errored due to multiple close calls.
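
The "multiple close calls" error mentioned here can be reproduced in isolation: closing an already closed channel panics at runtime. The sketch below is an illustration only, not code from this PR; `onceCloser` is a hypothetical helper showing one common way (`sync.Once`) to make a close safe to call from more than one path.

```go
package main

import (
	"fmt"
	"sync"
)

// closeTwice closes the same channel twice and reports whether the
// second close panicked.
func closeTwice() (panicked bool) {
	done := make(chan struct{})
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(done)
	close(done) // panics: close of closed channel
	return false
}

// onceCloser is a hypothetical helper that makes Close idempotent.
type onceCloser struct {
	once sync.Once
	ch   chan struct{}
}

// Close closes the wrapped channel at most once.
func (c *onceCloser) Close() { c.once.Do(func() { close(c.ch) }) }

// closeTwiceGuarded calls Close twice and reports whether it panicked.
func closeTwiceGuarded() (panicked bool) {
	c := &onceCloser{ch: make(chan struct{})}
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	c.Close()
	c.Close()
	return false
}

func main() {
	fmt.Println(closeTwice(), closeTwiceGuarded())
}
```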

		case <-done:
			return
		case <-timeout:
			if tw.TimeoutAndWriteError(h.errorBody()) {
				cancelCtx()
				return
			}
		}
	}
}

// timeoutWriter is a wrapper around an http.ResponseWriter. It makes
// writing the timeout error response conditional on whether the
// underlying writer has already been written to.
//
// If the underlying writer has not been written to, an error response is
// returned. If it has already been written to, the error is ignored and
// the response is allowed to continue.
type timeoutWriter struct {
	w http.ResponseWriter

	mu        sync.Mutex
	timedOut  bool
	wroteOnce bool
}

var _ http.ResponseWriter = (*timeoutWriter)(nil)

func (tw *timeoutWriter) Header() http.Header { return tw.w.Header() }

func (tw *timeoutWriter) Write(p []byte) (int, error) {
	tw.mu.Lock()
	defer tw.mu.Unlock()
	if tw.timedOut {
		return 0, http.ErrHandlerTimeout
	}

	tw.wroteOnce = true
	return tw.w.Write(p)
}

func (tw *timeoutWriter) WriteHeader(code int) {
	tw.mu.Lock()
	defer tw.mu.Unlock()
	if tw.timedOut {
		return
	}

	tw.wroteOnce = true
	tw.w.WriteHeader(code)
}

// TimeoutAndWriteError writes an error to the response writer if
// nothing has been written to it before. It returns whether an error
// was written.
//
// If this writes an error, all subsequent calls to Write will
// result in http.ErrHandlerTimeout.
func (tw *timeoutWriter) TimeoutAndWriteError(msg string) bool {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	if !tw.wroteOnce {
		tw.w.WriteHeader(http.StatusServiceUnavailable)
		io.WriteString(tw.w, msg)

		tw.timedOut = true
		return true
	}

	return false
}
133 changes: 133 additions & 0 deletions pkg/queue/timeout_test.go
@@ -0,0 +1,133 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue

import (
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)

func TestTimeToFirstByteTimeoutHandler(t *testing.T) {
	tests := []struct {
		name           string
		timeout        time.Duration
		handler        func(writeErrors chan error) http.Handler
		timeoutMessage string
		wantStatus     int
		wantBody       string
		wantWriteError bool
		wantPanic      bool
	}{{
		name:    "all good",
		timeout: 10 * time.Second,
		handler: func(writeErrors chan error) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				w.Write([]byte("hi"))
			})
		},
		wantStatus: http.StatusOK,
		wantBody:   "hi",
	}, {
		name:    "timeout",
		timeout: 50 * time.Millisecond,
		handler: func(writeErrors chan error) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				time.Sleep(100 * time.Millisecond)
				w.WriteHeader(http.StatusOK)
				_, werr := w.Write([]byte("hi"))
				writeErrors <- werr
			})
		},
		wantStatus:     http.StatusServiceUnavailable,
		wantBody:       defaultTimeoutBody,
		wantWriteError: true,
	}, {
		name:    "write then sleep",
		timeout: 10 * time.Millisecond,
		handler: func(writeErrors chan error) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				w.WriteHeader(http.StatusOK)
				time.Sleep(50 * time.Millisecond)
				w.Write([]byte("hi"))
			})
		},
		wantStatus: http.StatusOK,
		wantBody:   "hi",
	}, {
		name:    "custom timeout message",
		timeout: 50 * time.Millisecond,
		handler: func(writeErrors chan error) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				time.Sleep(100 * time.Millisecond)
				_, werr := w.Write([]byte("hi"))
				writeErrors <- werr
			})
		},
		timeoutMessage: "request timeout",
		wantStatus:     http.StatusServiceUnavailable,
		wantBody:       "request timeout",
		wantWriteError: true,
	}, {
		name:    "propagate panic",
		timeout: 50 * time.Millisecond,
		handler: func(writeErrors chan error) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				panic(http.ErrAbortHandler)
			})
		},
		wantStatus: http.StatusServiceUnavailable,
		wantBody:   "request timeout",
		wantPanic:  true,
	}}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			req, err := http.NewRequest("GET", "/", nil)
			if err != nil {
				t.Fatal(err)
			}

			writeErrors := make(chan error, 1)
			rr := httptest.NewRecorder()
			handler := TimeToFirstByteTimeoutHandler(test.handler(writeErrors), test.timeout, test.timeoutMessage)

			defer func() {
				if test.wantPanic {
					if recovered := recover(); recovered != http.ErrAbortHandler {
						t.Error("Expected the handler to panic, but it didn't.")
					}
				}
			}()
			handler.ServeHTTP(rr, req)

			if status := rr.Code; status != test.wantStatus {
				t.Errorf("Handler returned wrong status code: got %v want %v", status, test.wantStatus)
			}

			if rr.Body.String() != test.wantBody {
				t.Errorf("Handler returned unexpected body: got %q want %q", rr.Body.String(), test.wantBody)
			}

			if test.wantWriteError {
				err := <-writeErrors
				if err != http.ErrHandlerTimeout {
					t.Errorf("Expected a timeout error, got %v", err)
				}
			}
		})
	}
}
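
The "propagate panic" case above depends on a panic raised in the handler's goroutine being handed back to the goroutine that called `ServeHTTP`. The standalone sketch below shows that hand-off in isolation, assuming the same recover-then-send pattern; `runAndRepanic`, `recoveredValue` and `errBoom` are hypothetical names for this illustration and do not appear in the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical sentinel value used only in this demo.
var errBoom = errors.New("boom")

// runAndRepanic runs f in a separate goroutine. If f panics, the
// recovered value is sent back and the caller panics with it again.
func runAndRepanic(f func()) {
	done := make(chan struct{})
	panicChan := make(chan interface{}, 1)
	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()
		f()
		// Not deferred, so a panic in f never signals completion.
		close(done)
	}()
	select {
	case p := <-panicChan:
		panic(p)
	case <-done:
	}
}

// recoveredValue returns the value runAndRepanic panicked with, or
// nil if it returned normally.
func recoveredValue(f func()) (v interface{}) {
	defer func() { v = recover() }()
	runAndRepanic(f)
	return nil
}

func main() {
	fmt.Println(recoveredValue(func() { panic(errBoom) }) == errBoom)
	fmt.Println(recoveredValue(func() {}) == nil)
}
```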
32 changes: 20 additions & 12 deletions test/conformance/revision_timeout_test.go
@@ -65,19 +65,22 @@ func updateConfigWithTimeout(clients *test.Clients, names test.ResourceNames, re
 }

 // sendRequest sends a request to "domain" and returns an error on an unexpected response code, nil otherwise.
-func sendRequest(logger *logging.BaseLogger, clients *test.Clients, domain string, sleepSeconds int, expectedResponseCode int) error {
+func sendRequest(logger *logging.BaseLogger, clients *test.Clients, domain string, initialSleepSeconds int, sleepSeconds int, expectedResponseCode int) error {
 	client, err := pkgTest.NewSpoofingClient(clients.KubeClient, logger, domain, test.ServingFlags.ResolvableDomain)
 	if err != nil {
 		logger.Infof("Spoofing client failed: %v", err)
 		return err
 	}

+	initialSleepMs := initialSleepSeconds * 1000
+	sleepMs := sleepSeconds * 1000
+
 	start := time.Now().UnixNano()
 	defer func() {
 		end := time.Now().UnixNano()
-		logger.Infof("domain: %v, sleep: %v, request elapsed %.2f ms", domain, sleepSeconds*1000, float64(end-start)/1e6)
+		logger.Infof("domain: %v, initialSleep: %v, sleep: %v, request elapsed %.2f ms", domain, initialSleepMs, sleepMs, float64(end-start)/1e6)
 	}()
-	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s?timeout=%v", domain, sleepSeconds*1000), nil)
+	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s?initialTimeout=%v&timeout=%v", domain, initialSleepMs, sleepMs), nil)
 	if err != nil {
 		logger.Infof("Failed new request: %v", err)
 		return err
@@ -193,22 +196,27 @@ func TestRevisionTimeout(t *testing.T) {
 		t.Fatalf("Error probing domain %s: %v", rev5sDomain, err)
 	}

-	if err := sendRequest(logger, clients, rev2sDomain, 0, http.StatusOK); err != nil {
+	// Quick sanity check
+	if err := sendRequest(logger, clients, rev2sDomain, 0, 0, http.StatusOK); err != nil {
 		t.Errorf("Failed request with sleep 0s with revision timeout 2s: %v", err)
 	}
-	if err := sendRequest(logger, clients, rev5sDomain, 0, http.StatusOK); err != nil {
+	if err := sendRequest(logger, clients, rev5sDomain, 0, 0, http.StatusOK); err != nil {
 		t.Errorf("Failed request with sleep 0s with revision timeout 5s: %v", err)
 	}
-	if err := sendRequest(logger, clients, rev2sDomain, 7, http.StatusServiceUnavailable); err != nil {
-		t.Errorf("Did not fail request with sleep 7s with revision timeout 2s: %v", err)
+
+	// Fail by surpassing the initial timeout.
+	if err := sendRequest(logger, clients, rev2sDomain, 5, 0, http.StatusServiceUnavailable); err != nil {
+		t.Errorf("Did not fail request with sleep 5s with revision timeout 2s: %v", err)
 	}
-	if err := sendRequest(logger, clients, rev5sDomain, 7, http.StatusServiceUnavailable); err != nil {
+	if err := sendRequest(logger, clients, rev5sDomain, 7, 0, http.StatusServiceUnavailable); err != nil {
 		t.Errorf("Did not fail request with sleep 7s with revision timeout 5s: %v", err)
 	}
-	if err := sendRequest(logger, clients, rev2sDomain, 3, http.StatusServiceUnavailable); err != nil {
-		t.Errorf("Did not fail request with sleep 3s with revision timeout 2s: %v", err)
+
+	// Do not fail when only the overall request duration, not the initial timeout, is surpassed.
+	if err := sendRequest(logger, clients, rev2sDomain, 1, 3, http.StatusOK); err != nil {
+		t.Errorf("Failed request with sleep 1s/3s with revision timeout 2s: %v", err)
 	}
-	if err := sendRequest(logger, clients, rev5sDomain, 3, http.StatusOK); err != nil {
-		t.Errorf("Failed request with sleep 3s with revision timeout 5s: %v", err)
+	if err := sendRequest(logger, clients, rev5sDomain, 3, 3, http.StatusOK); err != nil {
+		t.Errorf("Failed request with sleep 3s/3s with revision timeout 5s: %v", err)
 	}
 }
16 changes: 16 additions & 0 deletions test/test_images/timeout/timeout.go
@@ -26,8 +26,24 @@ import (
)

func handler(w http.ResponseWriter, r *http.Request) {
	// Sleep for a set amount of time before sending headers
	if initialTimeout := r.URL.Query().Get("initialTimeout"); initialTimeout != "" {
		parsed, _ := strconv.Atoi(initialTimeout)
		time.Sleep(time.Duration(parsed) * time.Millisecond)
	}

	w.WriteHeader(http.StatusOK)

	// Explicitly flush the already written data to trigger (or not)
	// the time-to-first-byte timeout.
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}

	// Sleep for a set amount of time before sending response
	timeout, _ := strconv.Atoi(r.URL.Query().Get("timeout"))
	time.Sleep(time.Duration(timeout) * time.Millisecond)

	fmt.Fprintf(w, "Slept for %d milliseconds", timeout)
}
