Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Client abstraction #1416

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion provider/pkg/provider/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CloudControlApiClient interface {
// without awaiting any long-running operations.
CreateResource(ctx context.Context, cfType, desiredState string) (*types.ProgressEvent, error)

// UpdateResource updates a resource of the specified type with the desired state.
// UpdateResource updates a resource of the specified type with the specified changeset.
// It returns a ProgressEvent which is the initial progress returned directly from the API call,
// without awaiting any long-running operations.
// The changes to be applied are expressed as a list of JSON patch operations.
Expand Down
143 changes: 143 additions & 0 deletions provider/pkg/provider/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2016-2024, Pulumi Corporation.

package client

import (
"context"
"encoding/json"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/cloudcontrol"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types"
"github.com/mattbaird/jsonpatch"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-aws-native/provider/pkg/schema"
)

// CloudControlApiClient providers CRUD operations around Cloud Control API, with the mechanics of API calls abstracted away.
// For instance, it serializes and deserializes wire data and follows the protocol of long-running operations.
type CloudControlClient interface {
// Create creates a resource of the specified type with the desired state.
// It awaits the operation until completion and returns a map of output property values.
Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error)

// Read returns the current state of the specified resource. It deserializes
// the response from the service into a map of untyped values.
Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error)

// Update updates a resource of the specified type with the specified changeset.
// It awaits the operation until completion and returns a map of output property values.
Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error)

// Delete deletes a resource of the specified type with the given identifier.
// It awaits the operation until completion.
Delete(ctx context.Context, typeName, identifier string) error
}

type clientImpl struct {
api CloudControlApiClient
awaiter CloudControlAwaiter
}

func NewCloudControlClient(cctl *cloudcontrol.Client, roleArn *string) CloudControlClient {
api := NewCloudControlApiClient(cctl, roleArn)
return &clientImpl{
api: api,
awaiter: NewCloudControlAwaiter(api),
}
}

func (c *clientImpl) Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error) {
// Serialize inputs as a desired state JSON.
jsonBytes, err := json.Marshal(desiredState)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal as JSON: %w", err)
}
payload := string(jsonBytes)

res, err := c.api.CreateResource(ctx, typeName, payload)
if err != nil {
return nil, nil, fmt.Errorf("creating resource: %w", err)
}

pi, waitErr := c.awaiter.WaitForResourceOpCompletion(ctx, res)

// Read the state - even if there was a creation error but the progress event contains a resource ID.
var id string
var outputs map[string]interface{}
var readErr error
if pi != nil && pi.Identifier != nil {
// Retrieve the resource state from AWS.
// Note that we do so even if creation hasn't succeeded but the identifier is assigned.
id = *pi.Identifier
resourceState, err := c.api.GetResource(ctx, typeName, id)
if err != nil {
readErr = fmt.Errorf("reading resource state: %w", err)
} else {
outputs = schema.CfnToSdk(resourceState)
}
}

if waitErr != nil {
if id == "" {
return nil, nil, waitErr
}

if readErr != nil {
return nil, nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr)
}

// Resource was created but failed to fully initialize.
// If it has some state, return a partial error.
return &id, outputs, waitErr
}
if pi.Identifier == nil {
return nil, nil, errors.New("received nil identifier while reading resource state")
}
if readErr != nil {
return nil, nil, fmt.Errorf("reading resource state: %w", readErr)
}

return &id, outputs, nil
}

func (c *clientImpl) Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error) {
return c.api.GetResource(ctx, typeName, identifier)
}

func (c *clientImpl) Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error) {
res, err := c.api.UpdateResource(ctx, typeName, identifier, patches)
if err != nil {
return nil, err
}

if _, err = c.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil {
return nil, err
}

resourceState, err := c.api.GetResource(ctx, typeName, identifier)
if err != nil {
return nil, fmt.Errorf("reading resource state: %w", err)
}

return resourceState, nil
}

func (c *clientImpl) Delete(ctx context.Context, typeName, identifier string) error {
res, err := c.api.DeleteResource(ctx, typeName, identifier)
if err != nil {
return err
}

pi, err := c.awaiter.WaitForResourceOpCompletion(ctx, res)

if err != nil && pi != nil {
errorCode := pi.ErrorCode
if errorCode == types.HandlerErrorCodeNotFound {
// NotFound means that the resource was already deleted, so the operation can succeed.
return nil
}
}

return err
}
96 changes: 19 additions & 77 deletions provider/pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types"
"github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ssm"
Expand Down Expand Up @@ -110,12 +109,11 @@ type cfnProvider struct {

pulumiSchema []byte

cfn *cloudformation.Client
cctl client.CloudControlApiClient
awaiter client.CloudControlAwaiter
ec2 *ec2.Client
ssm *ssm.Client
sts *sts.Client
cfn *cloudformation.Client
ccc client.CloudControlClient
ec2 *ec2.Client
ssm *ssm.Client
sts *sts.Client
}

var _ pulumirpc.ResourceProviderServer = (*cfnProvider)(nil)
Expand Down Expand Up @@ -483,8 +481,7 @@ func (p *cfnProvider) Configure(ctx context.Context, req *pulumirpc.ConfigureReq
}

