Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling Pub/Sub to send and receive from multiple topics and subscriptions. #171

Merged
merged 7 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/samples/pubsub/converter/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ import (
"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
)

/*

curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080

*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`
TopicID string `envconfig:"PUBSUB_TOPIC" default:"demo_cloudevents" required:"true"`
Expand Down Expand Up @@ -44,7 +51,7 @@ func convert(ctx context.Context, m transport.Message, err error) (*cloudevents.
log.Printf("trying to recover from %v", err)

if msg, ok := m.(*pubsub.Message); ok {
tx := pubsub.TransportContextFrom(ctx)
tx := pscontext.TransportContextFrom(ctx)
// Make a new event and convert the message payload.
event := cloudevents.NewEvent()
event.SetSource("github.com/cloudevents/cmd/samples/pubsub/converter/receiver")
Expand Down Expand Up @@ -82,9 +89,3 @@ func main() {
log.Printf("will listen on %s/%s\n", env.TopicID, env.SubscriptionID)
log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, gotEvent))
}

/*

curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080

*/
16 changes: 8 additions & 8 deletions cmd/samples/pubsub/converter/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/kelseyhightower/envconfig"
)

/*

To view: gcloud pubsub subscriptions pull --auto-ack foo

To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}'

*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"`

Expand Down Expand Up @@ -69,11 +77,3 @@ func main() {
log.Printf("Could not publish message: %v", err)
}
}

/*

To view: gcloud pubsub subscriptions pull --auto-ack foo

To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}'

*/
89 changes: 89 additions & 0 deletions cmd/samples/pubsub/multireceiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"context"
"fmt"
"log"
"os"
"strings"

"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/kelseyhightower/envconfig"
)

/*

To setup:

gcloud pubsub topics create demo_cloudevents
gcloud pubsub subscriptions create foo --topic=demo_cloudevents

To test:

gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}'

To fix a bad message:

gcloud pubsub subscriptions pull --auto-ack foo

*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`

SubscriptionIDs string `envconfig:"PUBSUB_SUBSCRIPTIONS" default:"foo" required:"true"`
}

type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
fmt.Printf("Event Context: %+v\n", event.Context)

fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx))

data := &Example{}
if err := event.DataAs(data); err != nil {
fmt.Printf("Got Data Error: %s\n", err.Error())
}
fmt.Printf("Data: %+v\n", data)

fmt.Printf("----------------------------\n")
return nil
}

func main() {
ctx := context.Background()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

var opts []cepubsub.Option
opts = append(opts, cepubsub.WithProjectID(env.ProjectID))
for _, subscription := range strings.Split(env.SubscriptionIDs, ",") {
opts = append(opts, cepubsub.WithSubscriptionID(subscription))
}

t, err := cepubsub.New(context.Background(), opts...)
if err != nil {
log.Fatalf("failed to create pubsub transport, %s", err.Error())
}
c, err := client.New(t)
if err != nil {
log.Fatalf("failed to create client, %s", err.Error())
}

log.Println("Created client, listening...")

if err := c.StartReceiver(ctx, receive); err != nil {
log.Fatalf("failed to start pubsub receiver, %s", err.Error())
}
}
78 changes: 78 additions & 0 deletions cmd/samples/pubsub/multisender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"log"
"os"
"strings"

"github.com/kelseyhightower/envconfig"

"github.com/cloudevents/sdk-go"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
)

/*

gcloud pubsub topics create ce1
gcloud pubsub topics create ce2
gcloud pubsub topics create ce3

gcloud pubsub subscriptions create ce1_sub --topic=ce1
gcloud pubsub subscriptions create ce2_sub --topic=ce2
gcloud pubsub subscriptions create ce3_sub --topic=ce3

*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"`

TopicIDs string `envconfig:"PUBSUB_TOPICS" default:"demo_cloudevents" required:"true"`
}

// Basic data struct.
type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func main() {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID))
if err != nil {
log.Printf("failed to create pubsub transport, %s", err.Error())
os.Exit(1)
}
c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Printf("failed to create client, %s", err.Error())
os.Exit(1)
}

for i, topic := range strings.Split(env.TopicIDs, ",") {
ctx := cecontext.WithTopic(context.Background(), topic)
event := cloudevents.NewEvent(cloudevents.VersionV03)
event.SetType("com.cloudevents.sample.sent")
event.SetSource("github.com/cloudevents/sdk-go/cmd/samples/pubsub/multisender/")
_ = event.SetData(&Example{
Sequence: i,
Message: "HELLO " + topic,
})

_, err = c.Send(ctx, event)

if err != nil {
log.Printf("failed to send: %v", err)
os.Exit(1)
}
}

os.Exit(0)
}
32 changes: 25 additions & 7 deletions cmd/samples/pubsub/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,28 @@ import (

"github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"github.com/kelseyhightower/envconfig"
)

/*

To setup:

gcloud pubsub topics create demo_cloudevents
gcloud pubsub subscriptions create foo --topic=demo_cloudevents

To test:

gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}'

To fix a bad message:

gcloud pubsub subscriptions pull --auto-ack foo

*/

