Skip to content

Commit

Permalink
Pubsub Topic and Subscription object creation can be retried (#473)
Browse files Browse the repository at this point in the history
* Fix pubsub Topic and Subscription so that if they hit a failure during
initialization, a later call will succeed.

Also fix some of the teardown logic to be fully thread safe (in the
clase of DeleteTopic, followed by Publish() and the same pattern in
Subscription)

Signed-off-by: Eric Lemar <elemar@google.com>

* Add failure tests for the connection

Signed-off-by: Eric Lemar <elemar@google.com>

* Fix missing cancel

Signed-off-by: Eric Lemar <elemar@google.com>

* static analysis fixes

Signed-off-by: Eric Lemar <elemar@google.com>

* fix misspelled comment

Signed-off-by: Eric Lemar <elemar@google.com>
  • Loading branch information
ericlem authored Apr 28, 2020
1 parent c30f66a commit bec707e
Show file tree
Hide file tree
Showing 4 changed files with 1,430 additions and 160 deletions.
224 changes: 162 additions & 62 deletions v1/cloudevents/transport/pubsub/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ import (
pscontext "github.com/cloudevents/sdk-go/v1/cloudevents/transport/pubsub/context"
)

type topicInfo struct {
topic *pubsub.Topic
wasCreated bool
once sync.Once
err error
}

type subInfo struct {
sub *pubsub.Subscription
wasCreated bool
once sync.Once
err error
}

// Connection acts as either a pubsub topic or a pubsub subscription .
type Connection struct {
// AllowCreateTopic controls if the transport can create a topic if it does
Expand All @@ -26,24 +40,29 @@ type Connection struct {

Client *pubsub.Client

TopicID string
topic *pubsub.Topic
topicWasCreated bool
topicOnce sync.Once
TopicID string
topicInfo *topicInfo

SubscriptionID string
sub *pubsub.Subscription
subWasCreated bool
subOnce sync.Once
subInfo *subInfo

// Held when reading or writing topicInfo and subInfo. This is only
// held while reading the pointer, the structure internally manage
// their own internal concurrency. This also controls
// the update of AckDeadline and RetentionDuration if those are
// nil on start.
initLock sync.Mutex

// ReceiveSettings is used to configure Pubsub pull subscription.
ReceiveSettings *pubsub.ReceiveSettings

// AckDeadline is Pub/Sub AckDeadline.
// Default is 30 seconds.
// This can only be set prior to first call of any function.
AckDeadline *time.Duration
// RetentionDuration is Pub/Sub RetentionDuration.
// Default is 25 hours.
// This can only be set prior to first call of any function.
RetentionDuration *time.Duration
}

Expand All @@ -63,129 +82,210 @@ var DefaultReceiveSettings = pubsub.ReceiveSettings{
Synchronous: false,
}

func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) {
var err error
c.topicOnce.Do(func() {
func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnly bool) (*topicInfo, error) {
// See if a topic has already been created or is in the process of being created.
// If not, start creating one.
c.initLock.Lock()
ti := c.topicInfo
if ti == nil && !getAlreadyOpenOnly {
c.topicInfo = &topicInfo{}
ti = c.topicInfo
}
c.initLock.Unlock()
if ti == nil {
return nil, fmt.Errorf("no already open topic")
}

// Make sure the topic structure is initialized at most once.
ti.once.Do(func() {
var ok bool
// Load the topic.
topic := c.Client.Topic(c.TopicID)
ok, err = topic.Exists(ctx)
if err != nil {
ok, ti.err = topic.Exists(ctx)
if ti.err != nil {
return
}
// If the topic does not exist, create a new topic with the given name.
if !ok {
if !c.AllowCreateTopic {
err = fmt.Errorf("transport not allowed to create topic %q", c.TopicID)
ti.err = fmt.Errorf("transport not allowed to create topic %q", c.TopicID)
return
}
topic, err = c.Client.CreateTopic(ctx, c.TopicID)
if err != nil {
topic, ti.err = c.Client.CreateTopic(ctx, c.TopicID)
if ti.err != nil {
return
}
c.topicWasCreated = true
ti.wasCreated = true
}
// Success.
c.topic = topic
ti.topic = topic
})
if c.topic == nil {
return nil, fmt.Errorf("unable to create topic %q, %v", c.TopicID, err)
if ti.topic == nil {
// Initialization failed, remove this attempt so that future callers
// will try to initialize again.
c.initLock.Lock()
if c.topicInfo == ti {
c.topicInfo = nil
}
c.initLock.Unlock()

return nil, fmt.Errorf("unable to get or create topic %q, %v", c.TopicID, ti.err)
}
return ti, nil
}

func (c *Connection) getOrCreateTopic(ctx context.Context, getAlreadyOpenOnly bool) (*pubsub.Topic, error) {
ti, err := c.getOrCreateTopicInfo(ctx, getAlreadyOpenOnly)
if ti != nil {
return ti.topic, nil
} else {
return nil, err
}
return c.topic, err
}

// DeleteTopic
func (c *Connection) DeleteTopic(ctx context.Context) error {
if !c.topicWasCreated {
ti, err := c.getOrCreateTopicInfo(ctx, true)

if err != nil {
return errors.New("topic not open")
}
if !ti.wasCreated {
return errors.New("topic was not created by pubsub transport")
}
if err := c.topic.Delete(ctx); err != nil {
if err := ti.topic.Delete(ctx); err != nil {
return err
}
c.topic = nil
c.topicWasCreated = false
c.topicOnce = sync.Once{}

ti.topic.Stop()

c.initLock.Lock()
if ti == c.topicInfo {
c.topicInfo = nil
}
c.initLock.Unlock()

return nil
}

func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) {
var err error
c.subOnce.Do(func() {
func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlreadyOpenOnly bool) (*subInfo, error) {
c.initLock.Lock()
// Default the ack deadline and retention duration config.
// We only do this once.
if c.AckDeadline == nil {
ackDeadline := DefaultAckDeadline
c.AckDeadline = &(ackDeadline)
}
if c.RetentionDuration == nil {
retentionDuration := DefaultRetentionDuration
c.RetentionDuration = &retentionDuration
}
// See if a subscription has already been created or is in the process of being created.
// If not, start creating one.
si := c.subInfo
if si == nil && !getAlreadyOpenOnly {
c.subInfo = &subInfo{}
si = c.subInfo
}
c.initLock.Unlock()
if si == nil {
return nil, fmt.Errorf("no already open subscription")
}

// Make sure the subscription structure is initialized at most once.
si.once.Do(func() {
// Load the subscription.
var ok bool
sub := c.Client.Subscription(c.SubscriptionID)
ok, err = sub.Exists(ctx)
if err != nil {
ok, si.err = sub.Exists(ctx)
if si.err != nil {
return
}
// If subscription doesn't exist, create it.
if !ok {
if !c.AllowCreateSubscription {
err = fmt.Errorf("transport not allowed to create subscription %q", c.SubscriptionID)
si.err = fmt.Errorf("transport not allowed to create subscription %q", c.SubscriptionID)
return
}

// Load the topic.
var topic *pubsub.Topic
topic, err = c.getOrCreateTopic(ctx)
if err != nil {
topic, si.err = c.getOrCreateTopic(ctx, false)
if si.err != nil {
return
}
// Default the ack deadline and retention duration config.
if c.AckDeadline == nil {
ackDeadline := DefaultAckDeadline
c.AckDeadline = &(ackDeadline)
}
if c.RetentionDuration == nil {
retentionDuration := DefaultRetentionDuration
c.RetentionDuration = &retentionDuration
}

// Create a new subscription to the previously created topic
// with the given name.
// TODO: allow to use push config + allow setting the SubscriptionConfig.
sub, err = c.Client.CreateSubscription(ctx, c.SubscriptionID, pubsub.SubscriptionConfig{
sub, si.err = c.Client.CreateSubscription(ctx, c.SubscriptionID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: *c.AckDeadline,
RetentionDuration: *c.RetentionDuration,
})
if err != nil {
_ = c.Client.Close()
if si.err != nil {
return
}
if c.ReceiveSettings == nil {
sub.ReceiveSettings = DefaultReceiveSettings
} else {
sub.ReceiveSettings = *c.ReceiveSettings
}
c.subWasCreated = true

si.wasCreated = true
}
if c.ReceiveSettings == nil {
sub.ReceiveSettings = DefaultReceiveSettings
} else {
sub.ReceiveSettings = *c.ReceiveSettings
}
// Success.
c.sub = sub
si.sub = sub
})
if c.sub == nil {
return nil, fmt.Errorf("unable to create sunscription %q, %v", c.SubscriptionID, err)
if si.sub == nil {
// Initialization failed, remove this attempt so that future callers
// will try to initialize again.
c.initLock.Lock()
if c.subInfo == si {
c.subInfo = nil
}
c.initLock.Unlock()
return nil, fmt.Errorf("unable to create subscription %q, %v", c.SubscriptionID, si.err)
}
return si, nil
}

func (c *Connection) getOrCreateSubscription(ctx context.Context, getAlreadyOpenOnly bool) (*pubsub.Subscription, error) {
si, err := c.getOrCreateSubscriptionInfo(ctx, getAlreadyOpenOnly)
if si != nil {
return si.sub, nil
} else {
return nil, err
}
return c.sub, err
}

// DeleteSubscription
func (c *Connection) DeleteSubscription(ctx context.Context) error {
if !c.subWasCreated {
si, err := c.getOrCreateSubscriptionInfo(ctx, true)

if err != nil {
return errors.New("subscription not open")
}

if !si.wasCreated {
return errors.New("subscription was not created by pubsub transport")
}
if err := c.sub.Delete(ctx); err != nil {
if err := si.sub.Delete(ctx); err != nil {
return err
}
c.sub = nil
c.subWasCreated = false
c.subOnce = sync.Once{}

c.initLock.Lock()
if si == c.subInfo {
c.subInfo = nil
}
c.initLock.Unlock()

return nil
}

// Publish
func (c *Connection) Publish(ctx context.Context, msg *pubsub.Message) (*cloudevents.Event, error) {
topic, err := c.getOrCreateTopic(ctx)
topic, err := c.getOrCreateTopic(ctx, false)
if err != nil {
return nil, err
}
Expand All @@ -198,7 +298,7 @@ func (c *Connection) Publish(ctx context.Context, msg *pubsub.Message) (*cloudev
// Start
// NOTE: This is a blocking call.
func (c *Connection) Receive(ctx context.Context, fn func(context.Context, *pubsub.Message)) error {
sub, err := c.getOrCreateSubscription(ctx)
sub, err := c.getOrCreateSubscription(ctx, false)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit bec707e

Please sign in to comment.