Skip to content

Commit

Permalink
Enabling Pub/Sub to send and receive from multiple topics and subscri…
Browse files Browse the repository at this point in the history
…ptions. (#171)

Signed-off-by: Scott Nichols <nicholss@google.com>
  • Loading branch information
n3wscott authored and markpeek committed Aug 19, 2019
1 parent 7ef8ff8 commit 54b73fe
Show file tree
Hide file tree
Showing 13 changed files with 559 additions and 164 deletions.
15 changes: 8 additions & 7 deletions cmd/samples/pubsub/converter/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ import (
"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
)

/*
curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080
*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`
TopicID string `envconfig:"PUBSUB_TOPIC" default:"demo_cloudevents" required:"true"`
Expand Down Expand Up @@ -44,7 +51,7 @@ func convert(ctx context.Context, m transport.Message, err error) (*cloudevents.
log.Printf("trying to recover from %v", err)

if msg, ok := m.(*pubsub.Message); ok {
tx := pubsub.TransportContextFrom(ctx)
tx := pscontext.TransportContextFrom(ctx)
// Make a new event and convert the message payload.
event := cloudevents.NewEvent()
event.SetSource("github.com/cloudevents/cmd/samples/pubsub/converter/receiver")
Expand Down Expand Up @@ -82,9 +89,3 @@ func main() {
log.Printf("will listen on %s/%s\n", env.TopicID, env.SubscriptionID)
log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, gotEvent))
}

/*
curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080
*/
16 changes: 8 additions & 8 deletions cmd/samples/pubsub/converter/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/kelseyhightower/envconfig"
)

/*
To view: gcloud pubsub subscriptions pull --auto-ack foo
To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}'
*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"`

Expand Down Expand Up @@ -69,11 +77,3 @@ func main() {
log.Printf("Could not publish message: %v", err)
}
}

/*
To view: gcloud pubsub subscriptions pull --auto-ack foo
To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}'
*/
89 changes: 89 additions & 0 deletions cmd/samples/pubsub/multireceiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"context"
"fmt"
"log"
"os"
"strings"

"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/kelseyhightower/envconfig"
)

