From ab3978c3656d6be24c6dc021588a5c54869a8dde Mon Sep 17 00:00:00 2001 From: Timur Zununbekov Date: Fri, 4 Jun 2021 22:01:45 +0600 Subject: [PATCH] Adapter's StartReceiver() method implemented (#5464) --- pkg/adapter/v2/cloudevents.go | 34 +++++--- pkg/adapter/v2/cloudevents_test.go | 128 +++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 13 deletions(-) diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index e1c07471fcb..169e1fd69d2 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -37,29 +37,34 @@ import ( // NewCloudEventsClient returns a client that will apply the ceOverrides to // outbound events and report outbound event counts. func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(nil, target, ceOverrides, reporter, nil) + opts := make([]http.Option, 0) + if len(target) > 0 { + opts = append(opts, cloudevents.WithTarget(target)) + } + return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) +} + +// NewCloudEventsClientWithOptions returns a client created with provided options +func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) { + return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) } // NewCloudEventsClientCRStatus returns a client CR status func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(env, "", nil, reporter, crStatusEventClient) + return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient) } -func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, - crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) { - - if target == "" && env != nil { - target = env.GetSink() - } +func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, + crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) { pOpts := make([]http.Option, 0) - if len(target) > 0 { - pOpts = append(pOpts, cloudevents.WithTarget(target)) - } pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{ Propagation: tracecontextb3.TraceContextEgress, })) if env != nil { + if target := env.GetSink(); len(target) > 0 { + pOpts = append(pOpts, cloudevents.WithTarget(target)) + } if sinkWait := env.GetSinktimeout(); sinkWait > 0 { pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second)) } @@ -72,7 +77,10 @@ func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverri } } - p, err := cloudevents.NewHTTP(pOpts...) + // Make sure that explicitly set options have priority + opts = append(pOpts, opts...) + + p, err := cloudevents.NewHTTP(opts...) if err != nil { return nil, err } @@ -133,7 +141,7 @@ func (c *client) Request(ctx context.Context, out event.Event) (*event.Event, pr // StartReceiver implements client.StartReceiver func (c *client) StartReceiver(ctx context.Context, fn interface{}) error { - return errors.New("not implemented") + return c.ceClient.StartReceiver(ctx, fn) } func (c *client) applyOverrides(event *cloudevents.Event) { diff --git a/pkg/adapter/v2/cloudevents_test.go b/pkg/adapter/v2/cloudevents_test.go index f911eb71160..568b289e6fe 100644 --- a/pkg/adapter/v2/cloudevents_test.go +++ b/pkg/adapter/v2/cloudevents_test.go @@ -17,14 +17,23 @@ limitations under the License. package adapter import ( + "bytes" "context" + "fmt" + "io/ioutil" + "math/rand" + nethttp "net/http" + "net/url" "os" "strconv" + "sync" "testing" "time" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/google/go-cmp/cmp" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/source" @@ -293,6 +302,125 @@ func TestNewCloudEventsClient_request(t *testing.T) { } } +func TestNewCloudEventsClient_receiver(t *testing.T) { + sampleEvent := func() *cloudevents.Event { + event := cloudevents.NewEvent(cloudevents.VersionV1) + event.SetID("abc-123") + event.SetSource("unit/test") + event.SetType("unit.type") + event.SetDataContentType("application/json") + return &event + } + + testCases := map[string]struct { + Headers nethttp.Header + Body []byte + ExpectedBody string + WantTimeout bool + }{ + "binary event": { + Headers: map[string][]string{ + "content-type": {"application/json"}, + "ce-specversion": {"1.0"}, + "ce-id": {"abc-123"}, + "ce-type": {"unit.type"}, + "ce-source": {"unit/test"}, + }, + Body: []byte(`{"type": "binary"}`), + ExpectedBody: `{"type": "binary"}`, + }, + "structured event": { + Headers: map[string][]string{ + "content-type": {"application/cloudevents+json"}, + }, + Body: []byte(`{ + "specversion": "1.0", + "id": "abc-123", + "source": "unit/test", + "type": "unit.type", + "datacontenttype": "application/json", + "data": {"type": "structured"} + }`), + ExpectedBody: ` {"type": "structured"}`, + }, + "malformed event": { + Headers: map[string][]string{ + "ce-type": {"err.event"}, + }, + WantTimeout: true, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + port := rand.Intn(16383) + 49152 // IANA Dynamic Ports range + ceClient, err := NewCloudEventsClientWithOptions(nil, &mockReporter{}, cloudevents.WithPort(port)) + if err != nil { + t.Errorf("failed to create CE client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + events := make(chan event.Event, 1) + defer close(events) + errors := make(chan error, 1) + defer close(errors) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Done() + err := ceClient.StartReceiver(ctx, func(event event.Event) error { + events <- event + return nil + }) + if err != nil { + errors <- err + } + }() + wg.Wait() // wait for the above goroutine to be running + time.Sleep(50 * time.Millisecond) // wait for the receiver to start + + target, err := url.Parse(fmt.Sprintf("http://localhost:%d", port)) + if err != nil { + t.Errorf("failed to parse target URL: %v", err) + } + + req := &nethttp.Request{ + Method: "POST", + URL: target, + Header: tc.Headers, + Body: ioutil.NopCloser(bytes.NewReader(tc.Body)), + ContentLength: int64(len(tc.Body)), + } + + _, err = nethttp.DefaultClient.Do(req) + if err != nil { + t.Errorf("failed to execute the request %s", err.Error()) + } + + select { + case <-ctx.Done(): + if !tc.WantTimeout { + t.Errorf("unexpected receiver timeout") + } + case got := <-events: + if tc.WantTimeout { + t.Errorf("unexpected event received") + } + if diff := cmp.Diff(sampleEvent().Context, got.Context); diff != "" { + t.Errorf("unexpected events.Context (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.ExpectedBody, string(got.Data())); diff != "" { + t.Errorf("unexpected events.Data (-want, +got) = %v", diff) + } + case err := <-errors: + t.Errorf("failed to start receiver: %v", err) + } + }) + } +} + func validateSent(t *testing.T, ce *test.TestCloudEventsClient, want string) { if got := len(ce.Sent()); got != 1 { t.Error("Expected 1 event to be sent, got", got)