Skip to content

Commit

Permalink
Implement gochan protocol. (#418)
Browse files Browse the repository at this point in the history
* adding an example protocol based on the test protocol channel based protocol

Signed-off-by: Scott Nichols <snichols@vmware.com>

* Adding test for protocol.

Signed-off-by: Scott Nichols <snichols@vmware.com>

* Promote the test protocol to a real protocol.

Signed-off-by: Scott Nichols <snichols@vmware.com>

* No need to chan chan all over.

Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
n3wscott authored Mar 23, 2020
1 parent 9226857 commit 123def5
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 97 deletions.
14 changes: 7 additions & 7 deletions v2/client/test/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
protocoltest "github.com/cloudevents/sdk-go/v2/protocol/test"
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
)

// MockSenderClient returns a client that can Send() event.
Expand All @@ -36,7 +36,7 @@ func NewMockSenderClient(t *testing.T, chanSize int, opts ...client.Option) (cli
}
}(messageCh, eventCh)

c, err := client.New(protocoltest.ChanSender(messageCh), opts...)
c, err := client.New(gochan.Sender(messageCh), opts...)
require.NoError(t, err)

return c, eventCh
Expand All @@ -61,7 +61,7 @@ func NewMockRequesterClient(t *testing.T, chanSize int, replierFn func(inMessage
return nil, err
}

chanRequester := protocoltest.ChanRequester{
chanRequester := gochan.Requester{
Ch: messageCh,
Reply: replier,
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func NewMockReceiverClient(t *testing.T, chanSize int, opts ...client.Option) (c
}
}(messageCh, eventCh)

c, err := client.New(protocoltest.ChanReceiver(messageCh), opts...)
c, err := client.New(gochan.Receiver(messageCh), opts...)
require.NoError(t, err)

return c, eventCh
Expand All @@ -128,7 +128,7 @@ func NewMockResponderClient(t *testing.T, chanSize int, opts ...client.Option) (
inMessageCh := make(chan binding.Message)

outEventCh := make(chan ClientMockResponse, chanSize)
outMessageCh := make(chan protocoltest.ChanResponderResponse)
outMessageCh := make(chan gochan.ChanResponderResponse)

// Input piping
go func(messageCh chan<- binding.Message, eventCh <-chan event.Event) {
Expand All @@ -144,7 +144,7 @@ func NewMockResponderClient(t *testing.T, chanSize int, opts ...client.Option) (
}(inMessageCh, inEventCh)

// Output piping
go func(messageCh <-chan protocoltest.ChanResponderResponse, eventCh chan<- ClientMockResponse) {
go func(messageCh <-chan gochan.ChanResponderResponse, eventCh chan<- ClientMockResponse) {
for {
select {
case m, ok := <-messageCh:
Expand All @@ -164,7 +164,7 @@ func NewMockResponderClient(t *testing.T, chanSize int, opts ...client.Option) (
}
}(outMessageCh, outEventCh)

c, err := client.New(&protocoltest.ChanResponder{In: inMessageCh, Out: outMessageCh}, opts...)
c, err := client.New(&gochan.Responder{In: inMessageCh, Out: outMessageCh}, opts...)
require.NoError(t, err)

return c, inEventCh, outEventCh
Expand Down
50 changes: 50 additions & 0 deletions v2/cmd/samples/gochan/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
"log"
"time"
)

func main() {
c, err := cloudevents.NewClient(gochan.New(), cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client: %v", err)
}

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*50)) // wait

// Start the receiver
go func() {
if err := c.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) {
log.Printf("[receiver] %s", event)
}); err != nil && err.Error() != "context deadline exceeded" {
log.Fatalf("[receiver] start receiver returned an error: %s", err)
}
log.Println("[receiver] stopped")
}()

// Start sending the events
for i := 0; i < 10; i++ {
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/v2/cmd/samples/gochan")
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})

err := c.Send(ctx, e)
if err != nil {
log.Printf("[sender] failed to send: %v", err)
} else {
log.Printf("[sender] sent: %d", i)
}
}
// Wait for the timeout.
<-ctx.Done()
cancel()
log.Println("[sender] stopped")
}
4 changes: 4 additions & 0 deletions v2/protocol/gochan/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/*
Package channel implements the CloudEvent transport implementation using go chan.
*/
package gochan
35 changes: 35 additions & 0 deletions v2/protocol/gochan/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package gochan

import (
"context"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
)

const (
defaultChanDepth = 20
)

// SendReceiver is a reference implementation for using the CloudEvents binding
// integration.
type SendReceiver struct {
sender protocol.Sender
receiver protocol.Receiver
}

func New() *SendReceiver {
ch := make(chan binding.Message, defaultChanDepth)

return &SendReceiver{
sender: Sender(ch),
receiver: Receiver(ch),
}
}

func (s *SendReceiver) Send(ctx context.Context, in binding.Message) (err error) {
return s.sender.Send(ctx, in)
}

func (r *SendReceiver) Receive(ctx context.Context) (binding.Message, error) {
return r.receiver.Receive(ctx)
}
140 changes: 140 additions & 0 deletions v2/protocol/gochan/protocol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package gochan