p.cfn = cloudformation.NewFromConfig(cfg)
p.cctl = client.NewCloudControlApiClient(cloudcontrol.NewFromConfig(cfg), p.roleArn)
p.awaiter = client.NewCloudControlAwaiter(p.cctl)
p.ccc = client.NewCloudControlClient(cloudcontrol.NewFromConfig(cfg), p.roleArn)
p.ec2 = ec2.NewFromConfig(cfg)
p.ssm = ssm.NewFromConfig(cfg)
p.sts = sts.NewFromConfig(cfg)
Expand Down Expand Up @@ -622,7 +619,7 @@ func (p *cfnProvider) getInvokeFunc(ctx context.Context, tok string) (invokeFunc
}
identifier := strings.Join(idParts, "|")
glog.V(9).Infof("%s invoking", cf.CfType)
outputs, err := p.cctl.GetResource(ctx, cf.CfType, identifier)
outputs, err := p.ccc.Read(ctx, cf.CfType, identifier)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -839,38 +836,14 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}
}

// Serialize inputs as a desired state JSON.
jsonBytes, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal as JSON")
}
desiredState := string(jsonBytes)

// Create the resource with Cloud API.
glog.V(9).Infof("%s.CreateResource %q state %q", label, cfType, desiredState)
res, err := p.cctl.CreateResource(ctx, cfType, desiredState)
if err != nil {
return nil, errors.Wrapf(err, "creating resource")
}
pi, waitErr := p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res)

// Read the state - even if there was a creation error but the progress event contains a resource ID.
var id string
var outputs map[string]interface{}
var readErr error
if pi != nil && pi.Identifier != nil {
// Retrieve the resource state from AWS.
// Note that we do so even if creation hasn't succeeded but the identifier is assigned.
id = *pi.Identifier
glog.V(9).Infof("%s.GetResource %q id %q", label, cfType, id)
resourceState, err := p.cctl.GetResource(ctx, cfType, id)
if err != nil {
readErr = fmt.Errorf("reading resource state: %w", err)
} else {
outputs = schema.CfnToSdk(resourceState)
}
glog.V(9).Infof("%s.CreateResource %q", label, cfType)
id, resourceState, createErr := p.ccc.Create(ctx, cfType, payload)
if createErr != nil && (id == nil || resourceState == nil) {
return nil, errors.Wrapf(createErr, "creating resource")
}

outputs := schema.CfnToSdk(resourceState)
// Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs.
if hasSpec && len(spec.WriteOnly) > 0 {
inputsMap := inputs.Mappable()
Expand All @@ -884,16 +857,9 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}
}

if waitErr != nil {
if id == "" {
return nil, waitErr
}

if createErr != nil {
// Resource was created but failed to fully initialize.
// If it has some state, return a partial error.
if readErr != nil {
return nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr)
}
// It has some state, so we return a partial error.
obj := checkpointObject(inputs, outputs)
checkpoint, err := plugin.MarshalProperties(
obj,
Expand All @@ -907,13 +873,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
if err != nil {
return nil, fmt.Errorf("marshalling currentResourceStateCheckpoint: %w", err)
}
return nil, partialError(id, waitErr, checkpoint, req.GetProperties())
}
if pi.Identifier == nil {
return nil, errors.New("received nil identifier while reading resource state")
}
if readErr != nil {
return nil, fmt.Errorf("reading resource state: %w", readErr)
return nil, partialError(*id, createErr, checkpoint, req.GetProperties())
}

switch resourceToken {
Expand All @@ -935,7 +895,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}

return &pulumirpc.CreateResponse{
Id: id,
Id: *id,
Properties: checkpoint,
}, nil
}
Expand All @@ -960,7 +920,7 @@ func (p *cfnProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*pu
if !ok {
return nil, errors.Errorf("Resource type %s not found", resourceToken)
}
resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id)
resourceState, err := p.ccc.Read(p.canceler.context, spec.CfType, id)
if err != nil {
var oe *smithy.OperationError
if errors.As(err, &oe) {
Expand Down Expand Up @@ -1097,18 +1057,10 @@ func (p *cfnProvider) Update(ctx context.Context, req *pulumirpc.UpdateRequest)
}

glog.V(9).Infof("%s.UpdateResource %q id %q state %+v", label, spec.CfType, id, ops)
res, err := p.cctl.UpdateResource(p.canceler.context, spec.CfType, id, ops)
resourceState, err := p.ccc.Update(p.canceler.context, spec.CfType, id, ops)
if err != nil {
return nil, err
}
if _, err = p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res); err != nil {
return nil, err
}

resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id)
if err != nil {
return nil, errors.Wrapf(err, "reading resource state")
}
outputs := schema.CfnToSdk(resourceState)

// Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs.
Expand Down Expand Up @@ -1164,20 +1116,10 @@ func (p *cfnProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
id := req.GetId()

glog.V(9).Infof("%s.DeleteResource %q id %q", label, cfType, id)
res, err := p.cctl.DeleteResource(p.canceler.context, cfType, id)
err := p.ccc.Delete(p.canceler.context, cfType, id)
if err != nil {
return nil, err
}
if pi, err := p.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil {
if pi != nil {
errorCode := pi.ErrorCode
if errorCode == types.HandlerErrorCodeNotFound {
// NotFound means that the resource was already deleted, so the operation can succeed.
return &pbempty.Empty{}, nil
}
}
return nil, err
}

return &pbempty.Empty{}, nil
}
Expand Down
Loading