From 0ae2e6cb63287a4a54bffb2916e492859ef878c4 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Fri, 9 Aug 2019 21:01:45 -0700 Subject: [PATCH 1/7] refactor pubsub to have an internal transport wrapper. Signed-off-by: Scott Nichols --- cmd/samples/pubsub/converter/receiver/main.go | 3 +- cmd/samples/pubsub/receiver/main.go | 30 +++- .../transport/pubsub/{ => context}/context.go | 2 +- .../transport/pubsub/context/context_test.go | 1 + .../transport/pubsub/context_test.go | 1 - .../transport/pubsub/internal/conneciton.go | 166 ++++++++++++++++++ pkg/cloudevents/transport/pubsub/transport.go | 165 ++++------------- 7 files changed, 227 insertions(+), 141 deletions(-) rename pkg/cloudevents/transport/pubsub/{ => context}/context.go (99%) create mode 100644 pkg/cloudevents/transport/pubsub/context/context_test.go delete mode 100644 pkg/cloudevents/transport/pubsub/context_test.go create mode 100644 pkg/cloudevents/transport/pubsub/internal/conneciton.go diff --git a/cmd/samples/pubsub/converter/receiver/main.go b/cmd/samples/pubsub/converter/receiver/main.go index 93f50f522..8476d8422 100644 --- a/cmd/samples/pubsub/converter/receiver/main.go +++ b/cmd/samples/pubsub/converter/receiver/main.go @@ -10,6 +10,7 @@ import ( "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" "github.com/google/uuid" "github.com/kelseyhightower/envconfig" ) @@ -44,7 +45,7 @@ func convert(ctx context.Context, m transport.Message, err error) (*cloudevents. log.Printf("trying to recover from %v", err) if msg, ok := m.(*pubsub.Message); ok { - tx := pubsub.TransportContextFrom(ctx) + tx := pscontext.TransportContextFrom(ctx) // Make a new event and convert the message payload. event := cloudevents.NewEvent() event.SetSource("github.com/cloudevents/cmd/samples/pubsub/converter/receiver") diff --git a/cmd/samples/pubsub/receiver/main.go b/cmd/samples/pubsub/receiver/main.go index 33f4836bb..2de8889d6 100644 --- a/cmd/samples/pubsub/receiver/main.go +++ b/cmd/samples/pubsub/receiver/main.go @@ -8,7 +8,8 @@ import ( "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" - cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" "github.com/kelseyhightower/envconfig" ) @@ -28,7 +29,7 @@ type Example struct { func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { fmt.Printf("Event Context: %+v\n", event.Context) - fmt.Printf("Transport Context: %+v\n", cloudeventspubsub.TransportContextFrom(ctx)) + fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx)) data := &Example{} if err := event.DataAs(data); err != nil { @@ -49,10 +50,10 @@ func main() { os.Exit(1) } - t, err := cloudeventspubsub.New(context.Background(), - cloudeventspubsub.WithProjectID(env.ProjectID), - cloudeventspubsub.WithTopicID(env.TopicID), - cloudeventspubsub.WithSubscriptionID(env.SubscriptionID)) + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID), + cepubsub.WithTopicID(env.TopicID), + cepubsub.WithSubscriptionID(env.SubscriptionID)) if err != nil { log.Fatalf("failed to create pubsub transport, %s", err.Error()) } @@ -67,3 +68,20 @@ func main() { log.Fatalf("failed to start nats receiver, %s", err.Error()) } } + +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ diff --git a/pkg/cloudevents/transport/pubsub/context.go b/pkg/cloudevents/transport/pubsub/context/context.go similarity index 99% rename from pkg/cloudevents/transport/pubsub/context.go rename to pkg/cloudevents/transport/pubsub/context/context.go index 57ec916ab..e64a5ee71 100644 --- a/pkg/cloudevents/transport/pubsub/context.go +++ b/pkg/cloudevents/transport/pubsub/context/context.go @@ -1,4 +1,4 @@ -package pubsub +package context import ( "context" diff --git a/pkg/cloudevents/transport/pubsub/context/context_test.go b/pkg/cloudevents/transport/pubsub/context/context_test.go new file mode 100644 index 000000000..960b2b7c5 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/context/context_test.go @@ -0,0 +1 @@ +package context_test diff --git a/pkg/cloudevents/transport/pubsub/context_test.go b/pkg/cloudevents/transport/pubsub/context_test.go deleted file mode 100644 index fcd9a357a..000000000 --- a/pkg/cloudevents/transport/pubsub/context_test.go +++ /dev/null @@ -1 +0,0 @@ -package pubsub_test diff --git a/pkg/cloudevents/transport/pubsub/internal/conneciton.go b/pkg/cloudevents/transport/pubsub/internal/conneciton.go new file mode 100644 index 000000000..b29a3d1f2 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/internal/conneciton.go @@ -0,0 +1,166 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "log" + "sync" + "time" + + "cloud.google.com/go/pubsub" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" +) + +// Transport acts as both a pubsub topic and a pubsub subscription . +type Connection struct { + // AllowCreateTopic controls if the transport can create a topic if it does + // not exist. + AllowCreateTopic bool + + // AllowCreateSubscription controls if the transport can create a + // subscription if it does not exist. + AllowCreateSubscription bool + + ProjectID string + + Client *pubsub.Client + + TopicID string + topic *pubsub.Topic + topicWasCreated bool + topicOnce sync.Once + + SubscriptionID string + sub *pubsub.Subscription + subWasCreated bool + subOnce sync.Once +} + +func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) { + var err error + c.topicOnce.Do(func() { + var ok bool + // Load the topic. + topic := c.Client.Topic(c.TopicID) + ok, err = topic.Exists(ctx) + if err != nil { + return + } + // If the topic does not exist, create a new topic with the given name. + if !ok { + if !c.AllowCreateTopic { + err = fmt.Errorf("transport not allowed to create topic %q", c.TopicID) + return + } + topic, err = c.Client.CreateTopic(ctx, c.TopicID) + if err != nil { + return + } + c.topicWasCreated = true + } + // Success. + c.topic = topic + }) + if c.topic == nil { + return nil, fmt.Errorf("unable to create topic %q, %v", c.TopicID, err) + } + return c.topic, err +} + +func (c *Connection) DeleteTopic(ctx context.Context) error { + if c.topicWasCreated { + if err := c.topic.Delete(ctx); err != nil { + return err + } + c.topic = nil + c.topicWasCreated = false + c.topicOnce = sync.Once{} + } + return errors.New("topic was not created by pubsub transport") +} + +func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) { + var err error + c.subOnce.Do(func() { + // Load the topic. + var topic *pubsub.Topic + topic, err = c.getOrCreateTopic(ctx) + if err != nil { + return + } + // Load the subscription. + var ok bool + sub := c.Client.Subscription(c.SubscriptionID) + ok, err = sub.Exists(ctx) + if err != nil { + return + } + // If subscription doesn't exist, create it. + if !ok { + if !c.AllowCreateSubscription { + err = fmt.Errorf("transport not allowed to create subscription %q", c.SubscriptionID) + return + } + // Create a new subscription to the previously created topic + // with the given name. + // TODO: allow to use push config + allow setting the SubscriptionConfig. + sub, err = c.Client.CreateSubscription(ctx, c.SubscriptionID, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: 30 * time.Second, + RetentionDuration: 25 * time.Hour, + }) + if err != nil { + _ = c.Client.Close() + return + } + c.subWasCreated = true + } + // Success. + c.sub = sub + }) + if c.sub == nil { + return nil, fmt.Errorf("unable to create sunscription %q, %v", c.SubscriptionID, err) + } + return c.sub, err +} + +func (c *Connection) DeleteSubscription(ctx context.Context) error { + if c.subWasCreated { + if err := c.sub.Delete(ctx); err != nil { + return err + } + c.sub = nil + c.subWasCreated = false + c.subOnce = sync.Once{} + } + return errors.New("subscription was not created by pubsub transport") +} + +// Send +func (c *Connection) Publish(ctx context.Context, msg *pubsub.Message) (*cloudevents.Event, error) { + topic, err := c.getOrCreateTopic(ctx) + if err != nil { + return nil, err + } + + r := topic.Publish(ctx, msg) + _, err = r.Get(ctx) + return nil, err +} + +// Start +// NOTE: This is a blocking call. +func (c *Connection) Receive(ctx context.Context, fn func(context.Context, *pubsub.Message)) error { + sub, err := c.getOrCreateSubscription(ctx) + if err != nil { + return err + } + // Ok, ready to start pulling. + return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + log.Println("adding internal context.") + ctx = pscontext.WithTransportContext(ctx, pscontext.NewTransportContext(c.ProjectID, c.TopicID, c.SubscriptionID, "pull", m)) + fn(ctx, m) + }) +} diff --git a/pkg/cloudevents/transport/pubsub/transport.go b/pkg/cloudevents/transport/pubsub/transport.go index f249aa4e3..843cdffcd 100644 --- a/pkg/cloudevents/transport/pubsub/transport.go +++ b/pkg/cloudevents/transport/pubsub/transport.go @@ -2,10 +2,8 @@ package pubsub import ( "context" - "errors" "fmt" "sync" - "time" "go.uber.org/zap" @@ -13,6 +11,7 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/internal" ) // Transport adheres to transport.Transport. @@ -44,19 +43,12 @@ type Transport struct { // subscription if it does not exist. AllowCreateSubscription bool - projectID string - - client *pubsub.Client - - topicID string - topic *pubsub.Topic - topicWasCreated bool - topicOnce sync.Once - + projectID string + topicID string subscriptionID string - sub *pubsub.Subscription - subWasCreated bool - subOnce sync.Once + client *pubsub.Client + + connections map[string]*internal.Connection // Receiver Receiver transport.Receiver @@ -82,6 +74,10 @@ func New(ctx context.Context, opts ...Option) (*Transport, error) { // Success. t.client = client } + + if t.connections == nil { + t.connections = make(map[string]*internal.Connection, 0) + } return t, nil } @@ -115,106 +111,27 @@ func (t *Transport) loadCodec(ctx context.Context) bool { return true } -func (t *Transport) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) { - var err error - t.topicOnce.Do(func() { - var ok bool - // Load the topic. - topic := t.client.Topic(t.topicID) - ok, err = topic.Exists(ctx) - if err != nil { - _ = t.client.Close() - return - } - // If the topic does not exist, create a new topic with the given name. - if !ok { - if !t.AllowCreateTopic { - err = fmt.Errorf("transport not allowed to create topic %q", t.topicID) - return - } - topic, err = t.client.CreateTopic(ctx, t.topicID) - if err != nil { - return - } - t.topicWasCreated = true - } - // Success. - t.topic = topic - }) - if t.topic == nil { - return nil, fmt.Errorf("unable to create topic %q", t.topicID) - } - return t.topic, err -} - -func (t *Transport) DeleteTopic(ctx context.Context) error { - if t.topicWasCreated { - if err := t.topic.Delete(ctx); err != nil { - return err - } - t.topic = nil - t.topicWasCreated = false - t.topicOnce = sync.Once{} - } - return errors.New("topic was not created by pubsub transport") +func (t *Transport) getConnectionKey(ctx context.Context, topic, subscription string) string { + return fmt.Sprintf("Topic:%s Subscription:%s", topic, subscription) } -func (t *Transport) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) { - var err error - t.subOnce.Do(func() { - // Load the topic. - var topic *pubsub.Topic - topic, err = t.getOrCreateTopic(ctx) - if err != nil { - return - } - // Load the subscription. - var ok bool - sub := t.client.Subscription(t.subscriptionID) - ok, err = sub.Exists(ctx) - if err != nil { - _ = t.client.Close() - return - } - // If subscription doesn't exist, create it. - if !ok { - if !t.AllowCreateSubscription { - err = fmt.Errorf("transport not allowed to create subscription %q", t.subscriptionID) - return - } - // Create a new subscription to the previously created topic - // with the given name. - // TODO: allow to use push config + allow setting the SubscriptionConfig. - sub, err = t.client.CreateSubscription(ctx, t.subscriptionID, pubsub.SubscriptionConfig{ - Topic: topic, - AckDeadline: 30 * time.Second, - RetentionDuration: 25 * time.Hour, - }) - if err != nil { - _ = t.client.Close() - return - } - t.subWasCreated = true - } - // Success. - t.sub = sub - }) - if t.sub == nil { - return nil, fmt.Errorf("unable to create sunscription %q", t.subscriptionID) +func (t *Transport) getOrCreateConnection(ctx context.Context, topic, subscription string) *internal.Connection { + // Get. + key := t.getConnectionKey(ctx, topic, subscription) + if conn, ok := t.connections[key]; ok { + return conn } - return t.sub, err -} - -func (t *Transport) DeleteSubscription(ctx context.Context) error { - if t.subWasCreated { - if err := t.sub.Delete(ctx); err != nil { - return err - } - t.sub = nil - t.subWasCreated = false - t.subOnce = sync.Once{} + // Create. + conn := &internal.Connection{ + AllowCreateSubscription: t.AllowCreateSubscription, + AllowCreateTopic: t.AllowCreateTopic, + Client: t.client, + ProjectID: t.projectID, + TopicID: topic, + SubscriptionID: subscription, } - return errors.New("subscription was not created by pubsub transport") + t.connections[key] = conn + return conn } // Send implements Transport.Send @@ -223,28 +140,18 @@ func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudev return nil, fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - msg, err := t.codec.Encode(ctx, event) - if err != nil { - return nil, err - } + conn := t.getOrCreateConnection(ctx, t.topicID, t.subscriptionID) - topic, err := t.getOrCreateTopic(ctx) + msg, err := t.codec.Encode(ctx, event) if err != nil { return nil, err } if m, ok := msg.(*Message); ok { - - r := topic.Publish(ctx, &pubsub.Message{ + return conn.Publish(ctx, &pubsub.Message{ Attributes: m.Attributes, Data: m.Data, }) - - _, err := r.Get(ctx) - if err != nil { - return nil, err - } - return nil, nil } return nil, fmt.Errorf("failed to encode Event into a Message") @@ -275,14 +182,10 @@ func (t *Transport) StartReceiver(ctx context.Context) error { return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - sub, err := t.getOrCreateSubscription(ctx) - if err != nil { - return err - } - // Ok, ready to start pulling. - err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - ctx = WithTransportContext(ctx, NewTransportContext(t.projectID, t.topicID, t.subscriptionID, "pull", m)) + conn := t.getOrCreateConnection(ctx, t.topicID, t.subscriptionID) + // Ok, ready to start pulling. + return conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { msg := &Message{ Attributes: m.Attributes, Data: m.Data, @@ -305,6 +208,4 @@ func (t *Transport) StartReceiver(ctx context.Context) error { } m.Ack() }) - - return err } From bba4603deca769b6aa154c58b03c5f39ebffc2a7 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Sat, 10 Aug 2019 16:45:52 -0700 Subject: [PATCH 2/7] working on multi impl. Signed-off-by: Scott Nichols --- cmd/samples/pubsub/multireceiver/main.go | 89 ++++++++++++ cmd/samples/pubsub/multisender/main.go | 76 +++++++++++ cmd/samples/pubsub/receiver/main.go | 2 +- go.mod | 1 + pkg/cloudevents/context/context.go | 22 +++ .../transport/pubsub/internal/conneciton.go | 47 +++++-- pkg/cloudevents/transport/pubsub/options.go | 9 +- pkg/cloudevents/transport/pubsub/transport.go | 127 +++++++++++++----- 8 files changed, 327 insertions(+), 46 deletions(-) create mode 100644 cmd/samples/pubsub/multireceiver/main.go create mode 100644 cmd/samples/pubsub/multisender/main.go diff --git a/cmd/samples/pubsub/multireceiver/main.go b/cmd/samples/pubsub/multireceiver/main.go new file mode 100644 index 000000000..77a69f286 --- /dev/null +++ b/cmd/samples/pubsub/multireceiver/main.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strings" + + "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" + "github.com/kelseyhightower/envconfig" +) + +type envConfig struct { + ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` + + SubscriptionIDs string `envconfig:"PUBSUB_SUBSCRIPTIONS" default:"foo" required:"true"` +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { + fmt.Printf("Event Context: %+v\n", event.Context) + + fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx)) + + data := &Example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Data: %+v\n", data) + + fmt.Printf("----------------------------\n") + return nil +} + +func main() { + ctx := context.Background() + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + var opts []cepubsub.Option + opts = append(opts, cepubsub.WithProjectID(env.ProjectID)) + for _, subscription := range strings.Split(env.SubscriptionIDs, ",") { + opts = append(opts, cepubsub.WithSubscriptionID(subscription)) + } + + t, err := cepubsub.New(context.Background(), opts...) + if err != nil { + log.Fatalf("failed to create pubsub transport, %s", err.Error()) + } + c, err := client.New(t) + if err != nil { + log.Fatalf("failed to create client, %s", err.Error()) + } + + log.Println("Created client, listening...") + + if err := c.StartReceiver(ctx, receive); err != nil { + log.Fatalf("failed to start pubsub receiver, %s", err.Error()) + } +} + +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ diff --git a/cmd/samples/pubsub/multisender/main.go b/cmd/samples/pubsub/multisender/main.go new file mode 100644 index 000000000..0f955529e --- /dev/null +++ b/cmd/samples/pubsub/multisender/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "log" + "os" + "strings" + + "github.com/cloudevents/sdk-go" + cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" + cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + "github.com/kelseyhightower/envconfig" +) + +type envConfig struct { + ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"` + + TopicIDs string `envconfig:"PUBSUB_TOPICS" default:"demo_cloudevents" required:"true"` +} + +// Basic data struct. +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + t, err := cloudeventspubsub.New(context.Background(), + cloudeventspubsub.WithProjectID(env.ProjectID)) + if err != nil { + log.Printf("failed to create pubsub transport, %s", err.Error()) + os.Exit(1) + } + c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Printf("failed to create client, %s", err.Error()) + os.Exit(1) + } + + for i, topic := range strings.Split(env.TopicIDs, ",") { + ctx := cecontext.WithTopic(context.Background(), topic) + event := cloudevents.NewEvent(cloudevents.VersionV03) + event.SetType("com.cloudevents.sample.sent") + event.SetSource("github.com/cloudevents/sdk-go/cmd/samples/pubsub/multisender/") + _ = event.SetData(&Example{ + Sequence: i, + Message: "HELLO " + topic, + }) + + _, err = c.Send(ctx, event) + + if err != nil { + log.Printf("failed to send: %v", err) + os.Exit(1) + } + } + + os.Exit(0) +} + +/* +gcloud pubsub topics create ce1 +gcloud pubsub topics create ce2 +gcloud pubsub topics create ce3 + +gcloud pubsub subscriptions create ce1_sub --topic=ce1 +gcloud pubsub subscriptions create ce2_sub --topic=ce2 +gcloud pubsub subscriptions create ce3_sub --topic=ce3 + +*/ diff --git a/cmd/samples/pubsub/receiver/main.go b/cmd/samples/pubsub/receiver/main.go index 2de8889d6..945e089f3 100644 --- a/cmd/samples/pubsub/receiver/main.go +++ b/cmd/samples/pubsub/receiver/main.go @@ -65,7 +65,7 @@ func main() { log.Println("Created client, listening...") if err := c.StartReceiver(ctx, receive); err != nil { - log.Fatalf("failed to start nats receiver, %s", err.Error()) + log.Fatalf("failed to start pubsub receiver, %s", err.Error()) } } diff --git a/go.mod b/go.mod index 0824342d2..26ba560e3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Azure/go-autorest/autorest/to v0.2.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect github.com/fortytw2/leaktest v1.3.0 // indirect + github.com/gogo/protobuf v1.2.0 // indirect github.com/google/go-cmp v0.3.0 github.com/google/uuid v1.1.1 github.com/kelseyhightower/envconfig v1.4.0 diff --git a/pkg/cloudevents/context/context.go b/pkg/cloudevents/context/context.go index 3a338ab5d..e580360f1 100644 --- a/pkg/cloudevents/context/context.go +++ b/pkg/cloudevents/context/context.go @@ -30,6 +30,28 @@ func TargetFrom(ctx context.Context) *url.URL { return nil } +// Opaque key type used to store topic +type topicKeyType struct{} + +var topicKey = topicKeyType{} + +// WithTopic returns back a new context with the given topic. Topic is intended to be transport dependent. +// For pubsub transport, `topic` should be a Pub/Sub Topic ID. +func WithTopic(ctx context.Context, topic string) context.Context { + return context.WithValue(ctx, topicKey, topic) +} + +// TopicFrom looks in the given context and returns `topic` as a string if found and valid, otherwise "". +func TopicFrom(ctx context.Context) string { + c := ctx.Value(topicKey) + if c != nil { + if s, ok := c.(string); ok { + return s + } + } + return "" +} + // Opaque key type used to store encoding type encodingKeyType struct{} diff --git a/pkg/cloudevents/transport/pubsub/internal/conneciton.go b/pkg/cloudevents/transport/pubsub/internal/conneciton.go index b29a3d1f2..e7dc7fd60 100644 --- a/pkg/cloudevents/transport/pubsub/internal/conneciton.go +++ b/pkg/cloudevents/transport/pubsub/internal/conneciton.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log" "sync" "time" @@ -13,7 +12,7 @@ import ( pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" ) -// Transport acts as both a pubsub topic and a pubsub subscription . +// Connection acts as either a pubsub topic or a pubsub subscription . type Connection struct { // AllowCreateTopic controls if the transport can create a topic if it does // not exist. @@ -36,8 +35,20 @@ type Connection struct { sub *pubsub.Subscription subWasCreated bool subOnce sync.Once + + // AckDeadline is Pub/Sub AckDeadline. + // Default is 30 seconds. + AckDeadline *time.Duration + // RetentionDuration is Pub/Sub RetentionDuration. + // Default is 25 hours. + RetentionDuration *time.Duration } +const ( + DefaultAckDeadline = 30 * time.Second + DefaultRetentionDuration = 25 * time.Hour +) + func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) { var err error c.topicOnce.Do(func() { @@ -69,6 +80,7 @@ func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error return c.topic, err } +// DeleteTopic func (c *Connection) DeleteTopic(ctx context.Context) error { if c.topicWasCreated { if err := c.topic.Delete(ctx); err != nil { @@ -84,12 +96,6 @@ func (c *Connection) DeleteTopic(ctx context.Context) error { func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) { var err error c.subOnce.Do(func() { - // Load the topic. - var topic *pubsub.Topic - topic, err = c.getOrCreateTopic(ctx) - if err != nil { - return - } // Load the subscription. var ok bool sub := c.Client.Subscription(c.SubscriptionID) @@ -103,13 +109,30 @@ func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subsc err = fmt.Errorf("transport not allowed to create subscription %q", c.SubscriptionID) return } + + // Load the topic. + var topic *pubsub.Topic + topic, err = c.getOrCreateTopic(ctx) + if err != nil { + return + } + // Default the ack deadline and retention duration config. + if c.AckDeadline == nil { + ackDeadline := DefaultAckDeadline + c.AckDeadline = &(ackDeadline) + } + if c.RetentionDuration == nil { + retentionDuration := DefaultRetentionDuration + c.RetentionDuration = &retentionDuration + } + // Create a new subscription to the previously created topic // with the given name. // TODO: allow to use push config + allow setting the SubscriptionConfig. sub, err = c.Client.CreateSubscription(ctx, c.SubscriptionID, pubsub.SubscriptionConfig{ Topic: topic, - AckDeadline: 30 * time.Second, - RetentionDuration: 25 * time.Hour, + AckDeadline: *c.AckDeadline, + RetentionDuration: *c.RetentionDuration, }) if err != nil { _ = c.Client.Close() @@ -126,6 +149,7 @@ func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subsc return c.sub, err } +// DeleteSubscription func (c *Connection) DeleteSubscription(ctx context.Context) error { if c.subWasCreated { if err := c.sub.Delete(ctx); err != nil { @@ -138,7 +162,7 @@ func (c *Connection) DeleteSubscription(ctx context.Context) error { return errors.New("subscription was not created by pubsub transport") } -// Send +// Publish func (c *Connection) Publish(ctx context.Context, msg *pubsub.Message) (*cloudevents.Event, error) { topic, err := c.getOrCreateTopic(ctx) if err != nil { @@ -159,7 +183,6 @@ func (c *Connection) Receive(ctx context.Context, fn func(context.Context, *pubs } // Ok, ready to start pulling. return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - log.Println("adding internal context.") ctx = pscontext.WithTransportContext(ctx, pscontext.NewTransportContext(c.ProjectID, c.TopicID, c.SubscriptionID, "pull", m)) fn(ctx, m) }) diff --git a/pkg/cloudevents/transport/pubsub/options.go b/pkg/cloudevents/transport/pubsub/options.go index 27301dc38..423169e99 100644 --- a/pkg/cloudevents/transport/pubsub/options.go +++ b/pkg/cloudevents/transport/pubsub/options.go @@ -132,9 +132,15 @@ func WithTopicIDFromDefaultEnv() Option { } // WithSubscriptionID sets the subscription ID for pubsub transport. +// This option can be used multiple times. func WithSubscriptionID(subscriptionID string) Option { return func(t *Transport) error { - t.subscriptionID = subscriptionID + if t.subscriptions == nil { + t.subscriptions = make([]subscriptionWithTopic, 0) + } + t.subscriptions = append(t.subscriptions, subscriptionWithTopic{ + subscriptionID: subscriptionID, + }) return nil } } @@ -142,6 +148,7 @@ func WithSubscriptionID(subscriptionID string) Option { // WithSubscriptionIDFromEnv sets the subscription ID for pubsub transport from // a given environment variable name. func WithSubscriptionIDFromEnv(key string) Option { + // TODO: fix this method. return func(t *Transport) error { v := os.Getenv(key) if v == "" { diff --git a/pkg/cloudevents/transport/pubsub/transport.go b/pkg/cloudevents/transport/pubsub/transport.go index 843cdffcd..485e0a40a 100644 --- a/pkg/cloudevents/transport/pubsub/transport.go +++ b/pkg/cloudevents/transport/pubsub/transport.go @@ -21,6 +21,11 @@ const ( TransportName = "Pub/Sub" ) +type subscriptionWithTopic struct { + topicID string + subscriptionID string +} + // Transport acts as both a pubsub topic and a pubsub subscription . type Transport struct { // Encoding @@ -46,9 +51,14 @@ type Transport struct { projectID string topicID string subscriptionID string - client *pubsub.Client - connections map[string]*internal.Connection + gccMux sync.Mutex + + subscriptions []subscriptionWithTopic + client *pubsub.Client + + connectionsBySubscription map[string]*internal.Connection + connectionsByTopic map[string]*internal.Connection // Receiver Receiver transport.Receiver @@ -75,8 +85,12 @@ func New(ctx context.Context, opts ...Option) (*Transport, error) { t.client = client } - if t.connections == nil { - t.connections = make(map[string]*internal.Connection, 0) + if t.connectionsBySubscription == nil { + t.connectionsBySubscription = make(map[string]*internal.Connection, 0) + } + + if t.connectionsByTopic == nil { + t.connectionsByTopic = make(map[string]*internal.Connection, 0) } return t, nil } @@ -111,14 +125,27 @@ func (t *Transport) loadCodec(ctx context.Context) bool { return true } -func (t *Transport) getConnectionKey(ctx context.Context, topic, subscription string) string { - return fmt.Sprintf("Topic:%s Subscription:%s", topic, subscription) +func (t *Transport) getConnection(ctx context.Context, topic, subscription string) *internal.Connection { + if subscription != "" { + if conn, ok := t.connectionsBySubscription[subscription]; ok { + return conn + } + } + if topic != "" { + if conn, ok := t.connectionsByTopic[topic]; ok { + return conn + } + } + + return nil } func (t *Transport) getOrCreateConnection(ctx context.Context, topic, subscription string) *internal.Connection { + t.gccMux.Lock() + defer t.gccMux.Unlock() + // Get. - key := t.getConnectionKey(ctx, topic, subscription) - if conn, ok := t.connections[key]; ok { + if conn := t.getConnection(ctx, topic, subscription); conn != nil { return conn } // Create. @@ -130,7 +157,14 @@ func (t *Transport) getOrCreateConnection(ctx context.Context, topic, subscripti TopicID: topic, SubscriptionID: subscription, } - t.connections[key] = conn + // Save for later. + if subscription != "" { + t.connectionsBySubscription[subscription] = conn + } + if topic != "" { + t.connectionsByTopic[topic] = conn + } + return conn } @@ -140,7 +174,12 @@ func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudev return nil, fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - conn := t.getOrCreateConnection(ctx, t.topicID, t.subscriptionID) + topic := cecontext.TopicFrom(ctx) + if topic == "" { + topic = t.topicID + } + + conn := t.getOrCreateConnection(ctx, topic, "") msg, err := t.codec.Encode(ctx, event) if err != nil { @@ -182,30 +221,54 @@ func (t *Transport) StartReceiver(ctx context.Context) error { return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - conn := t.getOrCreateConnection(ctx, t.topicID, t.subscriptionID) + //cctx, cancel := context.WithCancel(ctx) + //defer cancel() + // TODO: real way is to setup a inner context with cancel and then call cancel + // when the parent called done down. but for now, hack hack hack. - // Ok, ready to start pulling. - return conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - msg := &Message{ - Attributes: m.Attributes, - Data: m.Data, - } - event, err := t.codec.Decode(ctx, msg) - // If codec returns and error, try with the converter if it is set. - if err != nil && t.HasConverter() { - event, err = t.Converter.Convert(ctx, msg, err) - } - if err != nil { - logger.Errorw("failed to decode message", zap.Error(err)) - m.Nack() - return - } + // TODO: This is demo code. + var err error - if err := t.Receiver.Receive(ctx, *event, nil); err != nil { - logger.Warnw("pubsub receiver return err", zap.Error(err)) - m.Nack() + startSubscriber := func(sub subscriptionWithTopic) { + logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID) + conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID) + + logger.Info("conn is", conn) + if conn == nil { + err = fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID) return } - m.Ack() - }) + // Ok, ready to start pulling. + err = conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + logger.Info("got an event!") + msg := &Message{ + Attributes: m.Attributes, + Data: m.Data, + } + event, err := t.codec.Decode(ctx, msg) + // If codec returns and error, try with the converter if it is set. + if err != nil && t.HasConverter() { + event, err = t.Converter.Convert(ctx, msg, err) + } + if err != nil { + logger.Errorw("failed to decode message", zap.Error(err)) + m.Nack() + return + } + + if err := t.Receiver.Receive(ctx, *event, nil); err != nil { + logger.Warnw("pubsub receiver return err", zap.Error(err)) + m.Nack() + return + } + m.Ack() + }) + } + + for _, sub := range t.subscriptions { + go startSubscriber(sub) + } + + <-ctx.Done() + return err } From 7b61368af5741c392a34c1137f6966e0362e92d9 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 12 Aug 2019 08:17:13 -0700 Subject: [PATCH 3/7] fix prod pubsub, collect errors. Signed-off-by: Scott Nichols --- cmd/samples/pubsub/multisender/main.go | 3 +- pkg/cloudevents/transport/pubsub/transport.go | 117 +++++++++++------- 2 files changed, 74 insertions(+), 46 deletions(-) diff --git a/cmd/samples/pubsub/multisender/main.go b/cmd/samples/pubsub/multisender/main.go index 0f955529e..c96082570 100644 --- a/cmd/samples/pubsub/multisender/main.go +++ b/cmd/samples/pubsub/multisender/main.go @@ -6,10 +6,11 @@ import ( "os" "strings" + "github.com/kelseyhightower/envconfig" + "github.com/cloudevents/sdk-go" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" - "github.com/kelseyhightower/envconfig" ) type envConfig struct { diff --git a/pkg/cloudevents/transport/pubsub/transport.go b/pkg/cloudevents/transport/pubsub/transport.go index 485e0a40a..9db9b8749 100644 --- a/pkg/cloudevents/transport/pubsub/transport.go +++ b/pkg/cloudevents/transport/pubsub/transport.go @@ -2,12 +2,14 @@ package pubsub import ( "context" + "errors" "fmt" - "sync" - "go.uber.org/zap" + "strings" + "sync" "cloud.google.com/go/pubsub" + "github.com/cloudevents/sdk-go/pkg/cloudevents" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" @@ -211,64 +213,89 @@ func (t *Transport) HasConverter() bool { return t.Converter != nil } +func (t *Transport) startSubscriber(ctx context.Context, sub subscriptionWithTopic, done func(error)) { + logger := cecontext.LoggerFrom(ctx) + logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID) + conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID) + + logger.Info("conn is", conn) + if conn == nil { + err := fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID) + done(err) + return + } + // Ok, ready to start pulling. + err := conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + logger.Info("got an event!") + msg := &Message{ + Attributes: m.Attributes, + Data: m.Data, + } + event, err := t.codec.Decode(ctx, msg) + // If codec returns and error, try with the converter if it is set. + if err != nil && t.HasConverter() { + event, err = t.Converter.Convert(ctx, msg, err) + } + if err != nil { + logger.Errorw("failed to decode message", zap.Error(err)) + m.Nack() + return + } + + if err := t.Receiver.Receive(ctx, *event, nil); err != nil { + logger.Warnw("pubsub receiver return err", zap.Error(err)) + m.Nack() + return + } + m.Ack() + }) + done(err) +} + // StartReceiver implements Transport.StartReceiver // NOTE: This is a blocking call. func (t *Transport) StartReceiver(ctx context.Context) error { - logger := cecontext.LoggerFrom(ctx) - // Load the codec. if ok := t.loadCodec(ctx); !ok { return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - //cctx, cancel := context.WithCancel(ctx) - //defer cancel() - // TODO: real way is to setup a inner context with cancel and then call cancel - // when the parent called done down. but for now, hack hack hack. + cctx, cancel := context.WithCancel(ctx) + n := len(t.subscriptions) - // TODO: This is demo code. - var err error + // Make the channels for quit and errors. + quit := make(chan struct{}, n) + errc := make(chan error, n) - startSubscriber := func(sub subscriptionWithTopic) { - logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID) - conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID) - - logger.Info("conn is", conn) - if conn == nil { - err = fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID) - return - } - // Ok, ready to start pulling. - err = conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - logger.Info("got an event!") - msg := &Message{ - Attributes: m.Attributes, - Data: m.Data, - } - event, err := t.codec.Decode(ctx, msg) - // If codec returns and error, try with the converter if it is set. - if err != nil && t.HasConverter() { - event, err = t.Converter.Convert(ctx, msg, err) - } + // Start up each subscription. + for _, sub := range t.subscriptions { + go t.startSubscriber(cctx, sub, func(err error) { if err != nil { - logger.Errorw("failed to decode message", zap.Error(err)) - m.Nack() - return + errc <- err + } else { + quit <- struct{}{} } - - if err := t.Receiver.Receive(ctx, *event, nil); err != nil { - logger.Warnw("pubsub receiver return err", zap.Error(err)) - m.Nack() - return - } - m.Ack() }) } - for _, sub := range t.subscriptions { - go startSubscriber(sub) + // Block for parent context to finish. + <-ctx.Done() + cancel() + + // Collect errors and done calls until we have n of them. + errs := []string(nil) + for success := 0; success < n; success++ { + var err error + select { + case err = <-errc: + case <-quit: + } + if err != nil { + errs = append(errs, err.Error()) + } } + close(quit) + close(errc) - <-ctx.Done() - return err + return errors.New(strings.Join(errs, "\n")) } From 8bc421a7a731f22be342e8d5074853791a5b33dd Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 12 Aug 2019 08:24:30 -0700 Subject: [PATCH 4/7] use existing pubsub subscription method. Signed-off-by: Scott Nichols --- pkg/cloudevents/transport/pubsub/options.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cloudevents/transport/pubsub/options.go b/pkg/cloudevents/transport/pubsub/options.go index 423169e99..a2c6ee836 100644 --- a/pkg/cloudevents/transport/pubsub/options.go +++ b/pkg/cloudevents/transport/pubsub/options.go @@ -148,14 +148,14 @@ func WithSubscriptionID(subscriptionID string) Option { // WithSubscriptionIDFromEnv sets the subscription ID for pubsub transport from // a given environment variable name. func WithSubscriptionIDFromEnv(key string) Option { - // TODO: fix this method. return func(t *Transport) error { v := os.Getenv(key) if v == "" { return fmt.Errorf("unable to load subscription id, %q environment variable not set", key) } - t.subscriptionID = v - return nil + + opt := WithSubscriptionID(v) + return opt(t) } } From 31fb9da94156ff1ad6144d2186eb6c187880fb46 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 12 Aug 2019 08:35:00 -0700 Subject: [PATCH 5/7] I hate go mod. Signed-off-by: Scott Nichols --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 26ba560e3..0824342d2 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/Azure/go-autorest/autorest/to v0.2.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect github.com/fortytw2/leaktest v1.3.0 // indirect - github.com/gogo/protobuf v1.2.0 // indirect github.com/google/go-cmp v0.3.0 github.com/google/uuid v1.1.1 github.com/kelseyhightower/envconfig v1.4.0 From c90039d6bdd9db1543018447c2a79e3c82e64e2d Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 19 Aug 2019 14:23:20 -0700 Subject: [PATCH 6/7] update based on review. Signed-off-by: Scott Nichols --- .../transport/pubsub/internal/{conneciton.go => connection.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pkg/cloudevents/transport/pubsub/internal/{conneciton.go => connection.go} (100%) diff --git a/pkg/cloudevents/transport/pubsub/internal/conneciton.go b/pkg/cloudevents/transport/pubsub/internal/connection.go similarity index 100% rename from pkg/cloudevents/transport/pubsub/internal/conneciton.go rename to pkg/cloudevents/transport/pubsub/internal/connection.go From 1f98e7d5828c669bbdb1845a3abda23f2397eb37 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 19 Aug 2019 14:32:59 -0700 Subject: [PATCH 7/7] Review fixup. Signed-off-by: Scott Nichols --- cmd/samples/pubsub/converter/receiver/main.go | 12 +++---- cmd/samples/pubsub/converter/sender/main.go | 16 ++++----- cmd/samples/pubsub/multireceiver/main.go | 34 +++++++++---------- cmd/samples/pubsub/multisender/main.go | 29 ++++++++-------- cmd/samples/pubsub/receiver/main.go | 34 +++++++++---------- cmd/samples/pubsub/sender/main.go | 8 ++--- 6 files changed, 67 insertions(+), 66 deletions(-) diff --git a/cmd/samples/pubsub/converter/receiver/main.go b/cmd/samples/pubsub/converter/receiver/main.go index 8476d8422..b12ed2ea5 100644 --- a/cmd/samples/pubsub/converter/receiver/main.go +++ b/cmd/samples/pubsub/converter/receiver/main.go @@ -15,6 +15,12 @@ import ( "github.com/kelseyhightower/envconfig" ) +/* + +curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080 + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` TopicID string `envconfig:"PUBSUB_TOPIC" default:"demo_cloudevents" required:"true"` @@ -83,9 +89,3 @@ func main() { log.Printf("will listen on %s/%s\n", env.TopicID, env.SubscriptionID) log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, gotEvent)) } - -/* - -curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080 - -*/ diff --git a/cmd/samples/pubsub/converter/sender/main.go b/cmd/samples/pubsub/converter/sender/main.go index 6c145802f..c69821d10 100644 --- a/cmd/samples/pubsub/converter/sender/main.go +++ b/cmd/samples/pubsub/converter/sender/main.go @@ -10,6 +10,14 @@ import ( "github.com/kelseyhightower/envconfig" ) +/* + +To view: gcloud pubsub subscriptions pull --auto-ack foo + +To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}' + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"` @@ -69,11 +77,3 @@ func main() { log.Printf("Could not publish message: %v", err) } } - -/* - -To view: gcloud pubsub subscriptions pull --auto-ack foo - -To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}' - -*/ diff --git a/cmd/samples/pubsub/multireceiver/main.go b/cmd/samples/pubsub/multireceiver/main.go index 77a69f286..d5c878fdd 100644 --- a/cmd/samples/pubsub/multireceiver/main.go +++ b/cmd/samples/pubsub/multireceiver/main.go @@ -14,6 +14,23 @@ import ( "github.com/kelseyhightower/envconfig" ) +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` @@ -70,20 +87,3 @@ func main() { log.Fatalf("failed to start pubsub receiver, %s", err.Error()) } } - -/* - -To setup: - -gcloud pubsub topics create demo_cloudevents -gcloud pubsub subscriptions create foo --topic=demo_cloudevents - -To test: - -gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' - -To fix a bad message: - -gcloud pubsub subscriptions pull --auto-ack foo - -*/ diff --git a/cmd/samples/pubsub/multisender/main.go b/cmd/samples/pubsub/multisender/main.go index c96082570..315134015 100644 --- a/cmd/samples/pubsub/multisender/main.go +++ b/cmd/samples/pubsub/multisender/main.go @@ -10,9 +10,21 @@ import ( "github.com/cloudevents/sdk-go" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" - cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" ) +/* + +gcloud pubsub topics create ce1 +gcloud pubsub topics create ce2 +gcloud pubsub topics create ce3 + +gcloud pubsub subscriptions create ce1_sub --topic=ce1 +gcloud pubsub subscriptions create ce2_sub --topic=ce2 +gcloud pubsub subscriptions create ce3_sub --topic=ce3 + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"` @@ -32,8 +44,8 @@ func main() { os.Exit(1) } - t, err := cloudeventspubsub.New(context.Background(), - cloudeventspubsub.WithProjectID(env.ProjectID)) + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID)) if err != nil { log.Printf("failed to create pubsub transport, %s", err.Error()) os.Exit(1) @@ -64,14 +76,3 @@ func main() { os.Exit(0) } - -/* -gcloud pubsub topics create ce1 -gcloud pubsub topics create ce2 -gcloud pubsub topics create ce3 - -gcloud pubsub subscriptions create ce1_sub --topic=ce1 -gcloud pubsub subscriptions create ce2_sub --topic=ce2 -gcloud pubsub subscriptions create ce3_sub --topic=ce3 - -*/ diff --git a/cmd/samples/pubsub/receiver/main.go b/cmd/samples/pubsub/receiver/main.go index 945e089f3..8ef3f326a 100644 --- a/cmd/samples/pubsub/receiver/main.go +++ b/cmd/samples/pubsub/receiver/main.go @@ -13,6 +13,23 @@ import ( "github.com/kelseyhightower/envconfig" ) +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` @@ -68,20 +85,3 @@ func main() { log.Fatalf("failed to start pubsub receiver, %s", err.Error()) } } - -/* - -To setup: - -gcloud pubsub topics create demo_cloudevents -gcloud pubsub subscriptions create foo --topic=demo_cloudevents - -To test: - -gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' - -To fix a bad message: - -gcloud pubsub subscriptions pull --auto-ack foo - -*/ diff --git a/cmd/samples/pubsub/sender/main.go b/cmd/samples/pubsub/sender/main.go index f6155db31..6fcefd762 100644 --- a/cmd/samples/pubsub/sender/main.go +++ b/cmd/samples/pubsub/sender/main.go @@ -6,7 +6,7 @@ import ( "os" "github.com/cloudevents/sdk-go" - cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" "github.com/kelseyhightower/envconfig" ) @@ -29,9 +29,9 @@ func main() { os.Exit(1) } - t, err := cloudeventspubsub.New(context.Background(), - cloudeventspubsub.WithProjectID(env.ProjectID), - cloudeventspubsub.WithTopicID(env.TopicID)) + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID), + cepubsub.WithTopicID(env.TopicID)) if err != nil { log.Printf("failed to create pubsub transport, %s", err.Error()) os.Exit(1)