From e9cfad8c3164dad588e1d44cdfe3166049a86a79 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Thu, 22 Dec 2022 14:36:50 -0800 Subject: [PATCH 1/5] http: Batch Events from HTTP Request and Response Simple implementations of NewEventsFromHTTPRequest and NewEventsFromHTTPResponse as a step towards providing some support for `application/cloudevents-batch+json`. This doesn't help with batch sending of Events, but this provides some basics of being able to process batch requests and/or responses from standard Go HTTP handling. This could be improved over time (See previous work on #301) as well, but optimised for having something that works to start, rather than implementing big design changes. Signed-off-by: Mark Mandel --- v2/alias.go | 6 ++++-- v2/binding/encoding.go | 5 +++++ v2/binding/format/format.go | 19 +++++++++++++++++++ v2/binding/to_event.go | 20 ++++++++++++++++++++ v2/protocol/http/message.go | 3 +++ v2/protocol/http/utility.go | 12 ++++++++++++ v2/protocol/http/utility_test.go | 26 ++++++++++++++++++++++++++ 7 files changed, 89 insertions(+), 2 deletions(-) diff --git a/v2/alias.go b/v2/alias.go index ed64b4c0c..47dad6a55 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -135,8 +135,10 @@ var ( ToMessage = binding.ToMessage // Event Creation - NewEventFromHTTPRequest = http.NewEventFromHTTPRequest - NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventFromHTTPRequest = http.NewEventFromHTTPRequest + NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest + NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse // HTTP Messages diff --git a/v2/binding/encoding.go b/v2/binding/encoding.go index 16611a3d7..5070b7295 100644 --- a/v2/binding/encoding.go +++ b/v2/binding/encoding.go @@ -19,6 +19,9 @@ const ( EncodingEvent // When the encoding is unknown (which means that the message is a non-event) EncodingUnknown + + // EncodingBatch is an instance of JSON Batched Events + EncodingBatch ) func (e Encoding) String() string { @@ -29,6 +32,8 @@ func (e Encoding) String() string { return "structured" case EncodingEvent: return "event" + case EncodingBatch: + return "batch" case EncodingUnknown: return "unknown" } diff --git a/v2/binding/format/format.go b/v2/binding/format/format.go index 2d840025e..4db6db948 100644 --- a/v2/binding/format/format.go +++ b/v2/binding/format/format.go @@ -7,6 +7,7 @@ package format import ( "encoding/json" + "errors" "fmt" "strings" @@ -41,12 +42,30 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error { return json.Unmarshal(b, e) } +// JSONBatch is the built-in "application/cloudevents-batch+json" format. +var JSONBatch = jsonBatchFmt{} + +type jsonBatchFmt struct{} + +func (jb jsonBatchFmt) MediaType() string { + return event.ApplicationCloudEventsBatchJSON +} + +func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) { + return nil, errors.New("not supported for batch events") +} + +func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error { + return errors.New("not supported for batch events") +} + // built-in formats var formats map[string]Format func init() { formats = map[string]Format{} Add(JSON) + Add(JSONBatch) } // Lookup returns the format for contentType, or nil if not found. diff --git a/v2/binding/to_event.go b/v2/binding/to_event.go index 339a7833c..e6cfaacd2 100644 --- a/v2/binding/to_event.go +++ b/v2/binding/to_event.go @@ -8,6 +8,7 @@ package binding import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -21,6 +22,9 @@ import ( // ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails var ErrCannotConvertToEvent = errors.New("cannot convert message to event") +// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails +var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events") + // ToEvent translates a Message with a valid Structured or Binary representation to an Event. // This function returns the Event generated from the Message and the original encoding of the message or // an error that points the conversion error. @@ -61,6 +65,22 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) } +// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. +// This function returns the Events generated from the body data, or an error that points +// to the conversion issue. +func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) { + messageEncoding := message.ReadEncoding() + if messageEncoding != EncodingBatch { + return nil, ErrCannotConvertToEvents + } + + // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the + // json.UnMarshall(), since that is the best way to support batch operations for now. + var events []event.Event + err := json.NewDecoder(body).Decode(&events) + return events, err +} + type messageToEventBuilder event.Event var _ StructuredWriter = (*messageToEventBuilder)(nil) diff --git a/v2/protocol/http/message.go b/v2/protocol/http/message.go index e7e51d034..7a7c36f9b 100644 --- a/v2/protocol/http/message.go +++ b/v2/protocol/http/message.go @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding { return binding.EncodingBinary } if m.format != nil { + if m.format == format.JSONBatch { + return binding.EncodingBatch + } return binding.EncodingStructured } return binding.EncodingUnknown diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index d46a33461..056ebe677 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -24,3 +24,15 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvent(context.Background(), msg) } + +// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request +func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { + msg := NewMessageFromHttpRequest(req) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response +func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { + msg := NewMessageFromHttpResponse(resp) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 992751cb4..3869ceafd 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -8,6 +8,7 @@ package http import ( "bytes" "context" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -89,3 +90,28 @@ func TestNewEventFromHttpResponse(t *testing.T) { }) } } + +func TestNewEventsFromHTTPRequest(t *testing.T) { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(`[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`))) + req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + events, err := NewEventsFromHTTPRequest(req) + require.NoError(t, err) + require.Len(t, events, 1) + test.AssertEvent(t, events[0], test.IsValid()) +} + +func TestNewEventsFromHTTPResponse(t *testing.T) { + data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]` + resp := http.Response{ + Header: http.Header{ + "Content-Type": {event.ApplicationCloudEventsBatchJSON}, + }, + Body: io.NopCloser(bytes.NewReader([]byte(data))), + ContentLength: int64(len(data)), + } + events, err := NewEventsFromHTTPResponse(&resp) + require.NoError(t, err) + require.Len(t, events, 1) + test.AssertEvent(t, events[0], test.IsValid()) +} From 5361386fc39435cf3b808e5e6666bdcdae6e3c94 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Fri, 6 Jan 2023 16:59:52 -0800 Subject: [PATCH 2/5] Implement NewHTTPRequestFromEvents Now you can send batch events with your very own http.Client. Signed-off-by: Mark Mandel --- v2/alias.go | 1 + v2/protocol/http/utility.go | 26 +++++++++++++++++++ v2/protocol/http/utility_test.go | 43 ++++++++++++++++++++++++++++++-- 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/v2/alias.go b/v2/alias.go index 47dad6a55..3dfab95c6 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -139,6 +139,7 @@ var ( NewEventFromHTTPResponse = http.NewEventFromHTTPResponse NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse + NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents // HTTP Messages diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index 056ebe677..c1f16a7dd 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -6,7 +6,9 @@ package http import ( + "bytes" "context" + "encoding/json" nethttp "net/http" "github.com/cloudevents/sdk-go/v2/binding" @@ -36,3 +38,27 @@ func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvents(context.Background(), msg, msg.BodyReader) } + +// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client. +func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { + // Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. + for _, e := range events { + if err := e.Validate(); err != nil { + return nil, err + } + } + var buffer bytes.Buffer + err := json.NewEncoder(&buffer).Encode(events) + if err != nil { + return nil, err + } + + request, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, &buffer) + if err != nil { + return nil, err + } + + request.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + return request, nil +} diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 3869ceafd..131f517b2 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -14,11 +14,11 @@ import ( "net/http/httptest" "testing" - "github.com/stretchr/testify/require" - "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/require" ) func TestNewEventFromHttpRequest(t *testing.T) { @@ -115,3 +115,42 @@ func TestNewEventsFromHTTPResponse(t *testing.T) { require.Len(t, events, 1) test.AssertEvent(t, events[0], test.IsValid()) } + +func TestNewHTTPRequestFromEvents(t *testing.T) { + var events []event.Event + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + events = append(events, e.Clone()) + + e.SetID(uuid.New().String()) + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"})) + events = append(events, e) + + require.Len(t, events, 2) + require.NotEqual(t, events[0], events[1]) + + // echo back what we get, so we can compare events at either side. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(ContentType, r.Header.Get(ContentType)) + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.NoError(t, r.Body.Close()) + _, err = w.Write(b) + require.NoError(t, err) + })) + defer ts.Close() + + req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) + require.NoError(t, err) + + resp, err := ts.Client().Do(req) + require.NoError(t, err) + + result, err := NewEventsFromHTTPResponse(resp) + require.NoError(t, err) + + require.Equal(t, events, result) +} From f5d82ff3dc5ddf481bdb62cedcb7efbde5c682ed Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Mon, 9 Jan 2023 14:19:23 -0800 Subject: [PATCH 3/5] Review updates * Expanded test for NewEventsFromHTTPRequest * Implemented NewHTTPRequestFromEvent as well Signed-off-by: Mark Mandel --- v2/protocol/http/utility.go | 20 ++++++- v2/protocol/http/utility_test.go | 95 +++++++++++++++++++++++++++++--- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index c1f16a7dd..4c6ef5f45 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -39,7 +39,25 @@ func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { return binding.ToEvents(context.Background(), msg, msg.BodyReader) } -// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client. +// NewHTTPRequestFromEvent creates a http.Request object that can be used with any http.Client for a singular event. +func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) (*nethttp.Request, error) { + if err := event.Validate(); err != nil { + return nil, err + } + + req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, nil) + if err != nil { + return nil, err + } + if err := WriteRequest(ctx, (*binding.EventMessage)(&event), req); err != nil { + return nil, err + } + + return req, nil +} + +// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client for sending +// a batched set of events. func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { // Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. for _, e := range events { diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 131f517b2..37b27e00f 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "strings" "testing" "github.com/cloudevents/sdk-go/v2/binding" @@ -92,13 +93,58 @@ func TestNewEventFromHttpResponse(t *testing.T) { } func TestNewEventsFromHTTPRequest(t *testing.T) { - req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(`[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`))) - req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + type expected struct { + len int + ids []string + } - events, err := NewEventsFromHTTPRequest(req) - require.NoError(t, err) - require.Len(t, events, 1) - test.AssertEvent(t, events[0], test.IsValid()) + fixtures := map[string]struct { + jsn string + expected expected + }{ + "single": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + expected: expected{ + len: 1, + ids: []string{"id"}, + }, + }, + "triple": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id1","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id2","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id3","source":"source","specversion":"1.0","type":"type"}]`, + expected: expected{ + len: 3, + ids: []string{"id1", "id2", "id3"}, + }, + }, + } + + for k, v := range fixtures { + t.Run(k, func(t *testing.T) { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(v.jsn))) + req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + events, err := NewEventsFromHTTPRequest(req) + require.NoError(t, err) + require.Len(t, events, v.expected.len) + for i, e := range events { + test.AssertEvent(t, e, test.IsValid()) + require.Equal(t, v.expected.ids[i], events[i].ID()) + } + }) + } + + t.Run("bad request", func(t *testing.T) { + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + req, err := NewHTTPRequestFromEvent(context.Background(), "http://localhost", e) + require.NoError(t, err) + + _, err = NewEventsFromHTTPRequest(req) + require.ErrorContainsf(t, err, "cannot convert message to batched events", "error should include message") + }) } func TestNewEventsFromHTTPResponse(t *testing.T) { @@ -116,6 +162,42 @@ func TestNewEventsFromHTTPResponse(t *testing.T) { test.AssertEvent(t, events[0], test.IsValid()) } +func TestNewHTTPRequestFromEvent(t *testing.T) { + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + + // echo back what we get, so we can compare events at either side. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(ContentType, r.Header.Get(ContentType)) + // copy across structured headers + for k, v := range r.Header { + if strings.HasPrefix(k, "Ce-") { + w.Header()[k] = v + } + } + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.NoError(t, r.Body.Close()) + _, err = w.Write(b) + require.NoError(t, err) + })) + defer ts.Close() + + req, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) + require.NoError(t, err) + + resp, err := ts.Client().Do(req) + require.NoError(t, err) + + result, err := NewEventFromHTTPResponse(resp) + require.NoError(t, err) + require.Equal(t, &e, result) +} + func TestNewHTTPRequestFromEvents(t *testing.T) { var events []event.Event e := event.New() @@ -151,6 +233,5 @@ func TestNewHTTPRequestFromEvents(t *testing.T) { result, err := NewEventsFromHTTPResponse(resp) require.NoError(t, err) - require.Equal(t, events, result) } From c55cd83b3245d70fc1dd00732ba2096a607021d1 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Tue, 10 Jan 2023 09:01:01 -0800 Subject: [PATCH 4/5] Add IsHTTPBatch IsHTTPBatch is a convenience function such that an end user could determine if a request or response is batch or not, and process it as such. Signed-off-by: Mark Mandel --- v2/alias.go | 3 +++ v2/protocol/http/utility.go | 6 ++++++ v2/protocol/http/utility_test.go | 12 ++++++++++++ 3 files changed, 21 insertions(+) diff --git a/v2/alias.go b/v2/alias.go index 3dfab95c6..337774ec4 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -135,11 +135,14 @@ var ( ToMessage = binding.ToMessage // Event Creation + NewEventFromHTTPRequest = http.NewEventFromHTTPRequest NewEventFromHTTPResponse = http.NewEventFromHTTPResponse NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents + NewHTTPRequestFromEvent = http.NewHTTPRequestFromEvent + IsHTTPBatch = http.IsHTTPBatch // HTTP Messages diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index 4c6ef5f45..9d194af0a 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -80,3 +80,9 @@ func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Ev return request, nil } + +// IsHTTPBatch returns of the current http.Request or http.Response is a batch event operation, by checking the +// header `Content-Type` value. +func IsHTTPBatch(header nethttp.Header) bool { + return header.Get(ContentType) == event.ApplicationCloudEventsBatchJSON +} diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 37b27e00f..a58cb0df2 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -19,6 +19,7 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -235,3 +236,14 @@ func TestNewHTTPRequestFromEvents(t *testing.T) { require.NoError(t, err) require.Equal(t, events, result) } + +func TestIsHTTPBatch(t *testing.T) { + header := http.Header{} + assert.False(t, IsHTTPBatch(header)) + + header.Set(ContentType, event.ApplicationJSON) + assert.False(t, IsHTTPBatch(header)) + + header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + assert.True(t, IsHTTPBatch(header)) +} From 0cfc8f4f4126d6a5fb39ef6c80d23506b61d5068 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Tue, 17 Jan 2023 15:24:05 -0800 Subject: [PATCH 5/5] Review updates * Reorder in alias.go * More coverage of TestNewEventsFromHTTPRequest * tests for `to_events(...)` * Better comment documentation. Signed-off-by: Mark Mandel --- v2/alias.go | 2 +- v2/binding/format/format.go | 3 + v2/binding/to_event.go | 3 +- v2/binding/to_event_test.go | 50 +++++++++++ v2/protocol/http/utility.go | 9 +- v2/protocol/http/utility_test.go | 145 ++++++++++++++++++------------- 6 files changed, 147 insertions(+), 65 deletions(-) diff --git a/v2/alias.go b/v2/alias.go index 337774ec4..2fbfaa9a7 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -140,8 +140,8 @@ var ( NewEventFromHTTPResponse = http.NewEventFromHTTPResponse NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse - NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents NewHTTPRequestFromEvent = http.NewHTTPRequestFromEvent + NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents IsHTTPBatch = http.IsHTTPBatch // HTTP Messages diff --git a/v2/binding/format/format.go b/v2/binding/format/format.go index 4db6db948..6bdd1842b 100644 --- a/v2/binding/format/format.go +++ b/v2/binding/format/format.go @@ -51,6 +51,9 @@ func (jb jsonBatchFmt) MediaType() string { return event.ApplicationCloudEventsBatchJSON } +// Marshal will return an error for jsonBatchFmt since the Format interface doesn't support batch Marshalling, and we +// know it's structured batch json, we'll go direct to the json.UnMarshall() (see `ToEvents()`) since that is the best +// way to support batch operations for now. func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) { return nil, errors.New("not supported for batch events") } diff --git a/v2/binding/to_event.go b/v2/binding/to_event.go index e6cfaacd2..d3332c158 100644 --- a/v2/binding/to_event.go +++ b/v2/binding/to_event.go @@ -77,8 +77,7 @@ func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]eve // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the // json.UnMarshall(), since that is the best way to support batch operations for now. var events []event.Event - err := json.NewDecoder(body).Decode(&events) - return events, err + return events, json.NewDecoder(body).Decode(&events) } type messageToEventBuilder event.Event diff --git a/v2/binding/to_event_test.go b/v2/binding/to_event_test.go index 0a0e00c00..185744a23 100644 --- a/v2/binding/to_event_test.go +++ b/v2/binding/to_event_test.go @@ -7,8 +7,12 @@ package binding_test import ( "context" + "io" + nethttp "net/http" + "strings" "testing" + "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/stretchr/testify/require" "github.com/cloudevents/sdk-go/v2/binding" @@ -159,3 +163,49 @@ func TestToEvent_transformers_applied_once(t *testing.T) { } }) } + +func TestToEvents(t *testing.T) { + fixture := map[string]struct { + contentType string + jsn string + expected func(*testing.T, []event.Event, error) + }{ + "valid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + expected: func(t *testing.T, list []event.Event, err error) { + require.NoError(t, err) + require.Len(t, list, 1) + require.Equal(t, "id", list[0].ID()) + }, + }, + "invalid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","specversion":"0.1","type":"type"}]`, + expected: func(t *testing.T, _ []event.Event, err error) { + require.ErrorContains(t, err, "specversion: unknown value") + }, + }, + "bad content type": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + contentType: event.ApplicationJSON, + expected: func(t *testing.T, _ []event.Event, err error) { + require.ErrorContains(t, err, "cannot convert message to batched events") + }, + }, + } + + for k, v := range fixture { + t.Run(k, func(t *testing.T) { + header := nethttp.Header{} + if len(v.contentType) == 0 { + header.Set(http.ContentType, event.ApplicationCloudEventsBatchJSON) + } else { + header.Set(http.ContentType, v.contentType) + } + + buf := io.NopCloser(strings.NewReader(v.jsn)) + msg := http.NewMessage(header, buf) + list, err := binding.ToEvents(context.Background(), msg, msg.BodyReader) + v.expected(t, list, err) + }) + } +} diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index 9d194af0a..350fc1cf6 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -27,19 +27,20 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { return binding.ToEvent(context.Background(), msg) } -// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request +// NewEventsFromHTTPRequest returns a batched set of Events from a HTTP Request func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { msg := NewMessageFromHttpRequest(req) return binding.ToEvents(context.Background(), msg, msg.BodyReader) } -// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response +// NewEventsFromHTTPResponse returns a batched set of Events from a HTTP Response func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvents(context.Background(), msg, msg.BodyReader) } // NewHTTPRequestFromEvent creates a http.Request object that can be used with any http.Client for a singular event. +// This is an HTTP POST action to the provided url. func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) (*nethttp.Request, error) { if err := event.Validate(); err != nil { return nil, err @@ -57,7 +58,7 @@ func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) } // NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client for sending -// a batched set of events. +// a batched set of events. This is an HTTP POST action to the provided url. func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { // Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. for _, e := range events { @@ -81,7 +82,7 @@ func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Ev return request, nil } -// IsHTTPBatch returns of the current http.Request or http.Response is a batch event operation, by checking the +// IsHTTPBatch returns if the current http.Request or http.Response is a batch event operation, by checking the // header `Content-Type` value. func IsHTTPBatch(header nethttp.Header) bool { return header.Get(ContentType) == event.ApplicationCloudEventsBatchJSON diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index a58cb0df2..a0583094e 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -94,14 +94,17 @@ func TestNewEventFromHttpResponse(t *testing.T) { } func TestNewEventsFromHTTPRequest(t *testing.T) { + type expected struct { - len int - ids []string + len int + ids []string + errorContains string } fixtures := map[string]struct { - jsn string - expected expected + jsn string + contentType string + expected expected }{ "single": { jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, @@ -117,35 +120,44 @@ func TestNewEventsFromHTTPRequest(t *testing.T) { ids: []string{"id1", "id2", "id3"}, }, }, + "invalid header": { + jsn: `{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}`, + contentType: event.ApplicationJSON, + expected: expected{ + errorContains: "cannot convert message to batched events", + }, + }, + "invalid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","specversion":"0.1","type":"type"}]`, + expected: expected{ + errorContains: "specversion: unknown value", + }, + }, } for k, v := range fixtures { t.Run(k, func(t *testing.T) { req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(v.jsn))) - req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + if len(v.contentType) == 0 { + req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + } else { + req.Header.Set(ContentType, v.contentType) + } events, err := NewEventsFromHTTPRequest(req) - require.NoError(t, err) - require.Len(t, events, v.expected.len) - for i, e := range events { - test.AssertEvent(t, e, test.IsValid()) - require.Equal(t, v.expected.ids[i], events[i].ID()) + if len(v.expected.errorContains) == 0 { + require.NoError(t, err) + require.Len(t, events, v.expected.len) + for i, e := range events { + test.AssertEvent(t, e, test.IsValid()) + require.Equal(t, v.expected.ids[i], events[i].ID()) + } + } else { + require.Error(t, err) + require.ErrorContainsf(t, err, v.expected.errorContains, "error should include message") } }) } - - t.Run("bad request", func(t *testing.T) { - e := event.New() - e.SetID(uuid.New().String()) - e.SetSource("example/uri") - e.SetType("example.type") - require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) - req, err := NewHTTPRequestFromEvent(context.Background(), "http://localhost", e) - require.NoError(t, err) - - _, err = NewEventsFromHTTPRequest(req) - require.ErrorContainsf(t, err, "cannot convert message to batched events", "error should include message") - }) } func TestNewEventsFromHTTPResponse(t *testing.T) { @@ -164,12 +176,6 @@ func TestNewEventsFromHTTPResponse(t *testing.T) { } func TestNewHTTPRequestFromEvent(t *testing.T) { - e := event.New() - e.SetID(uuid.New().String()) - e.SetSource("example/uri") - e.SetType("example.type") - require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) - // echo back what we get, so we can compare events at either side. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set(ContentType, r.Header.Get(ContentType)) @@ -188,33 +194,32 @@ func TestNewHTTPRequestFromEvent(t *testing.T) { })) defer ts.Close() - req, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) - require.NoError(t, err) - - resp, err := ts.Client().Do(req) - require.NoError(t, err) + t.Run("valid event", func(t *testing.T) { + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) - result, err := NewEventFromHTTPResponse(resp) - require.NoError(t, err) - require.Equal(t, &e, result) -} + req, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) + require.NoError(t, err) -func TestNewHTTPRequestFromEvents(t *testing.T) { - var events []event.Event - e := event.New() - e.SetID(uuid.New().String()) - e.SetSource("example/uri") - e.SetType("example.type") - require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) - events = append(events, e.Clone()) + resp, err := ts.Client().Do(req) + require.NoError(t, err) - e.SetID(uuid.New().String()) - require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"})) - events = append(events, e) + result, err := NewEventFromHTTPResponse(resp) + require.NoError(t, err) + require.Equal(t, &e, result) + }) - require.Len(t, events, 2) - require.NotEqual(t, events[0], events[1]) + t.Run("invalid event", func(t *testing.T) { + e := event.New() + _, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) + require.ErrorContains(t, err, "id: MUST be a non-empty string") + }) +} +func TestNewHTTPRequestFromEvents(t *testing.T) { // echo back what we get, so we can compare events at either side. ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set(ContentType, r.Header.Get(ContentType)) @@ -226,15 +231,39 @@ func TestNewHTTPRequestFromEvents(t *testing.T) { })) defer ts.Close() - req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) - require.NoError(t, err) + t.Run("valid events", func(t *testing.T) { + var events []event.Event + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + events = append(events, e.Clone()) - resp, err := ts.Client().Do(req) - require.NoError(t, err) + e.SetID(uuid.New().String()) + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"})) + events = append(events, e) + + require.Len(t, events, 2) + require.NotEqual(t, events[0], events[1]) + + req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) + require.NoError(t, err) + + resp, err := ts.Client().Do(req) + require.NoError(t, err) + + result, err := NewEventsFromHTTPResponse(resp) + require.NoError(t, err) + require.Equal(t, events, result) + }) + + t.Run("invalid events", func(t *testing.T) { + events := []event.Event{event.New()} + _, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) + require.ErrorContains(t, err, "id: MUST be a non-empty string") + }) - result, err := NewEventsFromHTTPResponse(resp) - require.NoError(t, err) - require.Equal(t, events, result) } func TestIsHTTPBatch(t *testing.T) {