diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 6acd48339b..5fd4c8dee3 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -4,14 +4,23 @@ package e2e import ( + "bufio" "bytes" + "context" "encoding/json" "fmt" + "io" + "net/http" "os" + "os/exec" "path/filepath" + "regexp" + "strings" "testing" + "time" fn "knative.dev/func/pkg/functions" + fnhttp "knative.dev/func/pkg/http" ) // --------------------------------------------------------------------------- @@ -552,13 +561,217 @@ func Handle(w http.ResponseWriter, _ *http.Request) { } } -// TODO: TestMetadata_Subscriptions ensures that function instances can be -// subscribed to events. +// Tests the complete event flow using func subscribe func TestMetadata_Subscriptions(t *testing.T) { - // TODO - // Create a function which emits an event with as much defaults as possible - // Create a function which subscribes to those events - // Succeed the test as soon as it receives the event - // https://github.com/knative/func/issues/3202 - t.Skip("Subscription E2E tests not yet implemented") + brokerName := "default" + + createBrokerWithCheck(t, Namespace, brokerName) + + uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) + + eventReceived := waitForEvent(t, uniqueEventID) + + subscriber := "func-e2e-test-subscriber" + subscriberName := subscriber + subscriberRoot := fromCleanEnv(t, subscriberName) + if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), + []byte(subscriberCode()), 0644); err != nil { + t.Fatal(err) + } + + subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event") + subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr + if err := subscribeCmd.Run(); err != nil { + t.Fatal(err) + } + + f, err := fn.NewFunction(subscriberRoot) + if err != nil { + t.Fatal(err) + } + if len(f.Deploy.Subscriptions) != 1 { + t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions)) + } + + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, subscriberName, Namespace) + + subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain) + if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { + t.Fatal("subscriber not ready") + } + waitForTrigger(t, Namespace, subscriberName) + + transport := fnhttp.NewRoundTripper() + defer transport.Close() + client := http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } + url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName) + req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "test.event") + req.Header.Set("ce-source", "producer") + req.Header.Set("ce-id", uniqueEventID) + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Failed to invoke producer: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 202 { + t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body) + } + t.Logf("Broker accepted event %s", uniqueEventID) + + select { + case receivedID := <-eventReceived: + t.Logf("Event flow verified (received: %s)", receivedID) + case <-time.After(60 * time.Second): + t.Fatal("Timeout: No callback from subscriber") + } +} + +func waitForEvent(t *testing.T, eventId string) <-chan string { + t.Helper() + + eventReceived := make(chan string, 10) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + pr, pw := io.Pipe() + cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*") + cmd.Stderr = io.Discard + cmd.Stdout = pw + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + err := cmd.Start() + if err != nil { + t.Fatal(err) + } + go func() { + r := bufio.NewReader(pr) + m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r) + if e != nil { + panic(e) + } + if m { + eventReceived <- "OK" + close(eventReceived) + cancel() + } + _, _ = io.Copy(io.Discard, r) + }() + + return eventReceived +} + +// CloudEvents handler that logs events +func subscriberCode() string { + return `package function + +import ( + "context" + "fmt" + "github.com/cloudevents/sdk-go/v2/event" +) + +func Handle(ctx context.Context, e event.Event) (*event.Event, error) { + fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) + r := event.New() + r.SetID("response-" + e.ID()) + r.SetSource("subscriber") + r.SetType("test.response") + r.SetData("application/json", map[string]string{"status": "received"}) + return &r, nil +} +` +} + +// createBrokerWithCheck creates a Knative Broker +func createBrokerWithCheck(t *testing.T, namespace, name string) { + t.Helper() + + brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: %s + namespace: %s +`, name, namespace) + + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(brokerYAML) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to create broker: %v, output: %s", err, string(output)) + } + t.Cleanup(func() { + deleteBroker(t, namespace, name) + }) + t.Logf("Created broker %s in namespace %s", name, namespace) + + waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s") + waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + waitOutput, err := waitCmd.CombinedOutput() + if err != nil { + t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput)) + } + t.Logf("Broker %s is ready", name) + + // Wait for broker-ingress service to be available + t.Log("Waiting for broker-ingress service to be available...") + for i := 0; i < 30; i++ { + checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") + checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + if err := checkCmd.Run(); err == nil { + t.Log("broker-ingress service is available") + return + } + time.Sleep(2 * time.Second) + } + t.Fatal("broker-ingress service check timed out") +} + +// deleteBroker removes a Knative Broker from the given namespace. +func deleteBroker(t *testing.T, namespace, name string) { + t.Helper() + + cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output)) + return + } + t.Logf("Deleted broker %s from namespace %s", name, namespace) +} + +// waitForTrigger waits for the function's trigger to become ready. +func waitForTrigger(t *testing.T, namespace, functionName string) { + t.Helper() + + triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) + + cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output)) + } else { + t.Logf("Trigger %s is ready", triggerName) + } }