import (
"context"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestNew(t *testing.T) {
got := New()
assert.NotNil(t, got)
}

func protocols(t *testing.T) []*SendReceiver {
return []*SendReceiver{New()}
}

func TestSend(t *testing.T) {
testCases := map[string]struct {
ctx context.Context
msg binding.Message
wantErr string
}{
"nil context": {
wantErr: "nil Context",
},
"nil message": {
ctx: context.TODO(),
wantErr: "nil Message",
},
}
for n, tc := range testCases {
for _, p := range protocols(t) {
t.Run(n, func(t *testing.T) {
err := p.Send(tc.ctx, tc.msg)
if tc.wantErr != "" {
if err == nil || err.Error() != tc.wantErr {
t.Fatalf("Expected error '%s'. Actual '%v'", tc.wantErr, err)
}
} else if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}
}

func TestReceive(t *testing.T) {
testCases := map[string]struct {
ctx context.Context
want binding.Message
wantErr string
}{
"nil context": {
wantErr: "nil Context",
},
"timeout": {
ctx: context.TODO(),
wantErr: "context deadline exceeded",
},
}
for n, tc := range testCases {
for _, p := range protocols(t) {
t.Run(n, func(t *testing.T) {
ReceiveTest(t, p, tc.ctx, tc.want, tc.wantErr)
})
}
}
}

func TestSendReceive(t *testing.T) {
testCases := map[string]struct {
sendErr string
want binding.Message
receiveErr string
}{
"nil": {
sendErr: "nil Message",
receiveErr: "context deadline exceeded",
},
"empty event": {
want: func() binding.Message {
e := event.New()
return binding.ToMessage(&e)
}(),
},
"min event": {
want: func() binding.Message {
e := event.New()
e.SetSource("unittest/")
e.SetType("unit.test")
e.SetID("unit-test")
return binding.ToMessage(&e)
}(),
},
}
for n, tc := range testCases {
for _, p := range protocols(t) {
t.Run(n, func(t *testing.T) {
go func() {
wantErr := tc.sendErr
err := p.Send(context.Background(), tc.want)
if wantErr != "" {
if err == nil || err.Error() != wantErr {
t.Fatalf("Expected error '%s'. Actual '%v'", wantErr, err)
}
} else if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}()
ReceiveTest(t, p, context.Background(), tc.want, tc.receiveErr)
})
}
}
}

func ReceiveTest(t *testing.T, p *SendReceiver, ctx context.Context, want binding.Message, wantErr string) {
if ctx != nil {
var done context.CancelFunc
ctx, done = context.WithTimeout(ctx, time.Millisecond*10)
defer done()
}

got, err := p.Receive(ctx)
if wantErr != "" {
if err == nil || err.Error() != wantErr {
t.Fatalf("Expected error '%s'. Actual '%v'", wantErr, err)
}
} else if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected diff (-want, +got) = %v", diff)
}
}
31 changes: 31 additions & 0 deletions v2/protocol/gochan/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package gochan

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/protocol"
"io"

"github.com/cloudevents/sdk-go/v2/binding"
)

// Receiver implements Receiver by receiving Messages from a channel.
type Receiver <-chan binding.Message

func (r Receiver) Receive(ctx context.Context) (binding.Message, error) {
if ctx == nil {
return nil, fmt.Errorf("nil Context")
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case m, ok := <-r:
if !ok {
return nil, io.EOF
}
return m, nil
}
}

var _ protocol.Receiver = (*Receiver)(nil)
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package test
package gochan

import (
"context"
"errors"
"fmt"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
)

type ChanRequester struct {
type Requester struct {
Ch chan<- binding.Message
Reply func(message binding.Message) (binding.Message, error)
}

func (s *ChanRequester) Send(ctx context.Context, m binding.Message) (err error) {
func (s *Requester) Send(ctx context.Context, m binding.Message) (err error) {
if ctx == nil {
return fmt.Errorf("nil Context")
} else if m == nil {
return fmt.Errorf("nil Message")
}

defer func() {
err2 := m.Finish(err)
if err == nil {
Expand All @@ -28,7 +35,7 @@ func (s *ChanRequester) Send(ctx context.Context, m binding.Message) (err error)
}
}

func (s *ChanRequester) Request(ctx context.Context, m binding.Message) (res binding.Message, err error) {
func (s *Requester) Request(ctx context.Context, m binding.Message) (res binding.Message, err error) {
defer func() {
err2 := m.Finish(err)
if err == nil {
Expand All @@ -43,14 +50,14 @@ func (s *ChanRequester) Request(ctx context.Context, m binding.Message) (res bin
}
}

func (s *ChanRequester) Close(ctx context.Context) (err error) {
func (s *Requester) Close(ctx context.Context) (err error) {
defer func() {
if recover() != nil {
err = errors.New("trying to close a closed ChanSender")
err = errors.New("trying to close a closed Sender")
}
}()
close(s.Ch)
return nil
}

var _ protocol.Requester = (*ChanRequester)(nil)
var _ protocol.RequesterCloser = (*Requester)(nil)
Loading

0 comments on commit 123def5

Please sign in to comment.