Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add health endpoint to event_display server #5608

Merged
merged 2 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions cmd/event_display/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2021 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,10 @@ import (
"context"
"fmt"
"log"
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

/*
Expand All @@ -47,15 +49,42 @@ Data,
}
*/

// display prints the given Event in a human-readable format.
func display(event cloudevents.Event) {
fmt.Printf("☁️ cloudevents.Event\n%s", event.String())
fmt.Printf("☁️ cloudevents.Event\n%s", event)
}

func main() {
c, err := cloudevents.NewClientHTTP()
run(context.Background())
}

func run(ctx context.Context) {
c, err := cloudevents.NewClientHTTP(
cehttp.WithMiddleware(healthzMiddleware()),
)
Comment on lines +62 to +64
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@n3wscott I needed something quick so I used a Middleware for the time being, but this feels clumsy to me (see the implementation of healthzMiddleware() below which is especially hacky).

Since I'm going to refactor this for testability, I thought now was a good time to ask for your expertise :)

if err != nil {
log.Fatal("Failed to create client, ", err)
log.Fatal("Failed to create client: ", err)
}

log.Fatal(c.StartReceiver(context.Background(), display))
if err := c.StartReceiver(ctx, display); err != nil {
log.Fatal("Error during receiver's runtime: ", err)
}
}

// HTTP path of the health endpoint used for probing the service.
const healthzPath = "/healthz"

// healthzMiddleware returns a cehttp.Middleware which exposes a health
// endpoint by registering a handler in the multiplexer of the CloudEvents HTTP
// client.
func healthzMiddleware() cehttp.Middleware {
return func(next http.Handler) http.Handler {
next.(*http.ServeMux).Handle(healthzPath, http.HandlerFunc(handleHealthz))
return next
}
}

// handleHealthz is a http.Handler which responds to health requests.
func handleHealthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
86 changes: 86 additions & 0 deletions cmd/event_display/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2021 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"bytes"
"context"
"fmt"
"net/http"
"testing"
"time"
)

const ceClientURL = "http://localhost:8080"

func TestRun_HealthEndpoint(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)

go run(ctx)
if err := waitForClient(ctx); err != nil {
t.Fatal("Error waiting for CloudEvents receiver:", err)
}

const healthzURL = ceClientURL + healthzPath
const expectStatusCode = http.StatusNoContent

// GET request
resp, err := http.Get(healthzURL)
if err != nil {
t.Fatal("Error sending GET request to health endpoint:", err)
}
if gotStatusCode := resp.StatusCode; gotStatusCode != expectStatusCode {
t.Error("Unexpected status code sending GET request to health endpoint:", gotStatusCode)
}

// POST request
resp, err = http.Post(healthzURL, "text/plain", new(bytes.Buffer))
if err != nil {
t.Fatal("Error sending POST request to health endpoint:", err)
}
if gotStatusCode := resp.StatusCode; gotStatusCode != expectStatusCode {
t.Error("Unexpected status code sending POST request to health endpoint:", gotStatusCode)
}
}

// waitForClient sends requests to the local CloudEvents receiver address until
// a HTTP response is received, or until ctx is cancelled.
func waitForClient(ctx context.Context) error {
httpClient := http.DefaultClient
var httpErr error

tick := time.Tick(5 * time.Millisecond)

for {
select {
case <-tick:
req, err := http.NewRequestWithContext(ctx, http.MethodHead, ceClientURL, nil)
if err != nil {
return fmt.Errorf("creating HTTP request: %w", err)
}

if _, httpErr = httpClient.Do(req); httpErr == nil {
// got a response from CloudEvents client's HTTP receiver
return nil
}

case <-ctx.Done():
return fmt.Errorf("context cancelled: %s. The last HTTP error was: %s", ctx.Err(), httpErr)
}
}
}