From fa7a31e4457119726ef45b85622ef5ca630e0f78 Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Fri, 2 Jan 2026 23:24:30 +0530 Subject: [PATCH 1/6] Implement TestMetadata_Subscriptions E2E test This test verifies the full Knative Eventing flow using func subscribe: - Creates a producer function that sends CloudEvents to the broker - Creates a subscriber function that receives events via a Trigger - Validates the complete event delivery pipeline Fixes #3202 --- e2e/e2e_metadata_test.go | 319 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 311 insertions(+), 8 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 6acd48339b..68ebc3b478 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -5,13 +5,20 @@ package e2e import ( "bytes" + "context" "encoding/json" "fmt" + "io" + "net/http" "os" + "os/exec" "path/filepath" + "strings" "testing" + "time" fn "knative.dev/func/pkg/functions" + "knative.dev/func/pkg/k8s" ) // --------------------------------------------------------------------------- @@ -552,13 +559,309 @@ func Handle(w http.ResponseWriter, _ *http.Request) { } } -// TODO: TestMetadata_Subscriptions ensures that function instances can be -// subscribed to events. +// TestMetadata_Subscriptions verifies the full event flow using Knative Eventing: +// Producer function -> Broker -> Trigger -> Subscriber function func TestMetadata_Subscriptions(t *testing.T) { - // TODO - // Create a function which emits an event with as much defaults as possible - // Create a function which subscribes to those events - // Succeed the test as soon as it receives the event - // https://github.com/knative/func/issues/3202 - t.Skip("Subscription E2E tests not yet implemented") + brokerName := "default" + createBroker(t, Namespace, brokerName) + defer deleteBroker(t, Namespace, brokerName) + + // Create subscriber function that receives CloudEvents + subscriberName := "func-e2e-test-subscriber" + subscriberRoot := fromCleanEnv(t, subscriberName) + + if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { + t.Fatal(err) + } + + subscriberImpl := `package function + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/cloudevents/sdk-go/v2/event" +) + +func Handle(ctx context.Context, e event.Event) (*event.Event, error) { + os.WriteFile("/tmp/received_event", []byte(e.Type()), 0644) + fmt.Printf("Received event: type=%s, source=%s, id=%s\n", e.Type(), e.Source(), e.ID()) + + response := event.New() + response.SetID(fmt.Sprintf("response-%d", time.Now().UnixNano())) + response.SetSource("subscriber") + response.SetType("test.response") + response.SetData("application/json", map[string]string{ + "received_type": e.Type(), + "status": "received", + }) + return &response, nil +} +` + if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), []byte(subscriberImpl), 0644); err != nil { + t.Fatal(err) + } + + if err := newCmd(t, "subscribe", "--filter", "type=test.event").Run(); err != nil { + t.Fatal(err) + } + + // Verify subscription config + f, err := fn.NewFunction(subscriberRoot) + if err != nil { + t.Fatal(err) + } + if len(f.Deploy.Subscriptions) != 1 { + t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions)) + } + + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, subscriberName, Namespace) + + subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain) + if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { + t.Fatal("subscriber did not become ready") + } + t.Log("Subscriber deployed and ready") + waitForTrigger(t, Namespace, subscriberName) + + // Create producer function that sends CloudEvents to the broker + producerName := "func-e2e-test-producer" + _ = fromCleanEnv(t, producerName) + + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + producerImpl := fmt.Sprintf(`package function + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" +) + +const ( + brokerNamespace = "%s" + brokerName = "%s" +) + +func Handle(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + w.WriteHeader(200) + fmt.Fprintf(w, "Producer is ready") + return + } + + brokerURL := fmt.Sprintf("http://broker-ingress.knative-eventing.svc.cluster.local/%%s/%%s", brokerNamespace, brokerName) + eventBody := `+"`"+`{"message": "hello from producer"}`+"`"+` + httpReq, err := http.NewRequest("POST", brokerURL, strings.NewReader(eventBody)) + if err != nil { + w.WriteHeader(500) + fmt.Fprintf(w, "Failed to create request: %%v", err) + return + } + + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("ce-specversion", "1.0") + httpReq.Header.Set("ce-type", "test.event") + httpReq.Header.Set("ce-source", "producer-function") + httpReq.Header.Set("ce-id", fmt.Sprintf("evt-%%d", time.Now().UnixNano())) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(httpReq) + if err != nil { + w.WriteHeader(500) + fmt.Fprintf(w, "Failed to send to broker at %%s: %%v", brokerURL, err) + return + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + w.WriteHeader(200) + fmt.Fprintf(w, "Event sent successfully to %%s (status %%d)", brokerURL, resp.StatusCode) + } else { + w.WriteHeader(500) + fmt.Fprintf(w, "Broker %%s returned status %%d. Body: %%s", brokerURL, resp.StatusCode, string(body)) + } +} + +`, Namespace, brokerName) + + producerRoot, _ := os.Getwd() + if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), []byte(producerImpl), 0644); err != nil { + t.Fatal(err) + } + + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, producerName, Namespace) + + producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain) + if !waitFor(t, producerURL, withContentMatch("Producer is ready")) { + t.Fatal("producer did not become ready") + } + t.Log("Producer deployed and ready") + + // Invoke producer to trigger event flow + t.Log("Invoking producer to send event to broker...") + client := http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(producerURL, "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatalf("Failed to invoke producer: %v", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + t.Logf("Producer response: %s (Status: %d)", string(body), resp.StatusCode) + + if resp.StatusCode != 200 { + t.Fatalf("Broker failed to accept event: Status %d. Body: %s", resp.StatusCode, string(body)) + } + + t.Log("Event sent to broker successfully. Waiting for subscriber...") + if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { + t.Fatal("subscriber did not respond after event was sent") + } + t.Log("Event flow verified: Producer -> Broker -> Subscriber") +} + +// createBroker creates a Knative Broker in the given namespace. +func createBroker(t *testing.T, namespace, name string) { + t.Helper() + + brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: %s + namespace: %s +`, name, namespace) + + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(brokerYAML) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not create broker: %v, output: %s", err, string(output)) + return + } + t.Logf("Created broker %s in namespace %s", name, namespace) + + waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s") + waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + waitOutput, err := waitCmd.CombinedOutput() + if err != nil { + t.Logf("Warning: broker may not be ready: %v, output: %s", err, string(waitOutput)) + } else { + t.Logf("Broker %s is ready", name) + } +} + +// deleteBroker removes a Knative Broker from the given namespace. +func deleteBroker(t *testing.T, namespace, name string) { + t.Helper() + + cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output)) + return + } + t.Logf("Deleted broker %s from namespace %s", name, namespace) +} + +// waitForTrigger waits for the function's trigger to become ready. +func waitForTrigger(t *testing.T, namespace, functionName string) { + t.Helper() + + triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) + + cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output)) + } else { + t.Logf("Trigger %s is ready", triggerName) + } +} + +// sendEventToBrokerWithRetry attempts to send a CloudEvent to the broker with retries. +func sendEventToBrokerWithRetry(t *testing.T, brokerURL, eventType, eventSource string, maxRetries int) bool { + t.Helper() + + for i := 0; i < maxRetries; i++ { + if i > 0 { + t.Logf("Retry %d/%d for sending event to broker", i+1, maxRetries) + time.Sleep(5 * time.Second) + } + + success := sendEventToBrokerInternal(t, brokerURL, eventType, eventSource) + if success { + return true + } + } + return false +} + +// sendEventToBrokerInternal sends a CloudEvent to the broker using in-cluster networking. +func sendEventToBrokerInternal(t *testing.T, brokerURL, eventType, eventSource string) bool { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + clientConfig := k8s.GetClientConfig() + dialer, err := k8s.NewInClusterDialer(ctx, clientConfig) + if err != nil { + t.Logf("Warning: could not create in-cluster dialer: %v", err) + return false + } + defer dialer.Close() + + transport := &http.Transport{ + DialContext: dialer.DialContext, + } + client := &http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } + + eventID := fmt.Sprintf("test-event-%d", time.Now().UnixNano()) + eventJSON := fmt.Sprintf(`{"specversion":"1.0","type":"%s","source":"%s","id":"%s","datacontenttype":"application/json","data":{"message":"test subscription event"}}`, eventType, eventSource, eventID) + + req, err := http.NewRequestWithContext(ctx, "POST", brokerURL, strings.NewReader(eventJSON)) + if err != nil { + t.Logf("Warning: could not create request to broker: %v", err) + return false + } + + req.Header.Set("Content-Type", "application/cloudevents+json") + + resp, err := client.Do(req) + if err != nil { + t.Logf("Warning: could not send event to broker: %v", err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + t.Logf("Event sent to broker successfully (status: %d)", resp.StatusCode) + return true + } + t.Logf("Broker responded with status: %d", resp.StatusCode) + return false } From f87bcef8a3d4e829990ee4a696b45d78dd2ab2c4 Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Sat, 3 Jan 2026 02:01:57 +0530 Subject: [PATCH 2/6] Fix subscribe command verbose flag compatibility The subscribe command doesn't support the -v (verbose) flag that newCmd() automatically adds when FUNC_E2E_VERBOSE=true. This causes test failures. Changed to use exec.Command directly for the subscribe call to bypass the automatic verbose flag addition while keeping it for other commands. --- e2e/e2e_metadata_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 68ebc3b478..b0b3482d16 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -604,7 +604,12 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) { t.Fatal(err) } - if err := newCmd(t, "subscribe", "--filter", "type=test.event").Run(); err != nil { + // Run func subscribe (without -v flag which subscribe doesn't support) + subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event") + subscribeCmd.Stdout = os.Stdout + subscribeCmd.Stderr = os.Stderr + t.Log("$ func subscribe --filter type=test.event") + if err := subscribeCmd.Run(); err != nil { t.Fatal(err) } From 29f18a0ce0b13a84b4fb80da169b83045281e7fa Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Sat, 3 Jan 2026 02:21:47 +0530 Subject: [PATCH 3/6] Add Knative Eventing availability check to subscription test Check for broker-ingress service before running the test to gracefully skip if Knative Eventing is not installed or not ready yet. --- e2e/e2e_metadata_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index b0b3482d16..3c2ba99211 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -562,6 +562,13 @@ func Handle(w http.ResponseWriter, _ *http.Request) { // TestMetadata_Subscriptions verifies the full event flow using Knative Eventing: // Producer function -> Broker -> Trigger -> Subscriber function func TestMetadata_Subscriptions(t *testing.T) { + // Verify Knative Eventing is installed + checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") + checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + if err := checkCmd.Run(); err != nil { + t.Skip("Skipping test: Knative Eventing is not installed (broker-ingress service not found)") + } + brokerName := "default" createBroker(t, Namespace, brokerName) defer deleteBroker(t, Namespace, brokerName) From 471e4853612145b8992d9bd33bd0eee48988d937 Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Sat, 3 Jan 2026 02:25:58 +0530 Subject: [PATCH 4/6] Add broker-ingress service availability check The test was failing in CI because it tried to send events before the broker-ingress service was registered in DNS. This adds a retry loop to wait for the service to be available before proceeding. Fixes timing issues in fresh cluster environments. --- e2e/e2e_metadata_test.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 3c2ba99211..6168b2b805 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -562,13 +562,6 @@ func Handle(w http.ResponseWriter, _ *http.Request) { // TestMetadata_Subscriptions verifies the full event flow using Knative Eventing: // Producer function -> Broker -> Trigger -> Subscriber function func TestMetadata_Subscriptions(t *testing.T) { - // Verify Knative Eventing is installed - checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") - checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - if err := checkCmd.Run(); err != nil { - t.Skip("Skipping test: Knative Eventing is not installed (broker-ingress service not found)") - } - brokerName := "default" createBroker(t, Namespace, brokerName) defer deleteBroker(t, Namespace, brokerName) @@ -776,6 +769,20 @@ metadata: } else { t.Logf("Broker %s is ready", name) } + + // Wait for broker-ingress service to be available (critical for CI) + // This ensures DNS has propagated before we try to send events + t.Log("Waiting for broker-ingress service to be available...") + for i := 0; i < 30; i++ { + checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") + checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + if err := checkCmd.Run(); err == nil { + t.Log("broker-ingress service is available") + return + } + time.Sleep(2 * time.Second) + } + t.Log("Warning: broker-ingress service check timed out, proceeding anyway") } // deleteBroker removes a Knative Broker from the given namespace. From 963ccb7f85069e79282368a89f58c54bc2bdf99c Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Mon, 5 Jan 2026 02:12:27 +0530 Subject: [PATCH 5/6] fix: improve subscription test with webhook callback verification - Remove unused sendEventToBroker* functions - Use callback pattern for reliable event verification - Clean up comments and reduce code size --- e2e/e2e_metadata_test.go | 360 +++++++++++++++++++-------------------- 1 file changed, 171 insertions(+), 189 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 6168b2b805..6e018482b0 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "os" "os/exec" @@ -18,7 +19,6 @@ import ( "time" fn "knative.dev/func/pkg/functions" - "knative.dev/func/pkg/k8s" ) // --------------------------------------------------------------------------- @@ -559,61 +559,40 @@ func Handle(w http.ResponseWriter, _ *http.Request) { } } -// TestMetadata_Subscriptions verifies the full event flow using Knative Eventing: -// Producer function -> Broker -> Trigger -> Subscriber function +// Tests the complete event flow using func subscribe func TestMetadata_Subscriptions(t *testing.T) { brokerName := "default" - createBroker(t, Namespace, brokerName) + if !createBrokerWithCheck(t, Namespace, brokerName) { + t.Fatal("Failed to create broker") + } defer deleteBroker(t, Namespace, brokerName) - // Create subscriber function that receives CloudEvents + uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) + eventReceived := make(chan string, 10) + + callbackURL, cleanup := startCallbackServer(t, eventReceived) + defer cleanup() + subscriberName := "func-e2e-test-subscriber" subscriberRoot := fromCleanEnv(t, subscriberName) - if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { t.Fatal(err) } - - subscriberImpl := `package function - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/cloudevents/sdk-go/v2/event" -) - -func Handle(ctx context.Context, e event.Event) (*event.Event, error) { - os.WriteFile("/tmp/received_event", []byte(e.Type()), 0644) - fmt.Printf("Received event: type=%s, source=%s, id=%s\n", e.Type(), e.Source(), e.ID()) - - response := event.New() - response.SetID(fmt.Sprintf("response-%d", time.Now().UnixNano())) - response.SetSource("subscriber") - response.SetType("test.response") - response.SetData("application/json", map[string]string{ - "received_type": e.Type(), - "status": "received", - }) - return &response, nil -} -` - if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), []byte(subscriberImpl), 0644); err != nil { + if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), + []byte(subscriberCode()), 0644); err != nil { t.Fatal(err) } - // Run func subscribe (without -v flag which subscribe doesn't support) subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event") - subscribeCmd.Stdout = os.Stdout - subscribeCmd.Stderr = os.Stderr - t.Log("$ func subscribe --filter type=test.event") + subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr if err := subscribeCmd.Run(); err != nil { t.Fatal(err) } + if err := newCmd(t, "config", "envs", "add", + "--name=CALLBACK_URL", "--value="+callbackURL).Run(); err != nil { + t.Fatal(err) + } - // Verify subscription config f, err := fn.NewFunction(subscriberRoot) if err != nil { t.Fatal(err) @@ -629,117 +608,187 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) { subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain) if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { - t.Fatal("subscriber did not become ready") + t.Fatal("subscriber not ready") } - t.Log("Subscriber deployed and ready") waitForTrigger(t, Namespace, subscriberName) - // Create producer function that sends CloudEvents to the broker producerName := "func-e2e-test-producer" - _ = fromCleanEnv(t, producerName) - + producerRoot := fromCleanEnv(t, producerName) if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { t.Fatal(err) } + if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), + []byte(producerCode(Namespace, brokerName)), 0644); err != nil { + t.Fatal(err) + } + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, producerName, Namespace) + + producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain) + if !waitFor(t, producerURL, withContentMatch("Producer is ready")) { + t.Fatal("producer not ready") + } + + client := http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(producerURL+"?event_id="+uniqueEventID, "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatalf("Failed to invoke producer: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("Broker rejected event: %s", body) + } + t.Logf("Broker accepted event %s", uniqueEventID) + + select { + case receivedID := <-eventReceived: + t.Logf("Event flow verified (received: %s)", receivedID) + case <-time.After(60 * time.Second): + t.Fatal("Timeout: No callback from subscriber") + } +} - producerImpl := fmt.Sprintf(`package function +// Starts HTTP server to receive callbacks from subscriber pod +func startCallbackServer(t *testing.T, ch chan<- string) (string, func()) { + t.Helper() + hostIP := getHostIPForCluster(t) + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + port := listener.Addr().(*net.TCPAddr).Port + + mux := http.NewServeMux() + mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + select { + case ch <- string(body): + default: + } + w.WriteHeader(200) + }) + + srv := &http.Server{Handler: mux} + go srv.Serve(listener) + return fmt.Sprintf("http://%s:%d/callback", hostIP, port), func() { srv.Shutdown(context.Background()) } +} + +// CloudEvents handler that calls back to test server +func subscriberCode() string { + return `package function import ( + "bytes" + "context" "fmt" - "io" "net/http" - "strings" + "os" "time" + "github.com/cloudevents/sdk-go/v2/event" ) -const ( - brokerNamespace = "%s" - brokerName = "%s" +func Handle(ctx context.Context, e event.Event) (*event.Event, error) { + fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) + if url := os.Getenv("CALLBACK_URL"); url != "" { + c := &http.Client{Timeout: 5 * time.Second} + if resp, err := c.Post(url, "text/plain", bytes.NewBufferString(e.ID())); err == nil { + resp.Body.Close() + fmt.Printf("Callback sent to %s\n", url) + } else { + fmt.Printf("Callback failed: %v\n", err) + } + } + r := event.New() + r.SetID("response-" + e.ID()) + r.SetSource("subscriber") + r.SetType("test.response") + r.SetData("application/json", map[string]string{"status": "received"}) + return &r, nil +} +` +} + +// HTTP handler that sends CloudEvents to broker +func producerCode(namespace, broker string) string { + return fmt.Sprintf(`package function + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" ) func Handle(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { - w.WriteHeader(200) - fmt.Fprintf(w, "Producer is ready") - return - } - - brokerURL := fmt.Sprintf("http://broker-ingress.knative-eventing.svc.cluster.local/%%s/%%s", brokerNamespace, brokerName) - eventBody := `+"`"+`{"message": "hello from producer"}`+"`"+` - httpReq, err := http.NewRequest("POST", brokerURL, strings.NewReader(eventBody)) - if err != nil { - w.WriteHeader(500) - fmt.Fprintf(w, "Failed to create request: %%v", err) + fmt.Fprint(w, "Producer is ready") return } - - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("ce-specversion", "1.0") - httpReq.Header.Set("ce-type", "test.event") - httpReq.Header.Set("ce-source", "producer-function") - httpReq.Header.Set("ce-id", fmt.Sprintf("evt-%%d", time.Now().UnixNano())) - - client := &http.Client{Timeout: 30 * time.Second} - resp, err := client.Do(httpReq) + eventID := r.URL.Query().Get("event_id") + if eventID == "" { + eventID = fmt.Sprintf("evt-%%d", time.Now().UnixNano()) + } + url := "http://broker-ingress.knative-eventing.svc.cluster.local/%s/%s" + req, _ := http.NewRequest("POST", url, strings.NewReader(`+"`"+`{}`+"`"+`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "test.event") + req.Header.Set("ce-source", "producer") + req.Header.Set("ce-id", eventID) + resp, err := (&http.Client{Timeout: 30 * time.Second}).Do(req) if err != nil { - w.WriteHeader(500) - fmt.Fprintf(w, "Failed to send to broker at %%s: %%v", brokerURL, err) + http.Error(w, err.Error(), 500) return } defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) if resp.StatusCode >= 200 && resp.StatusCode < 300 { - w.WriteHeader(200) - fmt.Fprintf(w, "Event sent successfully to %%s (status %%d)", brokerURL, resp.StatusCode) + fmt.Fprintf(w, "Event %%s sent (%%d)", eventID, resp.StatusCode) } else { - w.WriteHeader(500) - fmt.Fprintf(w, "Broker %%s returned status %%d. Body: %%s", brokerURL, resp.StatusCode, string(body)) + http.Error(w, string(body), 500) } } +`, namespace, broker) +} -`, Namespace, brokerName) - - producerRoot, _ := os.Getwd() - if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), []byte(producerImpl), 0644); err != nil { - t.Fatal(err) - } - - if err := newCmd(t, "deploy").Run(); err != nil { - t.Fatal(err) - } - defer clean(t, producerName, Namespace) - - producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain) - if !waitFor(t, producerURL, withContentMatch("Producer is ready")) { - t.Fatal("producer did not become ready") - } - t.Log("Producer deployed and ready") - - // Invoke producer to trigger event flow - t.Log("Invoking producer to send event to broker...") - client := http.Client{Timeout: 30 * time.Second} - resp, err := client.Post(producerURL, "application/json", strings.NewReader("{}")) - if err != nil { - t.Fatalf("Failed to invoke producer: %v", err) +// Returns host IP accessible from kind cluster +func getHostIPForCluster(t *testing.T) string { + t.Helper() + cmd := exec.Command("docker", "network", "inspect", "kind", "-f", "{{(index .IPAM.Config 0).Gateway}}") + output, err := cmd.Output() + if err == nil && len(output) > 0 { + ip := strings.TrimSpace(string(output)) + + if ip != "" && !strings.Contains(ip, ":") { + t.Logf("Using kind network gateway: %s", ip) + return ip + } } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - t.Logf("Producer response: %s (Status: %d)", string(body), resp.StatusCode) - if resp.StatusCode != 200 { - t.Fatalf("Broker failed to accept event: Status %d. Body: %s", resp.StatusCode, string(body)) + cmd = exec.Command("ip", "route", "get", "1") + output, err = cmd.Output() + if err == nil { + // Parse output like "1.0.0.0 via 192.168.1.1 dev eth0 src 192.168.1.100" + fields := strings.Fields(string(output)) + for i, f := range fields { + if f == "src" && i+1 < len(fields) { + t.Logf("Using host IP from route: %s", fields[i+1]) + return fields[i+1] + } + } } - t.Log("Event sent to broker successfully. Waiting for subscriber...") - if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { - t.Fatal("subscriber did not respond after event was sent") - } - t.Log("Event flow verified: Producer -> Broker -> Subscriber") + // Last resort: use common Docker bridge IP + t.Log("Warning: Could not determine host IP, using 172.17.0.1 (Docker default)") + return "172.17.0.1" } -// createBroker creates a Knative Broker in the given namespace. -func createBroker(t *testing.T, namespace, name string) { +// createBrokerWithCheck creates a Knative Broker and returns true if successful. +func createBrokerWithCheck(t *testing.T, namespace, name string) bool { t.Helper() brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 @@ -755,8 +804,8 @@ metadata: output, err := cmd.CombinedOutput() if err != nil { - t.Logf("Warning: could not create broker: %v, output: %s", err, string(output)) - return + t.Logf("Failed to create broker: %v, output: %s", err, string(output)) + return false } t.Logf("Created broker %s in namespace %s", name, namespace) @@ -765,24 +814,24 @@ metadata: waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) waitOutput, err := waitCmd.CombinedOutput() if err != nil { - t.Logf("Warning: broker may not be ready: %v, output: %s", err, string(waitOutput)) - } else { - t.Logf("Broker %s is ready", name) + t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput)) + return false } + t.Logf("Broker %s is ready", name) - // Wait for broker-ingress service to be available (critical for CI) - // This ensures DNS has propagated before we try to send events + // Wait for broker-ingress service to be available t.Log("Waiting for broker-ingress service to be available...") for i := 0; i < 30; i++ { checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) if err := checkCmd.Run(); err == nil { t.Log("broker-ingress service is available") - return + return true } time.Sleep(2 * time.Second) } - t.Log("Warning: broker-ingress service check timed out, proceeding anyway") + t.Log("broker-ingress service check timed out") + return false } // deleteBroker removes a Knative Broker from the given namespace. @@ -817,70 +866,3 @@ func waitForTrigger(t *testing.T, namespace, functionName string) { t.Logf("Trigger %s is ready", triggerName) } } - -// sendEventToBrokerWithRetry attempts to send a CloudEvent to the broker with retries. -func sendEventToBrokerWithRetry(t *testing.T, brokerURL, eventType, eventSource string, maxRetries int) bool { - t.Helper() - - for i := 0; i < maxRetries; i++ { - if i > 0 { - t.Logf("Retry %d/%d for sending event to broker", i+1, maxRetries) - time.Sleep(5 * time.Second) - } - - success := sendEventToBrokerInternal(t, brokerURL, eventType, eventSource) - if success { - return true - } - } - return false -} - -// sendEventToBrokerInternal sends a CloudEvent to the broker using in-cluster networking. -func sendEventToBrokerInternal(t *testing.T, brokerURL, eventType, eventSource string) bool { - t.Helper() - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - clientConfig := k8s.GetClientConfig() - dialer, err := k8s.NewInClusterDialer(ctx, clientConfig) - if err != nil { - t.Logf("Warning: could not create in-cluster dialer: %v", err) - return false - } - defer dialer.Close() - - transport := &http.Transport{ - DialContext: dialer.DialContext, - } - client := &http.Client{ - Transport: transport, - Timeout: 30 * time.Second, - } - - eventID := fmt.Sprintf("test-event-%d", time.Now().UnixNano()) - eventJSON := fmt.Sprintf(`{"specversion":"1.0","type":"%s","source":"%s","id":"%s","datacontenttype":"application/json","data":{"message":"test subscription event"}}`, eventType, eventSource, eventID) - - req, err := http.NewRequestWithContext(ctx, "POST", brokerURL, strings.NewReader(eventJSON)) - if err != nil { - t.Logf("Warning: could not create request to broker: %v", err) - return false - } - - req.Header.Set("Content-Type", "application/cloudevents+json") - - resp, err := client.Do(req) - if err != nil { - t.Logf("Warning: could not send event to broker: %v", err) - return false - } - defer resp.Body.Close() - - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - t.Logf("Event sent to broker successfully (status: %d)", resp.StatusCode) - return true - } - t.Logf("Broker responded with status: %d", resp.StatusCode) - return false -} From f0ea7578ccbfb14b952e0bdad3637673530a5408 Mon Sep 17 00:00:00 2001 From: Kunal1522 Date: Fri, 16 Jan 2026 04:10:31 +0530 Subject: [PATCH 6/6] refactor: simplify event subscription e2e test by removing custom producer and callback server, and using stern to verify event reception. --- e2e/e2e_metadata_test.go | 209 +++++++++++---------------------------- 1 file changed, 59 insertions(+), 150 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 6e018482b0..5fd4c8dee3 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -4,21 +4,23 @@ package e2e import ( + "bufio" "bytes" "context" "encoding/json" "fmt" "io" - "net" "net/http" "os" "os/exec" "path/filepath" + "regexp" "strings" "testing" "time" fn "knative.dev/func/pkg/functions" + fnhttp "knative.dev/func/pkg/http" ) // --------------------------------------------------------------------------- @@ -562,18 +564,15 @@ func Handle(w http.ResponseWriter, _ *http.Request) { // Tests the complete event flow using func subscribe func TestMetadata_Subscriptions(t *testing.T) { brokerName := "default" - if !createBrokerWithCheck(t, Namespace, brokerName) { - t.Fatal("Failed to create broker") - } - defer deleteBroker(t, Namespace, brokerName) + + createBrokerWithCheck(t, Namespace, brokerName) uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) - eventReceived := make(chan string, 10) - callbackURL, cleanup := startCallbackServer(t, eventReceived) - defer cleanup() + eventReceived := waitForEvent(t, uniqueEventID) - subscriberName := "func-e2e-test-subscriber" + subscriber := "func-e2e-test-subscriber" + subscriberName := subscriber subscriberRoot := fromCleanEnv(t, subscriberName) if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { t.Fatal(err) @@ -588,10 +587,6 @@ func TestMetadata_Subscriptions(t *testing.T) { if err := subscribeCmd.Run(); err != nil { t.Fatal(err) } - if err := newCmd(t, "config", "envs", "add", - "--name=CALLBACK_URL", "--value="+callbackURL).Run(); err != nil { - t.Fatal(err) - } f, err := fn.NewFunction(subscriberRoot) if err != nil { @@ -612,34 +607,28 @@ func TestMetadata_Subscriptions(t *testing.T) { } waitForTrigger(t, Namespace, subscriberName) - producerName := "func-e2e-test-producer" - producerRoot := fromCleanEnv(t, producerName) - if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { - t.Fatal(err) - } - if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), - []byte(producerCode(Namespace, brokerName)), 0644); err != nil { - t.Fatal(err) - } - if err := newCmd(t, "deploy").Run(); err != nil { - t.Fatal(err) - } - defer clean(t, producerName, Namespace) - - producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain) - if !waitFor(t, producerURL, withContentMatch("Producer is ready")) { - t.Fatal("producer not ready") + transport := fnhttp.NewRoundTripper() + defer transport.Close() + client := http.Client{ + Transport: transport, + Timeout: 30 * time.Second, } + url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName) + req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "test.event") + req.Header.Set("ce-source", "producer") + req.Header.Set("ce-id", uniqueEventID) - client := http.Client{Timeout: 30 * time.Second} - resp, err := client.Post(producerURL+"?event_id="+uniqueEventID, "application/json", strings.NewReader("{}")) + resp, err := client.Do(req) if err != nil { t.Fatalf("Failed to invoke producer: %v", err) } body, _ := io.ReadAll(resp.Body) resp.Body.Close() - if resp.StatusCode != 200 { - t.Fatalf("Broker rejected event: %s", body) + if resp.StatusCode != 202 { + t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body) } t.Logf("Broker accepted event %s", uniqueEventID) @@ -651,56 +640,52 @@ func TestMetadata_Subscriptions(t *testing.T) { } } -// Starts HTTP server to receive callbacks from subscriber pod -func startCallbackServer(t *testing.T, ch chan<- string) (string, func()) { +func waitForEvent(t *testing.T, eventId string) <-chan string { t.Helper() - hostIP := getHostIPForCluster(t) - listener, err := net.Listen("tcp", ":0") + + eventReceived := make(chan string, 10) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + pr, pw := io.Pipe() + cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*") + cmd.Stderr = io.Discard + cmd.Stdout = pw + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + err := cmd.Start() if err != nil { - t.Fatalf("Failed to listen: %v", err) + t.Fatal(err) } - port := listener.Addr().(*net.TCPAddr).Port - - mux := http.NewServeMux() - mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) { - body, _ := io.ReadAll(r.Body) - select { - case ch <- string(body): - default: + go func() { + r := bufio.NewReader(pr) + m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r) + if e != nil { + panic(e) } - w.WriteHeader(200) - }) + if m { + eventReceived <- "OK" + close(eventReceived) + cancel() + } + _, _ = io.Copy(io.Discard, r) + }() - srv := &http.Server{Handler: mux} - go srv.Serve(listener) - return fmt.Sprintf("http://%s:%d/callback", hostIP, port), func() { srv.Shutdown(context.Background()) } + return eventReceived } -// CloudEvents handler that calls back to test server +// CloudEvents handler that logs events func subscriberCode() string { return `package function import ( - "bytes" "context" "fmt" - "net/http" - "os" - "time" "github.com/cloudevents/sdk-go/v2/event" ) func Handle(ctx context.Context, e event.Event) (*event.Event, error) { fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) - if url := os.Getenv("CALLBACK_URL"); url != "" { - c := &http.Client{Timeout: 5 * time.Second} - if resp, err := c.Post(url, "text/plain", bytes.NewBufferString(e.ID())); err == nil { - resp.Body.Close() - fmt.Printf("Callback sent to %s\n", url) - } else { - fmt.Printf("Callback failed: %v\n", err) - } - } r := event.New() r.SetID("response-" + e.ID()) r.SetSource("subscriber") @@ -711,84 +696,8 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) { ` } -// HTTP handler that sends CloudEvents to broker -func producerCode(namespace, broker string) string { - return fmt.Sprintf(`package function - -import ( - "fmt" - "io" - "net/http" - "strings" - "time" -) - -func Handle(w http.ResponseWriter, r *http.Request) { - if r.Method == "GET" { - fmt.Fprint(w, "Producer is ready") - return - } - eventID := r.URL.Query().Get("event_id") - if eventID == "" { - eventID = fmt.Sprintf("evt-%%d", time.Now().UnixNano()) - } - url := "http://broker-ingress.knative-eventing.svc.cluster.local/%s/%s" - req, _ := http.NewRequest("POST", url, strings.NewReader(`+"`"+`{}`+"`"+`)) - req.Header.Set("Content-Type", "application/json") - req.Header.Set("ce-specversion", "1.0") - req.Header.Set("ce-type", "test.event") - req.Header.Set("ce-source", "producer") - req.Header.Set("ce-id", eventID) - resp, err := (&http.Client{Timeout: 30 * time.Second}).Do(req) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - defer resp.Body.Close() - body, _ := io.ReadAll(resp.Body) - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - fmt.Fprintf(w, "Event %%s sent (%%d)", eventID, resp.StatusCode) - } else { - http.Error(w, string(body), 500) - } -} -`, namespace, broker) -} - -// Returns host IP accessible from kind cluster -func getHostIPForCluster(t *testing.T) string { - t.Helper() - cmd := exec.Command("docker", "network", "inspect", "kind", "-f", "{{(index .IPAM.Config 0).Gateway}}") - output, err := cmd.Output() - if err == nil && len(output) > 0 { - ip := strings.TrimSpace(string(output)) - - if ip != "" && !strings.Contains(ip, ":") { - t.Logf("Using kind network gateway: %s", ip) - return ip - } - } - - cmd = exec.Command("ip", "route", "get", "1") - output, err = cmd.Output() - if err == nil { - // Parse output like "1.0.0.0 via 192.168.1.1 dev eth0 src 192.168.1.100" - fields := strings.Fields(string(output)) - for i, f := range fields { - if f == "src" && i+1 < len(fields) { - t.Logf("Using host IP from route: %s", fields[i+1]) - return fields[i+1] - } - } - } - - // Last resort: use common Docker bridge IP - t.Log("Warning: Could not determine host IP, using 172.17.0.1 (Docker default)") - return "172.17.0.1" -} - -// createBrokerWithCheck creates a Knative Broker and returns true if successful. -func createBrokerWithCheck(t *testing.T, namespace, name string) bool { +// createBrokerWithCheck creates a Knative Broker +func createBrokerWithCheck(t *testing.T, namespace, name string) { t.Helper() brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 @@ -804,9 +713,11 @@ metadata: output, err := cmd.CombinedOutput() if err != nil { - t.Logf("Failed to create broker: %v, output: %s", err, string(output)) - return false + t.Fatalf("Failed to create broker: %v, output: %s", err, string(output)) } + t.Cleanup(func() { + deleteBroker(t, namespace, name) + }) t.Logf("Created broker %s in namespace %s", name, namespace) waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", @@ -815,7 +726,6 @@ metadata: waitOutput, err := waitCmd.CombinedOutput() if err != nil { t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput)) - return false } t.Logf("Broker %s is ready", name) @@ -826,12 +736,11 @@ metadata: checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) if err := checkCmd.Run(); err == nil { t.Log("broker-ingress service is available") - return true + return } time.Sleep(2 * time.Second) } - t.Log("broker-ingress service check timed out") - return false + t.Fatal("broker-ingress service check timed out") } // deleteBroker removes a Knative Broker from the given namespace.