Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 221 additions & 8 deletions e2e/e2e_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@
package e2e

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"testing"
"time"

fn "knative.dev/func/pkg/functions"
fnhttp "knative.dev/func/pkg/http"
)

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -552,13 +561,217 @@ func Handle(w http.ResponseWriter, _ *http.Request) {
}
}

// TODO: TestMetadata_Subscriptions ensures that function instances can be
// subscribed to events.
// Tests the complete event flow using func subscribe
func TestMetadata_Subscriptions(t *testing.T) {
// TODO
// Create a function which emits an event with as much defaults as possible
// Create a function which subscribes to those events
// Succeed the test as soon as it receives the event
// https://github.com/knative/func/issues/3202
t.Skip("Subscription E2E tests not yet implemented")
brokerName := "default"

createBrokerWithCheck(t, Namespace, brokerName)

uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())

eventReceived := waitForEvent(t, uniqueEventID)

subscriber := "func-e2e-test-subscriber"
subscriberName := subscriber
subscriberRoot := fromCleanEnv(t, subscriberName)
if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"),
[]byte(subscriberCode()), 0644); err != nil {
t.Fatal(err)
}

subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event")
subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr
if err := subscribeCmd.Run(); err != nil {
t.Fatal(err)
}

f, err := fn.NewFunction(subscriberRoot)
if err != nil {
t.Fatal(err)
}
if len(f.Deploy.Subscriptions) != 1 {
t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions))
}

if err := newCmd(t, "deploy").Run(); err != nil {
t.Fatal(err)
}
defer clean(t, subscriberName, Namespace)

subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain)
if !waitFor(t, subscriberURL, withTemplate("cloudevents")) {
t.Fatal("subscriber not ready")
}
waitForTrigger(t, Namespace, subscriberName)

transport := fnhttp.NewRoundTripper()
defer transport.Close()
client := http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName)
req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ce-specversion", "1.0")
req.Header.Set("ce-type", "test.event")
req.Header.Set("ce-source", "producer")
req.Header.Set("ce-id", uniqueEventID)

resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to invoke producer: %v", err)
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode != 202 {
t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body)
}
t.Logf("Broker accepted event %s", uniqueEventID)

select {
case receivedID := <-eventReceived:
t.Logf("Event flow verified (received: %s)", receivedID)
case <-time.After(60 * time.Second):
t.Fatal("Timeout: No callback from subscriber")
}
}

func waitForEvent(t *testing.T, eventId string) <-chan string {
t.Helper()

eventReceived := make(chan string, 10)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

pr, pw := io.Pipe()
cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
cmd.Stderr = io.Discard
cmd.Stdout = pw
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
err := cmd.Start()
if err != nil {
t.Fatal(err)
}
go func() {
r := bufio.NewReader(pr)
m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
if e != nil {
panic(e)
}
if m {
eventReceived <- "OK"
close(eventReceived)
cancel()
}
_, _ = io.Copy(io.Discard, r)
}()

return eventReceived
}

// CloudEvents handler that logs events
func subscriberCode() string {
return `package function

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)

func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
r := event.New()
r.SetID("response-" + e.ID())
r.SetSource("subscriber")
r.SetType("test.response")
r.SetData("application/json", map[string]string{"status": "received"})
return &r, nil
}
`
}

// createBrokerWithCheck creates a Knative Broker
func createBrokerWithCheck(t *testing.T, namespace, name string) {
t.Helper()

brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: %s
namespace: %s
`, name, namespace)

cmd := exec.Command("kubectl", "apply", "-f", "-")
cmd.Stdin = strings.NewReader(brokerYAML)
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("Failed to create broker: %v, output: %s", err, string(output))
}
t.Cleanup(func() {
deleteBroker(t, namespace, name)
})
t.Logf("Created broker %s in namespace %s", name, namespace)

waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s")
waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
waitOutput, err := waitCmd.CombinedOutput()
if err != nil {
t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput))
}
t.Logf("Broker %s is ready", name)

// Wait for broker-ingress service to be available
t.Log("Waiting for broker-ingress service to be available...")
for i := 0; i < 30; i++ {
checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress")
checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
if err := checkCmd.Run(); err == nil {
t.Log("broker-ingress service is available")
return
}
time.Sleep(2 * time.Second)
}
t.Fatal("broker-ingress service check timed out")
}

// deleteBroker removes a Knative Broker from the given namespace.
func deleteBroker(t *testing.T, namespace, name string) {
t.Helper()

cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output))
return
}
t.Logf("Deleted broker %s from namespace %s", name, namespace)
}

// waitForTrigger waits for the function's trigger to become ready.
func waitForTrigger(t *testing.T, namespace, functionName string) {
t.Helper()

triggerName := fmt.Sprintf("%s-function-trigger-0", functionName)

cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output))
} else {
t.Logf("Trigger %s is ready", triggerName)
}
}
Loading