type envConfig struct {
ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"`

Expand All @@ -28,7 +46,7 @@ type Example struct {
func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
fmt.Printf("Event Context: %+v\n", event.Context)

fmt.Printf("Transport Context: %+v\n", cloudeventspubsub.TransportContextFrom(ctx))
fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx))

data := &Example{}
if err := event.DataAs(data); err != nil {
Expand All @@ -49,10 +67,10 @@ func main() {
os.Exit(1)
}

t, err := cloudeventspubsub.New(context.Background(),
cloudeventspubsub.WithProjectID(env.ProjectID),
cloudeventspubsub.WithTopicID(env.TopicID),
cloudeventspubsub.WithSubscriptionID(env.SubscriptionID))
t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID),
cepubsub.WithTopicID(env.TopicID),
cepubsub.WithSubscriptionID(env.SubscriptionID))
if err != nil {
log.Fatalf("failed to create pubsub transport, %s", err.Error())
}
Expand All @@ -64,6 +82,6 @@ func main() {
log.Println("Created client, listening...")

if err := c.StartReceiver(ctx, receive); err != nil {
log.Fatalf("failed to start nats receiver, %s", err.Error())
log.Fatalf("failed to start pubsub receiver, %s", err.Error())
}
}
8 changes: 4 additions & 4 deletions cmd/samples/pubsub/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/cloudevents/sdk-go"
cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"github.com/kelseyhightower/envconfig"
)

Expand All @@ -29,9 +29,9 @@ func main() {
os.Exit(1)
}

t, err := cloudeventspubsub.New(context.Background(),
cloudeventspubsub.WithProjectID(env.ProjectID),
cloudeventspubsub.WithTopicID(env.TopicID))
t, err := cepubsub.New(context.Background(),
cepubsub.WithProjectID(env.ProjectID),
cepubsub.WithTopicID(env.TopicID))
if err != nil {
log.Printf("failed to create pubsub transport, %s", err.Error())
os.Exit(1)
Expand Down
22 changes: 22 additions & 0 deletions pkg/cloudevents/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ func TargetFrom(ctx context.Context) *url.URL {
return nil
}

// Opaque key type used to store topic
type topicKeyType struct{}

var topicKey = topicKeyType{}

// WithTopic returns back a new context with the given topic. Topic is intended to be transport dependent.
// For pubsub transport, `topic` should be a Pub/Sub Topic ID.
func WithTopic(ctx context.Context, topic string) context.Context {
return context.WithValue(ctx, topicKey, topic)
}

// TopicFrom looks in the given context and returns `topic` as a string if found and valid, otherwise "".
func TopicFrom(ctx context.Context) string {
c := ctx.Value(topicKey)
if c != nil {
if s, ok := c.(string); ok {
return s
}
}
return ""
}

// Opaque key type used to store encoding
type encodingKeyType struct{}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pubsub
package context

import (
"context"
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudevents/transport/pubsub/context/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package context_test
1 change: 0 additions & 1 deletion pkg/cloudevents/transport/pubsub/context_test.go

This file was deleted.

Loading