
Conversation

@Kunal1522
Contributor

Fixes #3202

What's in the test

Main test function:

  • TestMetadata_Subscriptions - The actual test. Sets up a Broker, deploys two functions (a subscriber and a producer), fires an event, and checks the subscriber's logs to confirm it received the event through the Broker.

Helper functions:

  • createBroker(t, namespace, name) - Creates a Knative Broker using kubectl apply and waits for it to be ready before returning.
  • deleteBroker(t, namespace, name) - Cleanup helper that deletes the Broker when the test finishes.
  • waitForTrigger(t, namespace, functionName) - Waits for the Knative Trigger (created by func subscribe) to reach the Ready state; without this, events sent too early might be lost. (A sketch of this helper follows the flow summary below.)

Unused legacy helpers (kept for potential future use):

  • sendEventToBrokerWithRetry - Retry wrapper for sending events (not currently used; the test sends direct HTTP instead).
  • sendEventToBrokerInternal - In-cluster event sending via Kubernetes port-forwarding (not currently used; the test deploys a producer function instead).

This test verifies the full Knative Eventing flow using func subscribe:
- Creates a producer function that sends CloudEvents to the broker
- Creates a subscriber function that receives events via a Trigger
- Validates the complete event delivery pipeline
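
A minimal sketch of what the waitForTrigger helper can look like, assuming it shells out to kubectl wait and that the Trigger is named after the function (both are assumptions; the PR's actual helper may differ):

// waitForTrigger blocks until the Trigger created by `func subscribe`
// reports Ready, so events sent afterwards are not dropped.
func waitForTrigger(t *testing.T, namespace, functionName string) {
	t.Helper()
	// Trigger name and timeout are illustrative assumptions.
	cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
		"trigger/"+functionName, "-n", namespace, "--timeout=120s")
	cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
	if out, err := cmd.CombinedOutput(); err != nil {
		t.Fatalf("trigger not ready: %v, output: %s", err, out)
	}
}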

Fixes knative#3202
@knative-prow

knative-prow bot commented Jan 2, 2026

Hi @Kunal1522. Thanks for your PR.

I'm waiting for a github.com member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@knative-prow knative-prow bot added needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jan 2, 2026
@codecov

codecov bot commented Jan 2, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.24%. Comparing base (039b926) to head (f0ea757).
⚠️ Report is 29 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3324      +/-   ##
==========================================
+ Coverage   55.09%   55.24%   +0.14%     
==========================================
  Files         170      170              
  Lines       19850    19963     +113     
==========================================
+ Hits        10937    11029      +92     
- Misses       7845     7855      +10     
- Partials     1068     1079      +11     
Flag                    Coverage Δ
e2e                     39.80% <ø> (+0.21%) ⬆️
e2e go                  35.41% <ø> (-0.21%) ⬇️
e2e node                30.96% <ø> (-0.18%) ⬇️
e2e python              35.07% <ø> (-0.29%) ⬇️
e2e quarkus             31.11% <ø> (-0.18%) ⬇️
e2e rust                30.57% <ø> (-0.17%) ⬇️
e2e springboot          30.60% <ø> (-0.18%) ⬇️
e2e typescript          31.07% <ø> (-0.18%) ⬇️
integration             17.41% <ø> (-0.08%) ⬇️
unit macos-14           45.11% <ø> (+0.14%) ⬆️
unit macos-latest       45.11% <ø> (+0.14%) ⬆️
unit ubuntu-24.04-arm   45.23% <ø> (+0.15%) ⬆️
unit ubuntu-latest      46.00% <ø> (+0.13%) ⬆️
unit windows-latest     45.13% <ø> (+0.14%) ⬆️

Flags with carried forward coverage won't be shown.


The subscribe command doesn't support the -v (verbose) flag that newCmd()
automatically adds when FUNC_E2E_VERBOSE=true, which caused test failures.
Changed to use exec.Command directly for the subscribe call to bypass the
automatic verbose flag while keeping it for other commands.

Check for the broker-ingress service before running the test, to gracefully
skip if Knative Eventing is not installed or not ready yet.

The test was failing in CI because it tried to send events before the
broker-ingress service was registered in DNS. This adds a retry loop that
waits for the service to be available before proceeding, fixing timing
issues in fresh cluster environments.
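
A rough sketch of the two fixes described above (binary path, flag names, and retry bounds are assumptions, not the PR's exact code):

// Bypass newCmd()'s automatic -v flag by invoking the func binary directly
// for the subscribe call only. FuncBinary is a hypothetical path variable.
subscribeCmd := exec.Command(FuncBinary, "subscribe", "--source", "default")
subscribeCmd.Dir = subscriberRoot
subscribeCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
if err := subscribeCmd.Run(); err != nil {
	t.Fatal(err)
}

// Retry until the broker-ingress service is registered before sending events.
for i := 0; i < 30; i++ {
	check := exec.Command("kubectl", "get", "service", "broker-ingress", "-n", "knative-eventing")
	check.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
	if check.Run() == nil {
		break // service available; safe to proceed
	}
	time.Sleep(2 * time.Second)
}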
@Kunal1522 Kunal1522 marked this pull request as draft January 2, 2026 21:14
@knative-prow knative-prow bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 2, 2026
@Kunal1522 Kunal1522 marked this pull request as ready for review January 3, 2026 10:21
@knative-prow knative-prow bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 3, 2026
- Remove unused sendEventToBroker* functions
- Use callback pattern for reliable event verification
- Clean up comments and reduce code size
@Kunal1522
Contributor Author

Kunal1522 commented Jan 4, 2026

I spent quite a bit of time trying different approaches to get the producer, subscriber, and broker pods to talk back to the test, basically saying "hey, I got the event." It turns out a lot of things, like reading pod logs, writing files inside containers, or hitting the broker directly, don't really work because of container isolation. So instead of the test trying to pull info from the pods, I just flipped it: now the subscriber pushes a callback to the test. The test spins up a small HTTP server on a dynamic port, passes the URL via the CALLBACK_URL env var, and when the subscriber gets an event it POSTs the event ID back. A Go channel catches this and the test knows everything worked. Hopefully this solves the TODO at lines 558-563 of e2e/e2e_metadata_test.go.
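
For illustration, the receiving side of that channel might look roughly like this (the timeout value is an assumption):

// Wait for the subscriber's callback carrying the event ID, or fail the test.
select {
case got := <-eventReceived:
	if got != uniqueEventID {
		t.Fatalf("unexpected event id %q, want %q", got, uniqueEventID)
	}
case <-time.After(2 * time.Minute):
	t.Fatal("timed out waiting for subscriber callback")
}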

@Kunal1522
Contributor Author

/ok-to-test

@knative-prow

knative-prow bot commented Jan 4, 2026

@Kunal1522: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message.

Details

In response to this:

/ok-to-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@intojhanurag
Member

intojhanurag commented Jan 5, 2026

> I spent quite a bit of time trying different approaches to get the producer, subscriber, and broker pods to talk back to the test … now the subscriber pushes a callback to the test.

Yeah, it seems good now. We should use defer for cleanup, I think; not sure :)
/cc @gauron99

@intojhanurag
Member

/ok-to-test

@knative-prow knative-prow bot added ok-to-test Indicates a non-member PR verified by an org member that is safe to test. and removed needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. labels Jan 5, 2026
@knative-prow knative-prow bot requested a review from gauron99 January 5, 2026 08:32
@matejvasek
Contributor

PTAL @jrangelramos

Comment on lines 565 to 568
if !createBrokerWithCheck(t, Namespace, brokerName) {
	t.Fatal("Failed to create broker")
}
defer deleteBroker(t, Namespace, brokerName)
Contributor

I think you can do the t.Fatal() from within createBrokerWithCheck(). You can also install the cleanup in createBrokerWithCheck() using t.Cleanup() there. So these 4 lines turn into just one.
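
An illustrative sketch of the suggested shape (applyBrokerYAML is a hypothetical stand-in for the existing kubectl-apply logic):

func createBrokerWithCheck(t *testing.T, namespace, name string) {
	t.Helper()
	if err := applyBrokerYAML(namespace, name); err != nil { // hypothetical helper
		t.Fatalf("failed to create broker: %v", err)
	}
	// t.Cleanup fires when the test finishes, even after t.Fatal,
	// replacing the caller's defer deleteBroker(...).
	t.Cleanup(func() { deleteBroker(t, namespace, name) })
}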

Comment on lines 573 to 574
callbackURL, cleanup := startCallbackServer(t, eventReceived)
defer cleanup()
Contributor

Again, install the cleanup routine in startCallbackServer() using t.Cleanup().

@matejvasek
Contributor

I am fine with calling back to the host, however I am curious why just tapping into the consumer logs would not be sufficient?

}

// HTTP handler that sends CloudEvents to broker
func producerCode(namespace, broker string) string {
Contributor
@matejvasek commented Jan 14, 2026

Do we need this service? Can't we push an event to the broker directly from the host? Like so:

// THIS RUNS DIRECTLY IN TEST ON HOST
transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster
defer transport.Close()
client := http.Client{
    Transport: transport,
    Timeout:   30 * time.Second,
}
eventID := "xyz"
url := "http://broker-ingress.knative-eventing.svc/NAMESPACE/BROKER"
req, _ := http.NewRequest("POST", url, strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ce-specversion", "1.0")
req.Header.Set("ce-type", "test.event")
req.Header.Set("ce-source", "producer")
req.Header.Set("ce-id", eventID)
resp, err := client.Do(req)
if err != nil {
    return err
}
defer resp.Body.Close()

Contributor
@matejvasek commented Jan 14, 2026

Alternatively, you can temporarily expose the broker via an Ingress:

func exposeBroker(t *testing.T) {
	ingress := `apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: broker-ingress
  namespace: knative-eventing
spec:
  ingressClassName: contour-external
  rules:
    - host: broker.localtest.me
      http:
        paths:
          - backend:
              service:
                name: broker-ingress
                port:
                  number: 80
            pathType: Prefix
            path: /`

	cmd := exec.Command("kubectl", "apply", "-f", "-")
	cmd.Stdin = strings.NewReader(ingress)
	cmd.Stdout = os.Stderr
	cmd.Stderr = os.Stderr
	cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
	err := cmd.Run()
	if err != nil {
		t.Fatal(err)
	}

	t.Cleanup(func() {
		cmd := exec.Command("kubectl", "delete", "ingress", "broker-ingress", "-n", "knative-eventing", "--ignore-not-found")
		cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
		_ = cmd.Run()
	})
}

Then you can call the broker from the host at http://broker.localtest.me/<namespace>/<broker>

Contributor

Also we may consider exposing the broker permanently in hack/cluster.sh if it is useful for testing.

Contributor

After some consideration, it would probably be best to just use the tunneling transport (fnhttp.NewRoundTripper()). It should also work easily on clusters other than our testing KinD cluster.

Contributor Author

I am trying this out.

Contributor Author

> Do we need this service? Can't we push an event to the broker directly from the host? Like so:

> // THIS RUNS DIRECTLY IN TEST ON HOST
> transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster
> …

Thanks for pointing this out. I thought the broker URL (broker-ingress.knative-eventing.svc.cluster.local) was unreachable from the host since it uses Kubernetes-internal DNS, so I deployed a producer function to send the event.

Contributor

Yes, it's not directly accessible. However, we have a custom dialer/transport that does the tunneling for us.

transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster
defer transport.Close()
client := http.Client{
    Transport: transport,
    Timeout:   30 * time.Second,
}

The above code creates a Go HTTP client that may internally spawn a helper pod in the cluster and do the TCP dial through that pod.

Contributor

// NewRoundTripper returns new closable RoundTripper that first tries to dial connection in standard way,
// if the dial operation fails due to hostname resolution the RoundTripper tries to dial from in cluster pod.
//
// This is useful for accessing cluster internal services (pushing a CloudEvent into Knative broker).
func NewRoundTripper(opts ...Option) RoundTripCloser {

@matejvasek
Contributor

A few comments; otherwise, great job.

@matejvasek
Contributor

matejvasek commented Jan 14, 2026

> I am fine with calling back to the host, however I am curious why just tapping into the consumer logs would not be sufficient?

The thing is: for me, it appears the callback is suppressed by the firewall.
EDIT: or maybe it's not the firewall; maybe it's the fact that I use rootless podman.
Either way, it does not work on my machine.

@matejvasek
Contributor

Possible improvements:

diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go
index 6e018482b..d4d2c3791 100644
--- a/e2e/e2e_metadata_test.go
+++ b/e2e/e2e_metadata_test.go
@@ -4,21 +4,23 @@
 package e2e
 
 import (
+	"bufio"
 	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"io"
-	"net"
 	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
+	"regexp"
 	"strings"
 	"testing"
 	"time"
 
 	fn "knative.dev/func/pkg/functions"
+	fnhttp "knative.dev/func/pkg/http"
 )
 
 // ---------------------------------------------------------------------------
@@ -562,18 +564,15 @@ func Handle(w http.ResponseWriter, _ *http.Request) {
 // Tests the complete event flow using func subscribe
 func TestMetadata_Subscriptions(t *testing.T) {
 	brokerName := "default"
-	if !createBrokerWithCheck(t, Namespace, brokerName) {
-		t.Fatal("Failed to create broker")
-	}
-	defer deleteBroker(t, Namespace, brokerName)
+
+	createBrokerWithCheck(t, Namespace, brokerName)
 
 	uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())
-	eventReceived := make(chan string, 10)
 
-	callbackURL, cleanup := startCallbackServer(t, eventReceived)
-	defer cleanup()
+	eventReceived := waitForEvent(t, uniqueEventID)
 
-	subscriberName := "func-e2e-test-subscriber"
+	subscriber := "func-e2e-test-subscriber"
+	subscriberName := subscriber
 	subscriberRoot := fromCleanEnv(t, subscriberName)
 	if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
 		t.Fatal(err)
@@ -588,10 +587,6 @@ func TestMetadata_Subscriptions(t *testing.T) {
 	if err := subscribeCmd.Run(); err != nil {
 		t.Fatal(err)
 	}
-	if err := newCmd(t, "config", "envs", "add",
-		"--name=CALLBACK_URL", "--value="+callbackURL).Run(); err != nil {
-		t.Fatal(err)
-	}
 
 	f, err := fn.NewFunction(subscriberRoot)
 	if err != nil {
@@ -612,34 +607,28 @@ func TestMetadata_Subscriptions(t *testing.T) {
 	}
 	waitForTrigger(t, Namespace, subscriberName)
 
-	producerName := "func-e2e-test-producer"
-	producerRoot := fromCleanEnv(t, producerName)
-	if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil {
-		t.Fatal(err)
-	}
-	if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"),
-		[]byte(producerCode(Namespace, brokerName)), 0644); err != nil {
-		t.Fatal(err)
-	}
-	if err := newCmd(t, "deploy").Run(); err != nil {
-		t.Fatal(err)
-	}
-	defer clean(t, producerName, Namespace)
-
-	producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain)
-	if !waitFor(t, producerURL, withContentMatch("Producer is ready")) {
-		t.Fatal("producer not ready")
+	transport := fnhttp.NewRoundTripper()
+	defer transport.Close()
+	client := http.Client{
+		Transport: transport,
+		Timeout:   30 * time.Second,
 	}
+	url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName)
+	req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`))
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("ce-specversion", "1.0")
+	req.Header.Set("ce-type", "test.event")
+	req.Header.Set("ce-source", "producer")
+	req.Header.Set("ce-id", uniqueEventID)
 
-	client := http.Client{Timeout: 30 * time.Second}
-	resp, err := client.Post(producerURL+"?event_id="+uniqueEventID, "application/json", strings.NewReader("{}"))
+	resp, err := client.Do(req)
 	if err != nil {
 		t.Fatalf("Failed to invoke producer: %v", err)
 	}
 	body, _ := io.ReadAll(resp.Body)
 	resp.Body.Close()
-	if resp.StatusCode != 200 {
-		t.Fatalf("Broker rejected event: %s", body)
+	if resp.StatusCode != 202 {
+		t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body)
 	}
 	t.Logf("Broker accepted event %s", uniqueEventID)
 
@@ -651,29 +640,38 @@ func TestMetadata_Subscriptions(t *testing.T) {
 	}
 }
 
-// Starts HTTP server to receive callbacks from subscriber pod
-func startCallbackServer(t *testing.T, ch chan<- string) (string, func()) {
+func waitForEvent(t *testing.T, eventId string) <-chan string {
 	t.Helper()
-	hostIP := getHostIPForCluster(t)
-	listener, err := net.Listen("tcp", ":0")
+
+	eventReceived := make(chan string, 10)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	pr, pw := io.Pipe()
+	cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
+	cmd.Stderr = io.Discard
+	cmd.Stdout = pw
+	cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
+	err := cmd.Start()
 	if err != nil {
-		t.Fatalf("Failed to listen: %v", err)
+		t.Fatal(err)
 	}
-	port := listener.Addr().(*net.TCPAddr).Port
-
-	mux := http.NewServeMux()
-	mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
-		body, _ := io.ReadAll(r.Body)
-		select {
-		case ch <- string(body):
-		default:
+	go func() {
+		r := bufio.NewReader(pr)
+		m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
+		if e != nil {
+			panic(e)
 		}
-		w.WriteHeader(200)
-	})
+		if m {
+			eventReceived <- "OK"
+			close(eventReceived)
+			cancel()
+		}
+		_, _ = io.Copy(io.Discard, r)
+	}()
 
-	srv := &http.Server{Handler: mux}
-	go srv.Serve(listener)
-	return fmt.Sprintf("http://%s:%d/callback", hostIP, port), func() { srv.Shutdown(context.Background()) }
+	return eventReceived
 }
 
 // CloudEvents handler that calls back to test server
@@ -681,26 +679,13 @@ func subscriberCode() string {
 	return `package function
 
 import (
-	"bytes"
 	"context"
 	"fmt"
-	"net/http"
-	"os"
-	"time"
 	"github.com/cloudevents/sdk-go/v2/event"
 )
 
 func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
 	fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
-	if url := os.Getenv("CALLBACK_URL"); url != "" {
-		c := &http.Client{Timeout: 5 * time.Second}
-		if resp, err := c.Post(url, "text/plain", bytes.NewBufferString(e.ID())); err == nil {
-			resp.Body.Close()
-			fmt.Printf("Callback sent to %s\n", url)
-		} else {
-			fmt.Printf("Callback failed: %v\n", err)
-		}
-	}
 	r := event.New()
 	r.SetID("response-" + e.ID())
 	r.SetSource("subscriber")
@@ -711,84 +696,8 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
 `
 }
 
-// HTTP handler that sends CloudEvents to broker
-func producerCode(namespace, broker string) string {
-	return fmt.Sprintf(`package function
-
-import (
-	"fmt"
-	"io"
-	"net/http"
-	"strings"
-	"time"
-)
-
-func Handle(w http.ResponseWriter, r *http.Request) {
-	if r.Method == "GET" {
-		fmt.Fprint(w, "Producer is ready")
-		return
-	}
-	eventID := r.URL.Query().Get("event_id")
-	if eventID == "" {
-		eventID = fmt.Sprintf("evt-%%d", time.Now().UnixNano())
-	}
-	url := "http://broker-ingress.knative-eventing.svc.cluster.local/%s/%s"
-	req, _ := http.NewRequest("POST", url, strings.NewReader(`+"`"+`{}`+"`"+`))
-	req.Header.Set("Content-Type", "application/json")
-	req.Header.Set("ce-specversion", "1.0")
-	req.Header.Set("ce-type", "test.event")
-	req.Header.Set("ce-source", "producer")
-	req.Header.Set("ce-id", eventID)
-	resp, err := (&http.Client{Timeout: 30 * time.Second}).Do(req)
-	if err != nil {
-		http.Error(w, err.Error(), 500)
-		return
-	}
-	defer resp.Body.Close()
-	body, _ := io.ReadAll(resp.Body)
-	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
-		fmt.Fprintf(w, "Event %%s sent (%%d)", eventID, resp.StatusCode)
-	} else {
-		http.Error(w, string(body), 500)
-	}
-}
-`, namespace, broker)
-}
-
-// Returns host IP accessible from kind cluster
-func getHostIPForCluster(t *testing.T) string {
-	t.Helper()
-	cmd := exec.Command("docker", "network", "inspect", "kind", "-f", "{{(index .IPAM.Config 0).Gateway}}")
-	output, err := cmd.Output()
-	if err == nil && len(output) > 0 {
-		ip := strings.TrimSpace(string(output))
-
-		if ip != "" && !strings.Contains(ip, ":") {
-			t.Logf("Using kind network gateway: %s", ip)
-			return ip
-		}
-	}
-
-	cmd = exec.Command("ip", "route", "get", "1")
-	output, err = cmd.Output()
-	if err == nil {
-		// Parse output like "1.0.0.0 via 192.168.1.1 dev eth0 src 192.168.1.100"
-		fields := strings.Fields(string(output))
-		for i, f := range fields {
-			if f == "src" && i+1 < len(fields) {
-				t.Logf("Using host IP from route: %s", fields[i+1])
-				return fields[i+1]
-			}
-		}
-	}
-
-	// Last resort: use common Docker bridge IP
-	t.Log("Warning: Could not determine host IP, using 172.17.0.1 (Docker default)")
-	return "172.17.0.1"
-}
-
-// createBrokerWithCheck creates a Knative Broker and returns true if successful.
-func createBrokerWithCheck(t *testing.T, namespace, name string) bool {
+// createBrokerWithCheck creates a Knative Broker
+func createBrokerWithCheck(t *testing.T, namespace, name string) {
 	t.Helper()
 
 	brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1
@@ -804,9 +713,11 @@ metadata:
 
 	output, err := cmd.CombinedOutput()
 	if err != nil {
-		t.Logf("Failed to create broker: %v, output: %s", err, string(output))
-		return false
+		t.Fatalf("Failed to create broker: %v, output: %s", err, string(output))
 	}
+	t.Cleanup(func() {
+		deleteBroker(t, namespace, name)
+	})
 	t.Logf("Created broker %s in namespace %s", name, namespace)
 
 	waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
@@ -815,7 +726,6 @@ metadata:
 	waitOutput, err := waitCmd.CombinedOutput()
 	if err != nil {
 		t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput))
-		return false
 	}
 	t.Logf("Broker %s is ready", name)
 
@@ -826,12 +736,11 @@ metadata:
 		checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
 		if err := checkCmd.Run(); err == nil {
 			t.Log("broker-ingress service is available")
-			return true
+			return
 		}
 		time.Sleep(2 * time.Second)
 	}
-	t.Log("broker-ingress service check timed out")
-	return false
+	t.Fatal("broker-ingress service check timed out")
 }
 
 // deleteBroker removes a Knative Broker from the given namespace.

@matejvasek
Contributor

> I am fine with calling back to the host, however I am curious why just tapping into the consumer logs would not be sufficient?

Can't you wait for the event via the logs, like this:

func waitForEvent(t *testing.T, eventId string) <-chan string {
	t.Helper()

	eventReceived := make(chan string, 10)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	pr, pw := io.Pipe()
	cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
	cmd.Stderr = io.Discard
	cmd.Stdout = pw
	cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
	err := cmd.Start()
	if err != nil {
		t.Fatal(err)
	}
	go func() {
		r := bufio.NewReader(pr)
		m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
		if e != nil {
			panic(e)
		}
		if m {
			eventReceived <- "OK"
			close(eventReceived)
			cancel()
		}
		_, _ = io.Copy(io.Discard, r)
	}()

	return eventReceived
}
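
Consuming the returned channel in the test could then look roughly like this (the timeout is illustrative):

eventReceived := waitForEvent(t, uniqueEventID)
// ... deploy the subscriber and POST the event to the broker ...
select {
case <-eventReceived:
	t.Logf("event %s observed in subscriber logs", uniqueEventID)
case <-time.After(2 * time.Minute):
	t.Fatal("timed out waiting for event in subscriber logs")
}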

@matejvasek
Contributor

@Kunal1522 check out #3364

@Kunal1522
Contributor Author

> I am fine with calling back to the host, however I am curious why just tapping into the consumer logs would not be sufficient?

I originally tried to read the logs, but I ran into segmentation issues when spawning kubectl subprocesses to read files from the container. In the end I could not resolve it, so I went with calling back to the host. I think this won't be an issue with the stern approach you suggested. Also, maybe because of rootless podman on your machine, the subscriber pod had issues communicating back to the HTTP service.

@matejvasek
Contributor

> maybe because of rootless podman on your machine, the subscriber pod had issues communicating back to the HTTP service

Yes, probably because of how networking works by default with rootless podman, it does not work for me. So I think we should abandon that callback to the host; there may be more devs using rootless podman.

@Kunal1522
Contributor Author

> maybe because of rootless podman on your machine, the subscriber pod had issues communicating back to the HTTP service

> Yes, probably because of how networking works by default with rootless podman, it does not work for me. So I think we should abandon that callback to the host; there may be more devs using rootless podman.

Okay, then I will go with the design you suggested. I'm testing it on my end and will push it soon.

…ducer and callback server, and using stern to verify event reception.
@matejvasek
Contributor

/approve
/lgtm

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Jan 16, 2026
@knative-prow

knative-prow bot commented Jan 16, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: Kunal1522, matejvasek

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jan 16, 2026
@knative-prow knative-prow bot merged commit 4f14a28 into knative:main Jan 16, 2026
43 checks passed