Skip to content

Commit

Permalink
Adapter's StartReceiver() method implemented (#5464)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzununbekov authored Jun 4, 2021
1 parent 0f71d86 commit ab3978c
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 13 deletions.
34 changes: 21 additions & 13 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,34 @@ import (
// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(nil, target, ceOverrides, reporter, nil)
opts := make([]http.Option, 0)
if len(target) > 0 {
opts = append(opts, cloudevents.WithTarget(target))
}
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
}

// NewCloudEventsClientWithOptions returns a client created with provided options
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
}

// NewCloudEventsClientCRStatus returns a client CR status
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, "", nil, reporter, crStatusEventClient)
return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient)
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {

if target == "" && env != nil {
target = env.GetSink()
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {

pOpts := make([]http.Option, 0)
if len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))

if env != nil {
if target := env.GetSink(); len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
Expand All @@ -72,7 +77,10 @@ func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverri
}
}

p, err := cloudevents.NewHTTP(pOpts...)
// Make sure that explicitly set options have priority
opts = append(pOpts, opts...)

p, err := cloudevents.NewHTTP(opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -133,7 +141,7 @@ func (c *client) Request(ctx context.Context, out event.Event) (*event.Event, pr

// StartReceiver implements client.StartReceiver
func (c *client) StartReceiver(ctx context.Context, fn interface{}) error {
return errors.New("not implemented")
return c.ceClient.StartReceiver(ctx, fn)
}

func (c *client) applyOverrides(event *cloudevents.Event) {
Expand Down
128 changes: 128 additions & 0 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@ limitations under the License.
package adapter

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
nethttp "net/http"
"net/url"
"os"
"strconv"
"sync"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/source"

Expand Down Expand Up @@ -293,6 +302,125 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}
}

func TestNewCloudEventsClient_receiver(t *testing.T) {
sampleEvent := func() *cloudevents.Event {
event := cloudevents.NewEvent(cloudevents.VersionV1)
event.SetID("abc-123")
event.SetSource("unit/test")
event.SetType("unit.type")
event.SetDataContentType("application/json")
return &event
}

testCases := map[string]struct {
Headers nethttp.Header
Body []byte
ExpectedBody string
WantTimeout bool
}{
"binary event": {
Headers: map[string][]string{
"content-type": {"application/json"},
"ce-specversion": {"1.0"},
"ce-id": {"abc-123"},
"ce-type": {"unit.type"},
"ce-source": {"unit/test"},
},
Body: []byte(`{"type": "binary"}`),
ExpectedBody: `{"type": "binary"}`,
},
"structured event": {
Headers: map[string][]string{
"content-type": {"application/cloudevents+json"},
},
Body: []byte(`{
"specversion": "1.0",
"id": "abc-123",
"source": "unit/test",
"type": "unit.type",
"datacontenttype": "application/json",
"data": {"type": "structured"}
}`),
ExpectedBody: ` {"type": "structured"}`,
},
"malformed event": {
Headers: map[string][]string{
"ce-type": {"err.event"},
},
WantTimeout: true,
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
port := rand.Intn(16383) + 49152 // IANA Dynamic Ports range
ceClient, err := NewCloudEventsClientWithOptions(nil, &mockReporter{}, cloudevents.WithPort(port))
if err != nil {
t.Errorf("failed to create CE client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

events := make(chan event.Event, 1)
defer close(events)
errors := make(chan error, 1)
defer close(errors)

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
err := ceClient.StartReceiver(ctx, func(event event.Event) error {
events <- event
return nil
})
if err != nil {
errors <- err
}
}()
wg.Wait() // wait for the above goroutine to be running
time.Sleep(50 * time.Millisecond) // wait for the receiver to start

target, err := url.Parse(fmt.Sprintf("http://localhost:%d", port))
if err != nil {
t.Errorf("failed to parse target URL: %v", err)
}

req := &nethttp.Request{
Method: "POST",
URL: target,
Header: tc.Headers,
Body: ioutil.NopCloser(bytes.NewReader(tc.Body)),
ContentLength: int64(len(tc.Body)),
}

_, err = nethttp.DefaultClient.Do(req)
if err != nil {
t.Errorf("failed to execute the request %s", err.Error())
}

select {
case <-ctx.Done():
if !tc.WantTimeout {
t.Errorf("unexpected receiver timeout")
}
case got := <-events:
if tc.WantTimeout {
t.Errorf("unexpected event received")
}
if diff := cmp.Diff(sampleEvent().Context, got.Context); diff != "" {
t.Errorf("unexpected events.Context (-want, +got) = %v", diff)
}
if diff := cmp.Diff(tc.ExpectedBody, string(got.Data())); diff != "" {
t.Errorf("unexpected events.Data (-want, +got) = %v", diff)
}
case err := <-errors:
t.Errorf("failed to start receiver: %v", err)
}
})
}
}

func validateSent(t *testing.T, ce *test.TestCloudEventsClient, want string) {
if got := len(ce.Sent()); got != 1 {
t.Error("Expected 1 event to be sent, got", got)
Expand Down

0 comments on commit ab3978c

Please sign in to comment.