Skip to content

Commit

Permalink
Add missing alias imports; Moving to var-arg transformers (#390)
Browse files Browse the repository at this point in the history
* adding vararg transformers

Signed-off-by: Scott Nichols <snichols@vmware.com>

* drop if len, more nil checks

Signed-off-by: Scott Nichols <snichols@vmware.com>

* Move ToEvent to var arg Transformers.

Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
n3wscott authored Mar 17, 2020
1 parent a4293fb commit f248e16
Show file tree
Hide file tree
Showing 22 changed files with 90 additions and 63 deletions.
10 changes: 9 additions & 1 deletion alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type HTTPProtocol = http.Protocol

type Encoding = binding.Encoding

// Message

type Message = binding.Message

const (
// ReadEncoding

Expand Down Expand Up @@ -95,12 +99,16 @@ var (
NewEvent = event.New
NewResult = protocol.NewResult

NewHTTPResponse = http.NewResult
NewHTTPResult = http.NewResult

// Message Creation

ToMessage = binding.ToMessage

// HTTP Messages

WriteHTTPRequest = http.WriteRequest

// Tracing

EnableTracing = observability.EnableTracing
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/httpb/responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func main() {
func gotEvent(ctx context.Context, event cloudevents.Event) (*event.Event, protocol.Result) {
fmt.Printf("Got Event: %+v\n", event)

return &event, cloudevents.NewHTTPResponse(http.StatusAccepted, "accept")
return &event, cloudevents.NewHTTPResult(http.StatusAccepted, "accept")
}
18 changes: 13 additions & 5 deletions legacy/pkg/binding/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ func (t TransformerFactories) StructuredTransformer(encoder StructuredEncoder) S
}
res := encoder
for _, b := range t {
if new := b.StructuredTransformer(res); new != nil {
res = new
if b == nil {
continue
}
if r := b.StructuredTransformer(res); r != nil {
res = r
} else {
return nil // Structured not supported!
}
Expand All @@ -42,8 +45,11 @@ func (t TransformerFactories) BinaryTransformer(encoder BinaryEncoder) BinaryEnc
}
res := encoder
for _, b := range t {
if new := b.BinaryTransformer(res); new != nil {
res = new
if b == nil {
continue
}
if r := b.BinaryTransformer(res); r != nil {
res = r
} else {
return nil // Binary not supported!
}
Expand All @@ -54,8 +60,10 @@ func (t TransformerFactories) BinaryTransformer(encoder BinaryEncoder) BinaryEnc
func (t TransformerFactories) EventTransformer() EventTransformer {
return func(e *ce.Event) error {
for _, factory := range t {
if factory == nil {
continue
}
f := factory.EventTransformer()

if f != nil {
err := f(e)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion legacy/pkg/bindings/amqp/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// Fill the provided amqpMessage with the message m.
// Using context you can tweak the encoding processing (more details on binding.Translate documentation).
func EncodeAMQPMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformerFactories binding.TransformerFactories) error {
func EncodeAMQPMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformerFactories ...binding.TransformerFactory) error {
structuredEncoder := (*amqpMessageEncoder)(amqpMessage)
binaryEncoder := (*amqpMessageEncoder)(amqpMessage)

Expand Down
4 changes: 2 additions & 2 deletions pkg/binding/buffering/copy_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func CopyMessage(ctx context.Context, m binding.Message, transformers binding.Tr
return nil, binding.ErrUnknownEncoding
}
if originalMessageEncoding == binding.EncodingEvent {
e, err := binding.ToEvent(ctx, m, transformers)
e, err := binding.ToEvent(ctx, m, transformers...)
if err != nil {
return nil, err
}
Expand All @@ -44,7 +44,7 @@ func CopyMessage(ctx context.Context, m binding.Message, transformers binding.Tr
} else if encoding == binding.EncodingBinary {
return &bm, err
} else {
e, err := binding.ToEvent(ctx, m, transformers)
e, err := binding.ToEvent(ctx, m, transformers...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/binding/buffering/copy_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCopyMessage(t *testing.T) {
require.NoError(t, err)
// The copy can be read any number of times
for i := 0; i < 3; i++ {
got, err := binding.ToEvent(context.Background(), cpy, nil)
got, err := binding.ToEvent(context.Background(), cpy)
assert.NoError(t, err)
require.Equal(t, tt.encoding, cpy.ReadEncoding())
AssertEventEquals(t, ExToStr(t, tt.want), ExToStr(t, *got))
Expand All @@ -101,7 +101,7 @@ func TestCopyMessage(t *testing.T) {
require.NoError(t, err)
// The copy can be read any number of times
for i := 0; i < 3; i++ {
got, err := binding.ToEvent(context.Background(), cpy, nil)
got, err := binding.ToEvent(context.Background(), cpy)
assert.NoError(t, err)
require.Equal(t, tt.encoding, cpy.ReadEncoding())
AssertEventEquals(t, ExToStr(t, tt.want), ExToStr(t, *got))
Expand Down
2 changes: 1 addition & 1 deletion pkg/binding/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func MustJSON(e event.Event) []byte {

// Must convert the Message to event.Event
func MustToEvent(t *testing.T, ctx context.Context, m binding.Message) event.Event {
e, err := binding.ToEvent(ctx, m, nil)
e, err := binding.ToEvent(ctx, m)
require.NoError(t, err)
return *e
}
6 changes: 3 additions & 3 deletions pkg/binding/test/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func RunTransformerTests(t *testing.T, ctx context.Context, tests []TransformerT
mockStructured := MockStructuredMessage{}
mockBinary := MockBinaryMessage{}

enc, err := binding.Write(ctx, inputMessage, &mockStructured, &mockBinary, tt.Transformers)
enc, err := binding.Write(ctx, inputMessage, &mockStructured, &mockBinary, tt.Transformers...)
require.NoError(t, err)

var e *event.Event
if enc == binding.EncodingStructured {
e, err = binding.ToEvent(ctx, &mockStructured, nil)
e, err = binding.ToEvent(ctx, &mockStructured)
require.NoError(t, err)
} else if enc == binding.EncodingBinary {
e, err = binding.ToEvent(ctx, &mockBinary, nil)
e, err = binding.ToEvent(ctx, &mockBinary)
require.NoError(t, err)
} else {
t.Fatalf("Unexpected encoding %v", enc)
Expand Down
18 changes: 13 additions & 5 deletions pkg/binding/to_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ var ErrCannotConvertToEvent = errors.New("cannot convert message to event")
// This function returns the Event generated from the Message and the original encoding of the message or
// an error that points the conversion error.
// transformers can be nil and this function guarantees that they are invoked only once during the encoding process.
func ToEvent(ctx context.Context, message MessageReader, transformers TransformerFactories) (*event.Event, error) {
func ToEvent(ctx context.Context, message MessageReader, transformers ...TransformerFactory) (*event.Event, error) {
messageEncoding := message.ReadEncoding()
if messageEncoding == EncodingEvent {
for m := message; m != nil; {
if em, ok := m.(*EventMessage); ok {
e := (*event.Event)(em)
if err := transformers.EventTransformer()(e); err != nil {
return nil, err
if transformers != nil {
var tf TransformerFactories
tf = transformers
if err := tf.EventTransformer()(e); err != nil {
return nil, err
}
}
return e, nil
}
Expand All @@ -50,8 +54,12 @@ func ToEvent(ctx context.Context, message MessageReader, transformers Transforme
); err != nil {
return nil, err
}
if err := transformers.EventTransformer()(&e); err != nil {
return nil, err
if transformers != nil {
var tf TransformerFactories
tf = transformers
if err := tf.EventTransformer()(&e); err != nil {
return nil, err
}
}
return &e, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/binding/to_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestToEvent(t *testing.T) {
e := tt.event.Clone()
inputMessage = binding.ToMessage(&e)
}
got, err := binding.ToEvent(context.Background(), inputMessage, nil)
got, err := binding.ToEvent(context.Background(), inputMessage)
require.NoError(t, err)
test.AssertEventEquals(t, test.ExToStr(t, tt.want), test.ExToStr(t, *got))
})
Expand Down
49 changes: 26 additions & 23 deletions pkg/binding/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ func (t TransformerFactories) StructuredTransformer(encoder StructuredWriter) St
return nil
}
res := encoder
if t != nil {
for _, b := range t {
if new := b.StructuredTransformer(res); new != nil {
res = new
} else {
return nil // Structured not supported!
}
for _, b := range t {
if b == nil {
continue
}
if r := b.StructuredTransformer(res); r != nil {
res = r
} else {
return nil // Structured not supported!
}
}
return res
Expand All @@ -45,29 +46,31 @@ func (t TransformerFactories) BinaryTransformer(encoder BinaryWriter) BinaryWrit
return nil
}
res := encoder
if t != nil {
for i, _ := range t {
if new := t[len(t)-i-1].BinaryTransformer(res); new != nil {
res = new
} else {
return nil // Binary not supported!
}
for i := range t {
b := t[len(t)-i-1]
if b == nil {
continue
}
if r := b.BinaryTransformer(res); r != nil {
res = r
} else {
return nil // Binary not supported!
}
}
return res
}

func (t TransformerFactories) EventTransformer() EventTransformer {
return func(e *event.Event) error {
if t != nil {
for _, factory := range t {
f := factory.EventTransformer()

if f != nil {
err := f(e)
if err != nil {
return err
}
for _, b := range t {
if b == nil {
continue
}
f := b.EventTransformer()
if f != nil {
err := f(e)
if err != nil {
return err
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/binding/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Write(
message MessageReader,
structuredWriter StructuredWriter,
binaryWriter BinaryWriter,
transformers TransformerFactories,
transformers ...TransformerFactory,
) (Encoding, error) {
enc := message.ReadEncoding()
var err error
Expand All @@ -87,7 +87,7 @@ func Write(
}

var e *event.Event
e, err = ToEvent(ctx, message, transformers)
e, err = ToEvent(ctx, message, transformers...)
if err != nil {
return enc, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, er
}()
if err == nil {
fmt.Printf("%#v", msg)
if rs, err := binding.ToEvent(ctx, msg, nil); err != nil {
if rs, err := binding.ToEvent(ctx, msg); err != nil {
cecontext.LoggerFrom(ctx).Infow("failed calling ToEvent", zap.Error(err), zap.Any("resp", msg))
} else {
resp = rs
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
}
}()

e, err := binding.ToEvent(ctx, m, nil)
e, err := binding.ToEvent(ctx, m)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/amqp/write_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// Fill the provided amqpMessage with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
func WriteMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformers binding.TransformerFactories) error {
func WriteMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Message, transformers ...binding.TransformerFactory) error {
structuredWriter := (*amqpMessageWriter)(amqpMessage)
binaryWriter := (*amqpMessageWriter)(amqpMessage)

Expand All @@ -24,7 +24,7 @@ func WriteMessage(ctx context.Context, m binding.Message, amqpMessage *amqp.Mess
m,
structuredWriter,
binaryWriter,
transformers,
transformers...,
)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/http/write_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// Fill the provided httpRequest with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
func WriteRequest(ctx context.Context, m binding.Message, httpRequest *http.Request, transformers binding.TransformerFactories) error {
func WriteRequest(ctx context.Context, m binding.Message, httpRequest *http.Request, transformers ...binding.TransformerFactory) error {
structuredWriter := (*httpRequestWriter)(httpRequest)
binaryWriter := (*httpRequestWriter)(httpRequest)

Expand All @@ -23,7 +23,7 @@ func WriteRequest(ctx context.Context, m binding.Message, httpRequest *http.Requ
m,
structuredWriter,
binaryWriter,
transformers,
transformers...,
)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/http/write_responsewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// Write out to the the provided httpResponseWriter with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
func WriteResponseWriter(ctx context.Context, m binding.Message, status int, rw http.ResponseWriter, transformers binding.TransformerFactories) error {
func WriteResponseWriter(ctx context.Context, m binding.Message, status int, rw http.ResponseWriter, transformers ...binding.TransformerFactory) error {
if status < 200 || status >= 600 {
status = http.StatusOK
}
Expand All @@ -25,7 +25,7 @@ func WriteResponseWriter(ctx context.Context, m binding.Message, status int, rw
m,
writer,
writer,
transformers,
transformers...,
)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/http/write_responsewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestWriteHttpResponseWriter(t *testing.T) {
messageOut := NewMessage(res.Header(), ioutil.NopCloser(bytes.NewReader(res.Body.Bytes())))
require.Equal(t, tt.expectedEncoding, messageOut.ReadEncoding())

eventOut, err := binding.ToEvent(context.TODO(), messageOut, nil)
eventOut, err := binding.ToEvent(context.TODO(), messageOut)
require.NoError(t, err)
test.AssertEventEquals(t, eventIn, *eventOut)
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/kafka_sarama/message_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func BenchmarkNewBinaryMessage(b *testing.B) {
func BenchmarkNewStructuredMessageToEvent(b *testing.B) {
for i := 0; i < b.N; i++ {
M = kafka_sarama.NewMessageFromConsumerMessage(structuredConsumerMessage)
Event, Err = binding.ToEvent(context.TODO(), M, nil)
Event, Err = binding.ToEvent(context.TODO(), M)
}
}

func BenchmarkNewBinaryMessageToEvent(b *testing.B) {
for i := 0; i < b.N; i++ {
M = kafka_sarama.NewMessageFromConsumerMessage(binaryConsumerMessage)
Event, Err = binding.ToEvent(context.TODO(), M, nil)
Event, Err = binding.ToEvent(context.TODO(), M)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/kafka_sarama/write_producer_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

// Fill the provided producerMessage with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories binding.TransformerFactories) error {
func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories ...binding.TransformerFactory) error {
enc := (*kafkaProducerMessageWriter)(producerMessage)

_, err := binding.Write(
ctx,
m,
enc,
enc,
transformerFactories,
transformerFactories...,
)
return err
}
Expand Down
Loading

0 comments on commit f248e16

Please sign in to comment.