diff --git a/cmd/samples/pubsub/converter/receiver/main.go b/cmd/samples/pubsub/converter/receiver/main.go index 93f50f522..b12ed2ea5 100644 --- a/cmd/samples/pubsub/converter/receiver/main.go +++ b/cmd/samples/pubsub/converter/receiver/main.go @@ -10,10 +10,17 @@ import ( "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" "github.com/google/uuid" "github.com/kelseyhightower/envconfig" ) +/* + +curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080 + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` TopicID string `envconfig:"PUBSUB_TOPIC" default:"demo_cloudevents" required:"true"` @@ -44,7 +51,7 @@ func convert(ctx context.Context, m transport.Message, err error) (*cloudevents. log.Printf("trying to recover from %v", err) if msg, ok := m.(*pubsub.Message); ok { - tx := pubsub.TransportContextFrom(ctx) + tx := pscontext.TransportContextFrom(ctx) // Make a new event and convert the message payload. event := cloudevents.NewEvent() event.SetSource("github.com/cloudevents/cmd/samples/pubsub/converter/receiver") @@ -82,9 +89,3 @@ func main() { log.Printf("will listen on %s/%s\n", env.TopicID, env.SubscriptionID) log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, gotEvent)) } - -/* - -curl -X POST -H "Content-Type: application/json" -d '{"id":123,"message":"hello world"}' http://localhost:8080 - -*/ diff --git a/cmd/samples/pubsub/converter/sender/main.go b/cmd/samples/pubsub/converter/sender/main.go index 6c145802f..c69821d10 100644 --- a/cmd/samples/pubsub/converter/sender/main.go +++ b/cmd/samples/pubsub/converter/sender/main.go @@ -10,6 +10,14 @@ import ( "github.com/kelseyhightower/envconfig" ) +/* + +To view: gcloud pubsub subscriptions pull --auto-ack foo + +To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}' + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"` @@ -69,11 +77,3 @@ func main() { log.Printf("Could not publish message: %v", err) } } - -/* - -To view: gcloud pubsub subscriptions pull --auto-ack foo - -To post: gcloud pubsub topics publish demo_cloudevents --message '{"id":123,"message":"hi from the terminal"}' - -*/ diff --git a/cmd/samples/pubsub/multireceiver/main.go b/cmd/samples/pubsub/multireceiver/main.go new file mode 100644 index 000000000..d5c878fdd --- /dev/null +++ b/cmd/samples/pubsub/multireceiver/main.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strings" + + "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" + "github.com/kelseyhightower/envconfig" +) + +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ + +type envConfig struct { + ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` + + SubscriptionIDs string `envconfig:"PUBSUB_SUBSCRIPTIONS" default:"foo" required:"true"` +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { + fmt.Printf("Event Context: %+v\n", event.Context) + + fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx)) + + data := &Example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Data: %+v\n", data) + + fmt.Printf("----------------------------\n") + return nil +} + +func main() { + ctx := context.Background() + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + var opts []cepubsub.Option + opts = append(opts, cepubsub.WithProjectID(env.ProjectID)) + for _, subscription := range strings.Split(env.SubscriptionIDs, ",") { + opts = append(opts, cepubsub.WithSubscriptionID(subscription)) + } + + t, err := cepubsub.New(context.Background(), opts...) + if err != nil { + log.Fatalf("failed to create pubsub transport, %s", err.Error()) + } + c, err := client.New(t) + if err != nil { + log.Fatalf("failed to create client, %s", err.Error()) + } + + log.Println("Created client, listening...") + + if err := c.StartReceiver(ctx, receive); err != nil { + log.Fatalf("failed to start pubsub receiver, %s", err.Error()) + } +} diff --git a/cmd/samples/pubsub/multisender/main.go b/cmd/samples/pubsub/multisender/main.go new file mode 100644 index 000000000..315134015 --- /dev/null +++ b/cmd/samples/pubsub/multisender/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "log" + "os" + "strings" + + "github.com/kelseyhightower/envconfig" + + "github.com/cloudevents/sdk-go" + cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" +) + +/* + +gcloud pubsub topics create ce1 +gcloud pubsub topics create ce2 +gcloud pubsub topics create ce3 + +gcloud pubsub subscriptions create ce1_sub --topic=ce1 +gcloud pubsub subscriptions create ce2_sub --topic=ce2 +gcloud pubsub subscriptions create ce3_sub --topic=ce3 + +*/ + +type envConfig struct { + ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"` + + TopicIDs string `envconfig:"PUBSUB_TOPICS" default:"demo_cloudevents" required:"true"` +} + +// Basic data struct. +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID)) + if err != nil { + log.Printf("failed to create pubsub transport, %s", err.Error()) + os.Exit(1) + } + c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Printf("failed to create client, %s", err.Error()) + os.Exit(1) + } + + for i, topic := range strings.Split(env.TopicIDs, ",") { + ctx := cecontext.WithTopic(context.Background(), topic) + event := cloudevents.NewEvent(cloudevents.VersionV03) + event.SetType("com.cloudevents.sample.sent") + event.SetSource("github.com/cloudevents/sdk-go/cmd/samples/pubsub/multisender/") + _ = event.SetData(&Example{ + Sequence: i, + Message: "HELLO " + topic, + }) + + _, err = c.Send(ctx, event) + + if err != nil { + log.Printf("failed to send: %v", err) + os.Exit(1) + } + } + + os.Exit(0) +} diff --git a/cmd/samples/pubsub/receiver/main.go b/cmd/samples/pubsub/receiver/main.go index 33f4836bb..8ef3f326a 100644 --- a/cmd/samples/pubsub/receiver/main.go +++ b/cmd/samples/pubsub/receiver/main.go @@ -8,10 +8,28 @@ import ( "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" - cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" "github.com/kelseyhightower/envconfig" ) +/* + +To setup: + +gcloud pubsub topics create demo_cloudevents +gcloud pubsub subscriptions create foo --topic=demo_cloudevents + +To test: + +gcloud pubsub topics publish demo_cloudevents --message='{"Hello": "world"}' + +To fix a bad message: + +gcloud pubsub subscriptions pull --auto-ack foo + +*/ + type envConfig struct { ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT"` @@ -28,7 +46,7 @@ type Example struct { func receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { fmt.Printf("Event Context: %+v\n", event.Context) - fmt.Printf("Transport Context: %+v\n", cloudeventspubsub.TransportContextFrom(ctx)) + fmt.Printf("Transport Context: %+v\n", pscontext.TransportContextFrom(ctx)) data := &Example{} if err := event.DataAs(data); err != nil { @@ -49,10 +67,10 @@ func main() { os.Exit(1) } - t, err := cloudeventspubsub.New(context.Background(), - cloudeventspubsub.WithProjectID(env.ProjectID), - cloudeventspubsub.WithTopicID(env.TopicID), - cloudeventspubsub.WithSubscriptionID(env.SubscriptionID)) + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID), + cepubsub.WithTopicID(env.TopicID), + cepubsub.WithSubscriptionID(env.SubscriptionID)) if err != nil { log.Fatalf("failed to create pubsub transport, %s", err.Error()) } @@ -64,6 +82,6 @@ func main() { log.Println("Created client, listening...") if err := c.StartReceiver(ctx, receive); err != nil { - log.Fatalf("failed to start nats receiver, %s", err.Error()) + log.Fatalf("failed to start pubsub receiver, %s", err.Error()) } } diff --git a/cmd/samples/pubsub/sender/main.go b/cmd/samples/pubsub/sender/main.go index f6155db31..6fcefd762 100644 --- a/cmd/samples/pubsub/sender/main.go +++ b/cmd/samples/pubsub/sender/main.go @@ -6,7 +6,7 @@ import ( "os" "github.com/cloudevents/sdk-go" - cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" "github.com/kelseyhightower/envconfig" ) @@ -29,9 +29,9 @@ func main() { os.Exit(1) } - t, err := cloudeventspubsub.New(context.Background(), - cloudeventspubsub.WithProjectID(env.ProjectID), - cloudeventspubsub.WithTopicID(env.TopicID)) + t, err := cepubsub.New(context.Background(), + cepubsub.WithProjectID(env.ProjectID), + cepubsub.WithTopicID(env.TopicID)) if err != nil { log.Printf("failed to create pubsub transport, %s", err.Error()) os.Exit(1) diff --git a/pkg/cloudevents/context/context.go b/pkg/cloudevents/context/context.go index 3a338ab5d..e580360f1 100644 --- a/pkg/cloudevents/context/context.go +++ b/pkg/cloudevents/context/context.go @@ -30,6 +30,28 @@ func TargetFrom(ctx context.Context) *url.URL { return nil } +// Opaque key type used to store topic +type topicKeyType struct{} + +var topicKey = topicKeyType{} + +// WithTopic returns back a new context with the given topic. Topic is intended to be transport dependent. +// For pubsub transport, `topic` should be a Pub/Sub Topic ID. +func WithTopic(ctx context.Context, topic string) context.Context { + return context.WithValue(ctx, topicKey, topic) +} + +// TopicFrom looks in the given context and returns `topic` as a string if found and valid, otherwise "". +func TopicFrom(ctx context.Context) string { + c := ctx.Value(topicKey) + if c != nil { + if s, ok := c.(string); ok { + return s + } + } + return "" +} + // Opaque key type used to store encoding type encodingKeyType struct{} diff --git a/pkg/cloudevents/transport/pubsub/context.go b/pkg/cloudevents/transport/pubsub/context/context.go similarity index 99% rename from pkg/cloudevents/transport/pubsub/context.go rename to pkg/cloudevents/transport/pubsub/context/context.go index 57ec916ab..e64a5ee71 100644 --- a/pkg/cloudevents/transport/pubsub/context.go +++ b/pkg/cloudevents/transport/pubsub/context/context.go @@ -1,4 +1,4 @@ -package pubsub +package context import ( "context" diff --git a/pkg/cloudevents/transport/pubsub/context/context_test.go b/pkg/cloudevents/transport/pubsub/context/context_test.go new file mode 100644 index 000000000..960b2b7c5 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/context/context_test.go @@ -0,0 +1 @@ +package context_test diff --git a/pkg/cloudevents/transport/pubsub/context_test.go b/pkg/cloudevents/transport/pubsub/context_test.go deleted file mode 100644 index fcd9a357a..000000000 --- a/pkg/cloudevents/transport/pubsub/context_test.go +++ /dev/null @@ -1 +0,0 @@ -package pubsub_test diff --git a/pkg/cloudevents/transport/pubsub/internal/connection.go b/pkg/cloudevents/transport/pubsub/internal/connection.go new file mode 100644 index 000000000..e7dc7fd60 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/internal/connection.go @@ -0,0 +1,189 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "cloud.google.com/go/pubsub" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + pscontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" +) + +// Connection acts as either a pubsub topic or a pubsub subscription . +type Connection struct { + // AllowCreateTopic controls if the transport can create a topic if it does + // not exist. + AllowCreateTopic bool + + // AllowCreateSubscription controls if the transport can create a + // subscription if it does not exist. + AllowCreateSubscription bool + + ProjectID string + + Client *pubsub.Client + + TopicID string + topic *pubsub.Topic + topicWasCreated bool + topicOnce sync.Once + + SubscriptionID string + sub *pubsub.Subscription + subWasCreated bool + subOnce sync.Once + + // AckDeadline is Pub/Sub AckDeadline. + // Default is 30 seconds. + AckDeadline *time.Duration + // RetentionDuration is Pub/Sub RetentionDuration. + // Default is 25 hours. + RetentionDuration *time.Duration +} + +const ( + DefaultAckDeadline = 30 * time.Second + DefaultRetentionDuration = 25 * time.Hour +) + +func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) { + var err error + c.topicOnce.Do(func() { + var ok bool + // Load the topic. + topic := c.Client.Topic(c.TopicID) + ok, err = topic.Exists(ctx) + if err != nil { + return + } + // If the topic does not exist, create a new topic with the given name. + if !ok { + if !c.AllowCreateTopic { + err = fmt.Errorf("transport not allowed to create topic %q", c.TopicID) + return + } + topic, err = c.Client.CreateTopic(ctx, c.TopicID) + if err != nil { + return + } + c.topicWasCreated = true + } + // Success. + c.topic = topic + }) + if c.topic == nil { + return nil, fmt.Errorf("unable to create topic %q, %v", c.TopicID, err) + } + return c.topic, err +} + +// DeleteTopic +func (c *Connection) DeleteTopic(ctx context.Context) error { + if c.topicWasCreated { + if err := c.topic.Delete(ctx); err != nil { + return err + } + c.topic = nil + c.topicWasCreated = false + c.topicOnce = sync.Once{} + } + return errors.New("topic was not created by pubsub transport") +} + +func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) { + var err error + c.subOnce.Do(func() { + // Load the subscription. + var ok bool + sub := c.Client.Subscription(c.SubscriptionID) + ok, err = sub.Exists(ctx) + if err != nil { + return + } + // If subscription doesn't exist, create it. + if !ok { + if !c.AllowCreateSubscription { + err = fmt.Errorf("transport not allowed to create subscription %q", c.SubscriptionID) + return + } + + // Load the topic. + var topic *pubsub.Topic + topic, err = c.getOrCreateTopic(ctx) + if err != nil { + return + } + // Default the ack deadline and retention duration config. + if c.AckDeadline == nil { + ackDeadline := DefaultAckDeadline + c.AckDeadline = &(ackDeadline) + } + if c.RetentionDuration == nil { + retentionDuration := DefaultRetentionDuration + c.RetentionDuration = &retentionDuration + } + + // Create a new subscription to the previously created topic + // with the given name. + // TODO: allow to use push config + allow setting the SubscriptionConfig. + sub, err = c.Client.CreateSubscription(ctx, c.SubscriptionID, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: *c.AckDeadline, + RetentionDuration: *c.RetentionDuration, + }) + if err != nil { + _ = c.Client.Close() + return + } + c.subWasCreated = true + } + // Success. + c.sub = sub + }) + if c.sub == nil { + return nil, fmt.Errorf("unable to create sunscription %q, %v", c.SubscriptionID, err) + } + return c.sub, err +} + +// DeleteSubscription +func (c *Connection) DeleteSubscription(ctx context.Context) error { + if c.subWasCreated { + if err := c.sub.Delete(ctx); err != nil { + return err + } + c.sub = nil + c.subWasCreated = false + c.subOnce = sync.Once{} + } + return errors.New("subscription was not created by pubsub transport") +} + +// Publish +func (c *Connection) Publish(ctx context.Context, msg *pubsub.Message) (*cloudevents.Event, error) { + topic, err := c.getOrCreateTopic(ctx) + if err != nil { + return nil, err + } + + r := topic.Publish(ctx, msg) + _, err = r.Get(ctx) + return nil, err +} + +// Start +// NOTE: This is a blocking call. +func (c *Connection) Receive(ctx context.Context, fn func(context.Context, *pubsub.Message)) error { + sub, err := c.getOrCreateSubscription(ctx) + if err != nil { + return err + } + // Ok, ready to start pulling. + return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + ctx = pscontext.WithTransportContext(ctx, pscontext.NewTransportContext(c.ProjectID, c.TopicID, c.SubscriptionID, "pull", m)) + fn(ctx, m) + }) +} diff --git a/pkg/cloudevents/transport/pubsub/options.go b/pkg/cloudevents/transport/pubsub/options.go index 27301dc38..a2c6ee836 100644 --- a/pkg/cloudevents/transport/pubsub/options.go +++ b/pkg/cloudevents/transport/pubsub/options.go @@ -132,9 +132,15 @@ func WithTopicIDFromDefaultEnv() Option { } // WithSubscriptionID sets the subscription ID for pubsub transport. +// This option can be used multiple times. func WithSubscriptionID(subscriptionID string) Option { return func(t *Transport) error { - t.subscriptionID = subscriptionID + if t.subscriptions == nil { + t.subscriptions = make([]subscriptionWithTopic, 0) + } + t.subscriptions = append(t.subscriptions, subscriptionWithTopic{ + subscriptionID: subscriptionID, + }) return nil } } @@ -147,8 +153,9 @@ func WithSubscriptionIDFromEnv(key string) Option { if v == "" { return fmt.Errorf("unable to load subscription id, %q environment variable not set", key) } - t.subscriptionID = v - return nil + + opt := WithSubscriptionID(v) + return opt(t) } } diff --git a/pkg/cloudevents/transport/pubsub/transport.go b/pkg/cloudevents/transport/pubsub/transport.go index f249aa4e3..9db9b8749 100644 --- a/pkg/cloudevents/transport/pubsub/transport.go +++ b/pkg/cloudevents/transport/pubsub/transport.go @@ -4,15 +4,16 @@ import ( "context" "errors" "fmt" - "sync" - "time" - "go.uber.org/zap" + "strings" + "sync" "cloud.google.com/go/pubsub" + "github.com/cloudevents/sdk-go/pkg/cloudevents" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/internal" ) // Transport adheres to transport.Transport. @@ -22,6 +23,11 @@ const ( TransportName = "Pub/Sub" ) +type subscriptionWithTopic struct { + topicID string + subscriptionID string +} + // Transport acts as both a pubsub topic and a pubsub subscription . type Transport struct { // Encoding @@ -44,19 +50,17 @@ type Transport struct { // subscription if it does not exist. AllowCreateSubscription bool - projectID string + projectID string + topicID string + subscriptionID string - client *pubsub.Client + gccMux sync.Mutex - topicID string - topic *pubsub.Topic - topicWasCreated bool - topicOnce sync.Once + subscriptions []subscriptionWithTopic + client *pubsub.Client - subscriptionID string - sub *pubsub.Subscription - subWasCreated bool - subOnce sync.Once + connectionsBySubscription map[string]*internal.Connection + connectionsByTopic map[string]*internal.Connection // Receiver Receiver transport.Receiver @@ -82,6 +86,14 @@ func New(ctx context.Context, opts ...Option) (*Transport, error) { // Success. t.client = client } + + if t.connectionsBySubscription == nil { + t.connectionsBySubscription = make(map[string]*internal.Connection, 0) + } + + if t.connectionsByTopic == nil { + t.connectionsByTopic = make(map[string]*internal.Connection, 0) + } return t, nil } @@ -115,106 +127,47 @@ func (t *Transport) loadCodec(ctx context.Context) bool { return true } -func (t *Transport) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) { - var err error - t.topicOnce.Do(func() { - var ok bool - // Load the topic. - topic := t.client.Topic(t.topicID) - ok, err = topic.Exists(ctx) - if err != nil { - _ = t.client.Close() - return +func (t *Transport) getConnection(ctx context.Context, topic, subscription string) *internal.Connection { + if subscription != "" { + if conn, ok := t.connectionsBySubscription[subscription]; ok { + return conn } - // If the topic does not exist, create a new topic with the given name. - if !ok { - if !t.AllowCreateTopic { - err = fmt.Errorf("transport not allowed to create topic %q", t.topicID) - return - } - topic, err = t.client.CreateTopic(ctx, t.topicID) - if err != nil { - return - } - t.topicWasCreated = true - } - // Success. - t.topic = topic - }) - if t.topic == nil { - return nil, fmt.Errorf("unable to create topic %q", t.topicID) } - return t.topic, err -} - -func (t *Transport) DeleteTopic(ctx context.Context) error { - if t.topicWasCreated { - if err := t.topic.Delete(ctx); err != nil { - return err + if topic != "" { + if conn, ok := t.connectionsByTopic[topic]; ok { + return conn } - t.topic = nil - t.topicWasCreated = false - t.topicOnce = sync.Once{} } - return errors.New("topic was not created by pubsub transport") -} -func (t *Transport) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) { - var err error - t.subOnce.Do(func() { - // Load the topic. - var topic *pubsub.Topic - topic, err = t.getOrCreateTopic(ctx) - if err != nil { - return - } - // Load the subscription. - var ok bool - sub := t.client.Subscription(t.subscriptionID) - ok, err = sub.Exists(ctx) - if err != nil { - _ = t.client.Close() - return - } - // If subscription doesn't exist, create it. - if !ok { - if !t.AllowCreateSubscription { - err = fmt.Errorf("transport not allowed to create subscription %q", t.subscriptionID) - return - } - // Create a new subscription to the previously created topic - // with the given name. - // TODO: allow to use push config + allow setting the SubscriptionConfig. - sub, err = t.client.CreateSubscription(ctx, t.subscriptionID, pubsub.SubscriptionConfig{ - Topic: topic, - AckDeadline: 30 * time.Second, - RetentionDuration: 25 * time.Hour, - }) - if err != nil { - _ = t.client.Close() - return - } - t.subWasCreated = true - } - // Success. - t.sub = sub - }) - if t.sub == nil { - return nil, fmt.Errorf("unable to create sunscription %q", t.subscriptionID) - } - return t.sub, err + return nil } -func (t *Transport) DeleteSubscription(ctx context.Context) error { - if t.subWasCreated { - if err := t.sub.Delete(ctx); err != nil { - return err - } - t.sub = nil - t.subWasCreated = false - t.subOnce = sync.Once{} +func (t *Transport) getOrCreateConnection(ctx context.Context, topic, subscription string) *internal.Connection { + t.gccMux.Lock() + defer t.gccMux.Unlock() + + // Get. + if conn := t.getConnection(ctx, topic, subscription); conn != nil { + return conn + } + // Create. + conn := &internal.Connection{ + AllowCreateSubscription: t.AllowCreateSubscription, + AllowCreateTopic: t.AllowCreateTopic, + Client: t.client, + ProjectID: t.projectID, + TopicID: topic, + SubscriptionID: subscription, + } + // Save for later. + if subscription != "" { + t.connectionsBySubscription[subscription] = conn } - return errors.New("subscription was not created by pubsub transport") + if topic != "" { + t.connectionsByTopic[topic] = conn + } + + return conn } // Send implements Transport.Send @@ -223,28 +176,23 @@ func (t *Transport) Send(ctx context.Context, event cloudevents.Event) (*cloudev return nil, fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) } - msg, err := t.codec.Encode(ctx, event) - if err != nil { - return nil, err + topic := cecontext.TopicFrom(ctx) + if topic == "" { + topic = t.topicID } - topic, err := t.getOrCreateTopic(ctx) + conn := t.getOrCreateConnection(ctx, topic, "") + + msg, err := t.codec.Encode(ctx, event) if err != nil { return nil, err } if m, ok := msg.(*Message); ok { - - r := topic.Publish(ctx, &pubsub.Message{ + return conn.Publish(ctx, &pubsub.Message{ Attributes: m.Attributes, Data: m.Data, }) - - _, err := r.Get(ctx) - if err != nil { - return nil, err - } - return nil, nil } return nil, fmt.Errorf("failed to encode Event into a Message") @@ -265,24 +213,20 @@ func (t *Transport) HasConverter() bool { return t.Converter != nil } -// StartReceiver implements Transport.StartReceiver -// NOTE: This is a blocking call. -func (t *Transport) StartReceiver(ctx context.Context) error { +func (t *Transport) startSubscriber(ctx context.Context, sub subscriptionWithTopic, done func(error)) { logger := cecontext.LoggerFrom(ctx) - - // Load the codec. - if ok := t.loadCodec(ctx); !ok { - return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) - } - - sub, err := t.getOrCreateSubscription(ctx) - if err != nil { - return err + logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID) + conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID) + + logger.Info("conn is", conn) + if conn == nil { + err := fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID) + done(err) + return } // Ok, ready to start pulling. - err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - ctx = WithTransportContext(ctx, NewTransportContext(t.projectID, t.topicID, t.subscriptionID, "pull", m)) - + err := conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + logger.Info("got an event!") msg := &Message{ Attributes: m.Attributes, Data: m.Data, @@ -305,6 +249,53 @@ func (t *Transport) StartReceiver(ctx context.Context) error { } m.Ack() }) + done(err) +} + +// StartReceiver implements Transport.StartReceiver +// NOTE: This is a blocking call. +func (t *Transport) StartReceiver(ctx context.Context) error { + // Load the codec. + if ok := t.loadCodec(ctx); !ok { + return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding) + } + + cctx, cancel := context.WithCancel(ctx) + n := len(t.subscriptions) + + // Make the channels for quit and errors. + quit := make(chan struct{}, n) + errc := make(chan error, n) + + // Start up each subscription. + for _, sub := range t.subscriptions { + go t.startSubscriber(cctx, sub, func(err error) { + if err != nil { + errc <- err + } else { + quit <- struct{}{} + } + }) + } + + // Block for parent context to finish. + <-ctx.Done() + cancel() + + // Collect errors and done calls until we have n of them. + errs := []string(nil) + for success := 0; success < n; success++ { + var err error + select { + case err = <-errc: + case <-quit: + } + if err != nil { + errs = append(errs, err.Error()) + } + } + close(quit) + close(errc) - return err + return errors.New(strings.Join(errs, "\n")) }