Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): create/update export subscriptions #6885

Merged
merged 10 commits into from
Dec 10, 2022
75 changes: 64 additions & 11 deletions pubsublite/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
fmpb "google.golang.org/genproto/protobuf/field_mask"
)

var (
Expand Down Expand Up @@ -155,35 +156,50 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
}

type createSubscriptionSettings struct {
backlogLocation BacklogLocation
target SeekTarget
}

// CreateSubscriptionOption is an option for AdminClient.CreateSubscription.
type CreateSubscriptionOption interface {
apply(*createSubscriptionSettings)
}

type startingOffset struct {
backlogLocation BacklogLocation
type targetLocation struct {
target SeekTarget
}

func (so startingOffset) apply(settings *createSubscriptionSettings) {
settings.backlogLocation = so.backlogLocation
func (tl targetLocation) apply(settings *createSubscriptionSettings) {
settings.target = tl.target
}

// StartingOffset specifies the offset at which a newly created subscription
// will start receiving messages.
//
// Deprecated. This is equivalent to calling AtTargetLocation with a
// BacklogLocation and will be removed in the next major version.
func StartingOffset(location BacklogLocation) CreateSubscriptionOption {
return startingOffset{location}
return targetLocation{location}
}

// AtTargetLocation specifies the target location within the message backlog
// that a new subscription should be initialized to.
//
// An additional seek request is initiated if the target location is a publish
// or event time. If the seek fails, the created subscription is not deleted.
func AtTargetLocation(target SeekTarget) CreateSubscriptionOption {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
return targetLocation{target}
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
//
// By default, a new subscription will only receive messages published after
// the subscription was created. Use StartingOffset to override.
// the subscription was created. Use AtTargetLocation to initialize the
// subscription to another location within the message backlog.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) {
var settings createSubscriptionSettings
settings := createSubscriptionSettings{
target: End,
}
for _, opt := range opts {
opt.apply(&settings)
}
Expand All @@ -195,16 +211,53 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
if _, err := wire.ParseTopicPath(config.Topic); err != nil {
return nil, err
}
req := &pb.CreateSubscriptionRequest{

var skipBacklog, requiresSeek, requiresUpdate bool
switch settings.target.(type) {
case PublishTime, EventTime:
requiresSeek = true
case BacklogLocation:
skipBacklog = settings.target.(BacklogLocation) == End
}

// Request 1 - create the subscription.
createReq := &pb.CreateSubscriptionRequest{
Parent: subsPath.LocationPath().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: settings.backlogLocation != Beginning,
SkipBacklog: skipBacklog,
}
if requiresSeek && createReq.Subscription.GetExportConfig().GetDesiredState() == pb.ExportConfig_ACTIVE {
// Export subscriptions must be paused while seeking. The state is later
// updated to active.
requiresUpdate = true
createReq.Subscription.ExportConfig.DesiredState = pb.ExportConfig_PAUSED
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
subspb, err := ac.admin.CreateSubscription(ctx, createReq)
if err != nil {
return nil, err
}

// Request 2 (optional) - seek the subscription.
if requiresSeek {
if _, err = ac.SeekSubscription(ctx, subsPath.String(), settings.target); err != nil {
return nil, err
}
}

// Request 3 (optional) - make the export subscription active.
if requiresUpdate {
updateReq := &pb.UpdateSubscriptionRequest{
Subscription: &pb.Subscription{
Name: subsPath.String(),
ExportConfig: &pb.ExportConfig{DesiredState: pb.ExportConfig_ACTIVE},
},
UpdateMask: &fmpb.FieldMask{Paths: []string{"export_config.desired_state"}},
}
if subspb, err = ac.admin.UpdateSubscription(ctx, updateReq); err != nil {
return nil, err
}
}
return protoToSubscriptionConfig(subspb), nil
}

Expand Down
Loading