Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: Batch Events from HTTP Request and Response #829

Merged
merged 5 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,14 @@ var (
ToMessage = binding.ToMessage

// Event Creation
NewEventFromHTTPRequest = http.NewEventFromHTTPRequest
NewEventFromHTTPResponse = http.NewEventFromHTTPResponse

NewEventFromHTTPRequest = http.NewEventFromHTTPRequest
NewEventFromHTTPResponse = http.NewEventFromHTTPResponse
NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest
NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse
NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents
NewHTTPRequestFromEvent = http.NewHTTPRequestFromEvent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: be consistent and switch order to FromEvent then FromEvents - easy one to miss when going through this list.

IsHTTPBatch = http.IsHTTPBatch

// HTTP Messages

Expand Down
5 changes: 5 additions & 0 deletions v2/binding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
EncodingEvent
// When the encoding is unknown (which means that the message is a non-event)
EncodingUnknown

// EncodingBatch is an instance of JSON Batched Events
EncodingBatch
)

func (e Encoding) String() string {
Expand All @@ -29,6 +32,8 @@ func (e Encoding) String() string {
return "structured"
case EncodingEvent:
return "event"
case EncodingBatch:
return "batch"
case EncodingUnknown:
return "unknown"
}
Expand Down
19 changes: 19 additions & 0 deletions v2/binding/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package format

import (
"encoding/json"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -41,12 +42,30 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error {
return json.Unmarshal(b, e)
}

// JSONBatch is the built-in "application/cloudevents-batch+json" format.
var JSONBatch = jsonBatchFmt{}

type jsonBatchFmt struct{}

func (jb jsonBatchFmt) MediaType() string {
return event.ApplicationCloudEventsBatchJSON
}

func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error threw me off until I saw your implementation in to_event.go - perhaps a short comment that this is handled by the other package?

return nil, errors.New("not supported for batch events")
}

func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error {
return errors.New("not supported for batch events")
}

// built-in formats
var formats map[string]Format

func init() {
formats = map[string]Format{}
Add(JSON)
Add(JSONBatch)
}

// Lookup returns the format for contentType, or nil if not found.
Expand Down
20 changes: 20 additions & 0 deletions v2/binding/to_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package binding
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -21,6 +22,9 @@ import (
// ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")

// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails
var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events")

// ToEvent translates a Message with a valid Structured or Binary representation to an Event.
// This function returns the Event generated from the Message and the original encoding of the message or
// an error that points the conversion error.
Expand Down Expand Up @@ -61,6 +65,22 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo
return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder)
}

// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events.
// This function returns the Events generated from the body data, or an error that points
// to the conversion issue.
func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: can we have a test with a partial invalid event in the batch request body? I would like to understand the semantics of this API, i.e. which states []event.Event can have depending on the input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally MessageReader should define ReadBatch, even though currently batching mode is only defined by HTTP protocol binding spec. I'm fine with not adding ReadBatch but I wonder if it makes sense to match the ToEvent signature and do the type switch in the function body. Or have ToEvents take a *Message instead of MessageReader.

I think it's fine to not initially supporting transformers on batched events until someone ask for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally MessageReader should define ReadBatch, even though currently batching mode is only defined by HTTP protocol binding spec.

I started playing with this, and as soon as that interface changes, it requires changes across lots of codebase, so I'm hesitant to make that change. Or maybe that change should be a subsequent PR for a larger architectural change?

I'm fine with not adding ReadBatch but I wonder if it makes sense to match the ToEvent signature and do the type switch in the function body. Or have ToEvents take a *Message instead of MessageReader.

Not sure I'm 100% sure I'm following here. Are you saying that there is a way I could get the io.Reader from the MessageReader? If so, I'd love some direction - it wasn't immediately obvious to me.

I think it's fine to not initially supporting transformers on batched events until someone ask for it.

👍🏻

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm 100% sure I'm following here. Are you saying that there is a way I could get the io.Reader from the MessageReader? If so, I'd love some direction - it wasn't immediately obvious to me.

You can do something like this:

switch mt := message.(type) {
case *http.Message:
  body := mt.BodyReader
  // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the
  // json.UnMarshall(), since that is the best way to support batch operations for now.
  var events []event.Event
  return events, json.NewDecoder(body).Decode(&events)
}

Then return an error for all other cases.

Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 interesting. Curious - why have the system return an error at runtime, when you could do the check at compile time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToEvents takes a MessageReader, which is more general than http.Message. In your case you have only one (http) caller but in theory it does not have to (be the only one).

