Skip to content

Commit

Permalink
rename transport to protocol. (#389)
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
n3wscott authored Mar 16, 2020
1 parent c9bd2c3 commit 6017d94
Show file tree
Hide file tree
Showing 74 changed files with 140 additions and 134 deletions.
6 changes: 3 additions & 3 deletions alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/cloudevents/sdk-go/pkg/context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/observability"
"github.com/cloudevents/sdk-go/pkg/transport"
"github.com/cloudevents/sdk-go/pkg/transport/http"
"github.com/cloudevents/sdk-go/pkg/protocol"
"github.com/cloudevents/sdk-go/pkg/protocol/http"
"github.com/cloudevents/sdk-go/pkg/types"
)

Expand Down Expand Up @@ -93,7 +93,7 @@ var (
// Event Creation

NewEvent = event.New
NewResult = transport.NewResult
NewResult = protocol.NewResult

NewHTTPResponse = http.NewResult

Expand Down
4 changes: 2 additions & 2 deletions cmd/samples/amqp/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

ce "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/client"
ceamqp "github.com/cloudevents/sdk-go/pkg/transport/amqp"
ceamqp "github.com/cloudevents/sdk-go/pkg/protocol/amqp"
amqp "pack.ag/amqp"
)

Expand All @@ -37,7 +37,7 @@ func main() {
host, node, opts := sampleConfig()
t, err := ceamqp.New(host, node, opts...)
if err != nil {
log.Fatalf("Failed to create AMQP transport: %v", err)
log.Fatalf("Failed to create AMQP protocol: %v", err)
}
c, err := client.New(t)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/samples/amqp/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
ceamqp "github.com/cloudevents/sdk-go/pkg/transport/amqp"
ceamqp "github.com/cloudevents/sdk-go/pkg/protocol/amqp"
"github.com/cloudevents/sdk-go/pkg/types"
"github.com/google/uuid"
"pack.ag/amqp"
Expand Down Expand Up @@ -72,7 +72,7 @@ func main() {
host, node, opts := sampleConfig()
t, err := ceamqp.New(host, node, opts...)
if err != nil {
log.Fatalf("Failed to create amqp transport: %v", err)
log.Fatalf("Failed to create amqp protocol: %v", err)
}
c, err := client.New(t)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/samples/httpb/responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"os"

"github.com/cloudevents/sdk-go/pkg/transport"
"github.com/cloudevents/sdk-go/pkg/protocol"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/event"
Expand Down Expand Up @@ -47,7 +47,7 @@ func main() {
}
}

func gotEvent(ctx context.Context, event cloudevents.Event) (*event.Event, transport.Result) {
func gotEvent(ctx context.Context, event cloudevents.Event) (*event.Event, protocol.Result) {
fmt.Printf("Got Event: %+v\n", event)

return &event, cloudevents.NewHTTPResponse(http.StatusAccepted, "accept")
Expand Down
6 changes: 3 additions & 3 deletions cmd/samples/httptonats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
cloudeventshttp "github.com/cloudevents/sdk-go/pkg/transport/http"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/transport/nats"
cloudeventshttp "github.com/cloudevents/sdk-go/pkg/protocol/http"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/protocol/nats"
"github.com/kelseyhightower/envconfig"
)

Expand Down Expand Up @@ -66,7 +66,7 @@ func _main(args []string, env envConfig) int {

np, err := cloudeventsnats.New(env.NATSServer, env.Subject)
if err != nil {
log.Fatalf("failed to create nats transport, %s", err.Error())
log.Fatalf("failed to create nats protcol, %s", err.Error())
}
nc, err := client.New(np)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/kafka/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/Shopify/sarama"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/transport/kafka_sarama"
"github.com/cloudevents/sdk-go/pkg/protocol/kafka_sarama"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/kafka/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/Shopify/sarama"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/transport/kafka_sarama"
"github.com/cloudevents/sdk-go/pkg/protocol/kafka_sarama"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/kafka/sender_and_receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/Shopify/sarama"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/transport/kafka_sarama"
"github.com/cloudevents/sdk-go/pkg/protocol/kafka_sarama"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions cmd/samples/nats/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/transport/nats"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/protocol/nats"
"github.com/kelseyhightower/envconfig"
)

Expand All @@ -30,7 +30,7 @@ func main() {

p, err := cloudeventsnats.New(env.NATSServer, env.Subject)
if err != nil {
log.Fatalf("failed to create nats transport, %s", err.Error())
log.Fatalf("failed to create nats protocol, %s", err.Error())
}
c, err := client.New(p)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/samples/nats/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/transport/nats"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/protocol/nats"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
)
Expand Down Expand Up @@ -41,7 +41,7 @@ func main() {
for _, contentType := range []string{"application/json", "application/xml"} {
p, err := cloudeventsnats.New(env.NATSServer, env.Subject)
if err != nil {
log.Printf("failed to create nats transport, %s", err.Error())
log.Printf("failed to create nats protocol, %s", err.Error())
os.Exit(1)
}
c, err := client.New(p)
Expand Down
49 changes: 25 additions & 24 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/cloudevents/sdk-go/pkg/binding"
cecontext "github.com/cloudevents/sdk-go/pkg/context"
"go.uber.org/zap"
"io"
"sync"

"go.uber.org/zap"

"github.com/cloudevents/sdk-go/pkg/binding"
cecontext "github.com/cloudevents/sdk-go/pkg/context"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/transport"
"github.com/cloudevents/sdk-go/pkg/protocol"
)

// Client interface defines the runtime contract the CloudEvents client supports.
Expand All @@ -24,43 +25,43 @@ type Client interface {
Request(ctx context.Context, event event.Event) (*event.Event, error)

// StartReceiver will register the provided function for callback on receipt
// of a cloudevent. It will also start the underlying transport as it has
// of a cloudevent. It will also start the underlying protocol as it has
// been configured.
// This call is blocking.
// Valid fn signatures are:
// * func()
// * func() error
// * func(context.Context)
// * func(context.Context) transport.Result
// * func(context.Context) protocol.Result
// * func(event.Event)
// * func(event.Event) transport.Result
// * func(event.Event) protocol.Result
// * func(context.Context, event.Event)
// * func(context.Context, event.Event) transport.Result
// * func(context.Context, event.Event) protocol.Result
// * func(event.Event) *event.Event
// * func(event.Event) (*event.Event, transport.Result)
// * func(event.Event) (*event.Event, protocol.Result)
// * func(context.Context, event.Event) *event.Event
// * func(context.Context, event.Event) (*event.Event, transport.Result)
// * func(context.Context, event.Event) (*event.Event, protocol.Result)
StartReceiver(ctx context.Context, fn interface{}) error
}

// New produces a new client with the provided transport object and applied
// client options.
func New(protocol interface{}, opts ...Option) (Client, error) {
func New(obj interface{}, opts ...Option) (Client, error) {
c := &ceClient{}

if p, ok := protocol.(transport.Sender); ok {
if p, ok := obj.(protocol.Sender); ok {
c.sender = p
}
if p, ok := protocol.(transport.Requester); ok {
if p, ok := obj.(protocol.Requester); ok {
c.requester = p
}
if p, ok := protocol.(transport.Responder); ok {
if p, ok := obj.(protocol.Responder); ok {
c.responder = p
}
if p, ok := protocol.(transport.Receiver); ok {
if p, ok := obj.(protocol.Receiver); ok {
c.receiver = p
}
if p, ok := protocol.(transport.Opener); ok {
if p, ok := obj.(protocol.Opener); ok {
c.opener = p
}

Expand All @@ -71,12 +72,12 @@ func New(protocol interface{}, opts ...Option) (Client, error) {
}

type ceClient struct {
sender transport.Sender
requester transport.Requester
receiver transport.Receiver
responder transport.Responder
sender protocol.Sender
requester protocol.Requester
receiver protocol.Receiver
responder protocol.Responder
// Optional.
opener transport.Opener
opener protocol.Opener

outboundContextDecorators []func(context.Context) context.Context
invoker Invoker
Expand Down Expand Up @@ -167,10 +168,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
return err
}
if invoker.IsReceiver() && c.receiver == nil {
return fmt.Errorf("mismatched receiver callback without transport.Receiver supported by protocol")
return fmt.Errorf("mismatched receiver callback without protocol.Receiver supported by protocol")
}
if invoker.IsResponder() && c.responder == nil {
return fmt.Errorf("mismatched receiver callback without transport.Responder supported by protocol")
return fmt.Errorf("mismatched receiver callback without protocol.Responder supported by protocol")
}
c.invoker = invoker

Expand All @@ -189,7 +190,7 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
}

var msg binding.Message
var respFn transport.ResponseFn
var respFn protocol.ResponseFn
// Start Polling.
for {
if c.responder != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client_default.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package client

import (
"github.com/cloudevents/sdk-go/pkg/transport/http"
"github.com/cloudevents/sdk-go/pkg/protocol/http"
)

// NewDefault provides the good defaults for the common case using an HTTP
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/transport"
cehttp "github.com/cloudevents/sdk-go/pkg/transport/http"
"github.com/cloudevents/sdk-go/pkg/protocol"
cehttp "github.com/cloudevents/sdk-go/pkg/protocol/http"
"github.com/cloudevents/sdk-go/pkg/types"
)

Expand Down Expand Up @@ -514,7 +514,7 @@ func TestTracedClientReceive(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
go func() {
err = c.StartReceiver(ctx, func(ctx context.Context, e event.Event) (*event.Event, transport.Result) {
err = c.StartReceiver(ctx, func(ctx context.Context, e event.Event) (*event.Event, protocol.Result) {
go func() {
_, span := client.TraceSpan(ctx, e)
defer span.End()
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/http_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package client

import (
"context"
thttp "github.com/cloudevents/sdk-go/pkg/transport/http"
"net/http"

thttp "github.com/cloudevents/sdk-go/pkg/protocol/http"
)

func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package client
import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/transport"
"github.com/cloudevents/sdk-go/pkg/protocol"
)

type Invoker interface {
Invoke(context.Context, binding.Message, transport.ResponseFn) error
Invoke(context.Context, binding.Message, protocol.ResponseFn) error
IsReceiver() bool
IsResponder() bool
}
Expand All @@ -34,7 +35,7 @@ type receiveInvoker struct {
eventDefaulterFns []EventDefaulter
}

func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn transport.ResponseFn) (err error) {
func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) {
defer func() {
if err2 := m.Finish(err); err2 == nil {
err = err2
Expand Down
14 changes: 7 additions & 7 deletions pkg/client/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"reflect"

"github.com/cloudevents/sdk-go/pkg/event"
"github.com/cloudevents/sdk-go/pkg/transport"
"github.com/cloudevents/sdk-go/pkg/protocol"
)

// Receive is the signature of a fn to be invoked for incoming cloudevents.
type ReceiveFull func(context.Context, event.Event) transport.Result
type ReceiveFull func(context.Context, event.Event) protocol.Result

type receiverFn struct {
numIn int
Expand All @@ -27,14 +27,14 @@ type receiverFn struct {

const (
inParamUsage = "expected a function taking either no parameters, one or more of (context.Context, event.Event) ordered"
outParamUsage = "expected a function returning one or mode of (*event.Event, transport.Result) ordered"
outParamUsage = "expected a function returning one or mode of (*event.Event, protocol.Result) ordered"
)

var (
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
eventType = reflect.TypeOf((*event.Event)(nil)).Elem()
eventPtrType = reflect.TypeOf((*event.Event)(nil)) // want the ptr type
resultType = reflect.TypeOf((*transport.Result)(nil)).Elem()
resultType = reflect.TypeOf((*protocol.Result)(nil)).Elem()
)

// receiver creates a receiverFn wrapper class that is used by the client to
Expand Down Expand Up @@ -72,7 +72,7 @@ func receiver(fn interface{}) (*receiverFn, error) {
return r, nil
}

func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, transport.Result) {
func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, protocol.Result) {
args := make([]reflect.Value, 0, r.numIn)

if r.numIn > 0 {
Expand All @@ -84,7 +84,7 @@ func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, t
}
}
v := r.fnValue.Call(args)
var respOut transport.Result
var respOut protocol.Result
var eOut *event.Event
if r.numOut > 0 {
i := 0
Expand All @@ -95,7 +95,7 @@ func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, t
i++ // <-- note, need to inc i.
}
if r.hasResultOut {
if resp, ok := v[i].Interface().(transport.Result); ok {
if resp, ok := v[i].Interface().(protocol.Result); ok {
respOut = resp
}
}
Expand Down
Loading

0 comments on commit 6017d94

Please sign in to comment.