/*
To setup:
gcloud pubsub topics create demo_cloudevents
gcloud pubsub subscriptions create foo --topic=demo_cloudevents
To test:
gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}'
To fix a bad message:
gcloud pubsub subscriptions pull --auto-ack foo
*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`

SubscriptionIDs string `envconfig:"PUBSUB_SUBSCRIPTIONS" default:"foo" required:"true"`
}

type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
fmt.Printf("Event Context: %+v\n", event.Context)

fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx))

data := &Example{}
if err := event.DataAs(data); err != nil {
fmt.Printf("Got Data Error: %s\n", err.Error())
}
fmt.Printf("Data: %+v\n", data)

fmt.Printf("----------------------------\n")
return nil
}

func main() {
ctx := context.Background()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

var opts []cepubsub.Option
opts = append(opts, cepubsub.WithProjectID(env.ProjectID))
for _, subscription := range strings.Split(env.SubscriptionIDs, ",") {
opts = append(opts, cepubsub.WithSubscriptionID(subscription))
}

t, err := cepubsub.New(context.Background(), opts...)
if err != nil {
log.Fatalf("failed to create pubsub transport, %s", err.Error())
}
c, err := client.New(t)
if err != nil {
log.Fatalf("failed to create client, %s", err.Error())
}

log.Println("Created client, listening...")

if err := c.StartReceiver(ctx, receive); err != nil {
log.Fatalf("failed to start pubsub receiver, %s", err.Error())
}
}
78 changes: 78 additions & 0 deletions cmd/samples/pubsub/multisender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"log"
"os"
"strings"

"github.com/kelseyhightower/envconfig"

"github.com/cloudevents/sdk-go"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
)

/*
gcloud pubsub topics create ce1
gcloud pubsub topics create ce2
gcloud pubsub topics create ce3
gcloud pubsub subscriptions create ce1_sub --topic=ce1
gcloud pubsub subscriptions create ce2_sub --topic=ce2
gcloud pubsub subscriptions create ce3_sub --topic=ce3
*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"`

TopicIDs string `envconfig:"PUBSUB_TOPICS" default:"demo_cloudevents" required:"true"`
}

// Basic data struct.
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func main() {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID))
if err != nil {
log.Printf("failed to create pubsub transport, %s", err.Error())
os.Exit(1)
}
c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Printf("failed to create client, %s", err.Error())
os.Exit(1)
}

for i, topic := range strings.Split(env.TopicIDs, ",") {
ctx := cecontext.WithTopic(context.Background(), topic)
event := cloudevents.NewEvent(cloudevents.VersionV03)
event.SetType("com.cloudevents.sample.sent")
event.SetSource("github.com/cloudevents/sdk-go/cmd/samples/pubsub/multisender/")
_ = event.SetData(&Example{
Sequence: i,
Message: "HELLO " + topic,
})

_, err = c.Send(ctx, event)

if err != nil {
log.Printf("failed to send: %v", err)
os.Exit(1)
}
}

os.Exit(0)
}
32 changes: 25 additions & 7 deletions cmd/samples/pubsub/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,28 @@ import (

"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/kelseyhightower/envconfig"
)

/*
To setup:
gcloud pubsub topics create demo_cloudevents
gcloud pubsub subscriptions create foo --topic=demo_cloudevents
To test:
gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}'
To fix a bad message:
gcloud pubsub subscriptions pull --auto-ack foo
*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`

Expand All @@ -28,7 +46,7 @@ type Example struct {
func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
fmt.Printf("Event Context: %+v\n", event.Context)

fmt.Printf("Transport Context: %+v\n", cloudeventspubsub.TransportContextFrom(ctx))
fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx))

data := &Example{}
if err := event.DataAs(data); err != nil {
Expand All @@ -49,10 +67,10 @@ func main() {
os.Exit(1)
}

t, err := cloudeventspubsub.New(context.Background(),
cloudeventspubsub.WithProjectID(env.ProjectID),
cloudeventspubsub.WithTopicID(env.TopicID),
cloudeventspubsub.WithSubscriptionID(env.SubscriptionID))
t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID),
cepubsub.WithTopicID(env.TopicID),
cepubsub.WithSubscriptionID(env.SubscriptionID))
if err != nil {
log.Fatalf("failed to create pubsub transport, %s", err.Error())
}
Expand All @@ -64,6 +82,6 @@ func main() {
log.Println("Created client, listening...")

if err := c.StartReceiver(ctx, receive); err != nil {
log.Fatalf("failed to start nats receiver, %s", err.Error())
log.Fatalf("failed to start pubsub receiver, %s", err.Error())
}
}
8 changes: 4 additions & 4 deletions cmd/samples/pubsub/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/cloudevents/sdk-go"
cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"github.com/kelseyhightower/envconfig"
)

Expand All @@ -29,9 +29,9 @@ func main() {
os.Exit(1)
}

t, err := cloudeventspubsub.New(context.Background(),
cloudeventspubsub.WithProjectID(env.ProjectID),
cloudeventspubsub.WithTopicID(env.TopicID))
t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID),
cepubsub.WithTopicID(env.TopicID))
if err != nil {
log.Printf("failed to create pubsub transport, %s", err.Error())
os.Exit(1)
Expand Down
22 changes: 22 additions & 0 deletions pkg/cloudevents/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ func TargetFrom(ctx context.Context) *url.URL {
return nil
}

// Opaque key type used to store topic
type topicKeyType struct{}

var topicKey = topicKeyType{}

// WithTopic returns back a new context with the given topic. Topic is intended to be transport dependent.
// For pubsub transport, `topic` should be a Pub/Sub Topic ID.
func WithTopic(ctx context.Context, topic string) context.Context {
return context.WithValue(ctx, topicKey, topic)
}

// TopicFrom looks in the given context and returns `topic` as a string if found and valid, otherwise "".
func TopicFrom(ctx context.Context) string {
c := ctx.Value(topicKey)
if c != nil {
if s, ok := c.(string); ok {
return s
}
}
return ""
}

// Opaque key type used to store encoding
type encodingKeyType struct{}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pubsub
package context

import (
"context"
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudevents/transport/pubsub/context/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package context_test
1 change: 0 additions & 1 deletion pkg/cloudevents/transport/pubsub/context_test.go

This file was deleted.

Loading

0 comments on commit 54b73fe

Please sign in to comment.