You can either make ToEvents to handle only http message, in which case just use *http.Message as the second argument type and remove body, or make it a bit more generally applicable and consistent with ToEvent and pass MessageReader only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I see 🤔 that makes sense -- we could set ourselves up so if more implementations of Batch come down the pipe, we could handle them here in this place, and expand the switch. SGTM. I'll make the change to Message and do the type switch 👍🏻

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hit a pretty good wrinkle on making this change. I can't reference the concrete type at all within ToEvents -- because I get into a cyclic dependency issue.

package github.com/cloudevents/sdk-go/v2
        imports github.com/cloudevents/sdk-go/v2/binding
        imports github.com/cloudevents/sdk-go/v2/protocol/http
        imports github.com/cloudevents/sdk-go/v2/binding: import cycle not allowed

So my suggestion - leave this as is, and then as there is a need to create more implementation on Batch processing, implement a BatchEvents on MessageReader and do the work to implement that across the code base. I figure no point in doing all that work, if the demand may not be there.

messageEncoding := message.ReadEncoding()
if messageEncoding != EncodingBatch {
return nil, ErrCannotConvertToEvents
}

// Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the
// json.UnMarshall(), since that is the best way to support batch operations for now.
var events []event.Event
err := json.NewDecoder(body).Decode(&events)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could directly return here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice trick. I like it 👍🏻

return events, err
}

type messageToEventBuilder event.Event

var _ StructuredWriter = (*messageToEventBuilder)(nil)
Expand Down
3 changes: 3 additions & 0 deletions v2/protocol/http/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding {
return binding.EncodingBinary
}
if m.format != nil {
if m.format == format.JSONBatch {
return binding.EncodingBatch
}
return binding.EncodingStructured
}
return binding.EncodingUnknown
Expand Down
62 changes: 62 additions & 0 deletions v2/protocol/http/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package http

import (
"bytes"
"context"
"encoding/json"
nethttp "net/http"

"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -24,3 +26,63 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) {
msg := NewMessageFromHttpResponse(resp)
return binding.ToEvent(context.Background(), msg)
}

// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: better from a HTTP request ?

func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) {
msg := NewMessageFromHttpRequest(req)
return binding.ToEvents(context.Background(), msg, msg.BodyReader)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: use req.Context() here? Not sure if that provides value/advantage in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was being consistent with all the other utility functions as previously written (they all used context.Background(). My suggestion would e that if we want to make that change it should probably be in a separate PR.

}

// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response
func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) {
msg := NewMessageFromHttpResponse(resp)
return binding.ToEvents(context.Background(), msg, msg.BodyReader)
}

// NewHTTPRequestFromEvent creates a http.Request object that can be used with any http.Client for a singular event.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: line wrapping seem off

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure what you mean here. The line is 116 chars (I usually wrap at 120), is that what you are referring to?

func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) (*nethttp.Request, error) {
if err := event.Validate(); err != nil {
return nil, err
}

req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know this is a helper, but since we're hardcoding POST behavior here, would be nice to see this doc-ed in the comment. Not all webhooks use POST, unfortunately.

if err != nil {
return nil, err
}
if err := WriteRequest(ctx, (*binding.EventMessage)(&event), req); err != nil {
return nil, err
}

return req, nil
}

// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client for sending
// a batched set of events.
func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) {
// Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation.
for _, e := range events {
if err := e.Validate(); err != nil {
return nil, err
}
}
var buffer bytes.Buffer
err := json.NewEncoder(&buffer).Encode(events)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own education, here we just encode the events directly, but above for a single event we use WriteRequest which seems to have quite a bit more logic. Is any of that logic needed for batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way to send batch is in a JSON encoded format, so there is no need for much of that logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't need to worry about things like the "transformers"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 What is there to transform? From my reading of the batch spec, it's JSON in, JSON out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL I dunno. I just noticed that for the single event it has that transform stuff and I didn't want to miss anything. @lionelvillard any idea what this transform stuff is for and why we need it for single events but not batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figure at least we don't need it for the utility functions, since none of the existing functions in utility.go don't pass in transformers.

The good thing is, if there was a need to add an optional list of transformers to the method signature isn't going to break.

if err != nil {
return nil, err
}

request, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, &buffer)
if err != nil {
return nil, err
}

request.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON)

return request, nil
}

// IsHTTPBatch returns of the current http.Request or http.Response is a batch event operation, by checking the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: returns of?

// header `Content-Type` value.
func IsHTTPBatch(header nethttp.Header) bool {
return header.Get(ContentType) == event.ApplicationCloudEventsBatchJSON
}
162 changes: 160 additions & 2 deletions v2/protocol/http/utility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ package http
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewEventFromHttpRequest(t *testing.T) {
Expand Down Expand Up @@ -89,3 +92,158 @@ func TestNewEventFromHttpResponse(t *testing.T) {
})
}
}

func TestNewEventsFromHTTPRequest(t *testing.T) {
Copy link
Contributor

@lionelvillard lionelvillard Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test where NewEventsFromHTTPRequest returns an error (for instance when the request is an event in the JSON Event Format) and maybe another one with more than one event?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this suggestion! Ended up leading me to the code to see how easy it would be to do a NewHTTPRequestFromEvent, so added that in as well.

Updates to tests also implemented.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add test cases which actually assert an error?

type expected struct {
len int
ids []string
}

fixtures := map[string]struct {
jsn string
expected expected
}{
"single": {
jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`,
expected: expected{
len: 1,
ids: []string{"id"},
},
},
"triple": {
jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id1","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id2","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id3","source":"source","specversion":"1.0","type":"type"}]`,
expected: expected{
len: 3,
ids: []string{"id1", "id2", "id3"},
},
},
}

for k, v := range fixtures {
t.Run(k, func(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(v.jsn)))
req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON)

events, err := NewEventsFromHTTPRequest(req)
require.NoError(t, err)
require.Len(t, events, v.expected.len)
for i, e := range events {
test.AssertEvent(t, e, test.IsValid())
require.Equal(t, v.expected.ids[i], events[i].ID())
}
})
}

t.Run("bad request", func(t *testing.T) {
e := event.New()
e.SetID(uuid.New().String())
e.SetSource("example/uri")
e.SetType("example.type")
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"}))
req, err := NewHTTPRequestFromEvent(context.Background(), "http://localhost", e)
require.NoError(t, err)

_, err = NewEventsFromHTTPRequest(req)
require.ErrorContainsf(t, err, "cannot convert message to batched events", "error should include message")
})
}

func TestNewEventsFromHTTPResponse(t *testing.T) {
data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`
resp := http.Response{
Header: http.Header{
"Content-Type": {event.ApplicationCloudEventsBatchJSON},
},
Body: io.NopCloser(bytes.NewReader([]byte(data))),
ContentLength: int64(len(data)),
}
events, err := NewEventsFromHTTPResponse(&resp)
require.NoError(t, err)
require.Len(t, events, 1)
test.AssertEvent(t, events[0], test.IsValid())
}

func TestNewHTTPRequestFromEvent(t *testing.T) {
e := event.New()
e.SetID(uuid.New().String())
e.SetSource("example/uri")
e.SetType("example.type")
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"}))

// echo back what we get, so we can compare events at either side.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set(ContentType, r.Header.Get(ContentType))
// copy across structured headers
for k, v := range r.Header {
if strings.HasPrefix(k, "Ce-") {
w.Header()[k] = v
}
}

b, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.NoError(t, r.Body.Close())
_, err = w.Write(b)
require.NoError(t, err)
}))
defer ts.Close()

req, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e)
require.NoError(t, err)

resp, err := ts.Client().Do(req)
require.NoError(t, err)

result, err := NewEventFromHTTPResponse(resp)
require.NoError(t, err)
require.Equal(t, &e, result)
}

func TestNewHTTPRequestFromEvents(t *testing.T) {
var events []event.Event
e := event.New()
e.SetID(uuid.New().String())
e.SetSource("example/uri")
e.SetType("example.type")
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"}))
events = append(events, e.Clone())

e.SetID(uuid.New().String())
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"}))
events = append(events, e)

require.Len(t, events, 2)
require.NotEqual(t, events[0], events[1])

// echo back what we get, so we can compare events at either side.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set(ContentType, r.Header.Get(ContentType))
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.NoError(t, r.Body.Close())
_, err = w.Write(b)
require.NoError(t, err)
}))
defer ts.Close()

req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events)
require.NoError(t, err)

resp, err := ts.Client().Do(req)
require.NoError(t, err)

result, err := NewEventsFromHTTPResponse(resp)
require.NoError(t, err)
require.Equal(t, events, result)
}

func TestIsHTTPBatch(t *testing.T) {
header := http.Header{}
assert.False(t, IsHTTPBatch(header))

header.Set(ContentType, event.ApplicationJSON)
assert.False(t, IsHTTPBatch(header))

header.Set(ContentType, event.ApplicationCloudEventsBatchJSON)
assert.True(t, IsHTTPBatch(header))
}