Skip to content

Commit

Permalink
Add a Client abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
mikhailshilkov committed Mar 14, 2024
1 parent 5a8508d commit efae159
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 76 deletions.
130 changes: 130 additions & 0 deletions provider/pkg/provider/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2016-2024, Pulumi Corporation.

package client

import (
"context"
"encoding/json"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/cloudcontrol"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types"
"github.com/mattbaird/jsonpatch"
"github.com/pkg/errors"
"github.com/pulumi/pulumi-aws-native/provider/pkg/schema"
)

type Client interface {
Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error)
Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error)
Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error)
Delete(ctx context.Context, typeName, identifier string) error
}

type clientImpl struct {
api CloudControlApiClient
awaiter CloudControlAwaiter
}

func NewClient(cctl *cloudcontrol.Client, roleArn *string) Client {
api := NewCloudControlApiClient(cctl, roleArn)
return &clientImpl{
api: api,
awaiter: NewCloudControlAwaiter(api),
}
}

func (c *clientImpl) Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error) {
// Serialize inputs as a desired state JSON.
jsonBytes, err := json.Marshal(desiredState)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal as JSON: %w", err)
}
payload := string(jsonBytes)

res, err := c.api.CreateResource(ctx, typeName, payload)
if err != nil {
return nil, nil, fmt.Errorf("creating resource: %w", err)
}

pi, waitErr := c.awaiter.WaitForResourceOpCompletion(ctx, res)

// Read the state - even if there was a creation error but the progress event contains a resource ID.
var id string
var outputs map[string]interface{}
var readErr error
if pi != nil && pi.Identifier != nil {
// Retrieve the resource state from AWS.
// Note that we do so even if creation hasn't succeeded but the identifier is assigned.
id = *pi.Identifier
resourceState, err := c.api.GetResource(ctx, typeName, id)
if err != nil {
readErr = fmt.Errorf("reading resource state: %w", err)
} else {
outputs = schema.CfnToSdk(resourceState)
}
}

if waitErr != nil {
if id == "" {
return nil, nil, waitErr
}

if readErr != nil {
return nil, nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr)
}

// Resource was created but failed to fully initialize.
// If it has some state, return a partial error.
return &id, outputs, waitErr
}
if pi.Identifier == nil {
return nil, nil, errors.New("received nil identifier while reading resource state")
}
if readErr != nil {
return nil, nil, fmt.Errorf("reading resource state: %w", readErr)
}

return &id, outputs, nil
}

func (c *clientImpl) Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error) {
return c.api.GetResource(ctx, typeName, identifier)
}

func (c *clientImpl) Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error) {
res, err := c.api.UpdateResource(ctx, typeName, identifier, patches)
if err != nil {
return nil, err
}

if _, err = c.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil {
return nil, err
}

resourceState, err := c.api.GetResource(ctx, typeName, identifier)
if err != nil {
return nil, fmt.Errorf("reading resource state: %w", err)
}

return resourceState, nil
}

func (c *clientImpl) Delete(ctx context.Context, typeName, identifier string) error {
res, err := c.api.DeleteResource(ctx, typeName, identifier)
if err != nil {
return err
}

pi, err := c.awaiter.WaitForResourceOpCompletion(ctx, res)

if err != nil && pi != nil {
errorCode := pi.ErrorCode
if errorCode == types.HandlerErrorCodeNotFound {
// NotFound means that the resource was already deleted, so the operation can succeed.
return nil
}
}

return err
}
94 changes: 18 additions & 76 deletions provider/pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol"
"github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types"
"github.com/aws/aws-sdk-go-v2/service/cloudformation"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ssm"
Expand Down Expand Up @@ -110,12 +109,11 @@ type cfnProvider struct {

pulumiSchema []byte

cfn *cloudformation.Client
cctl client.CloudControlApiClient
awaiter client.CloudControlAwaiter
ec2 *ec2.Client
ssm *ssm.Client
sts *sts.Client
cfn *cloudformation.Client
client client.Client
ec2 *ec2.Client
ssm *ssm.Client
sts *sts.Client
}

var _ pulumirpc.ResourceProviderServer = (*cfnProvider)(nil)
Expand Down Expand Up @@ -483,8 +481,7 @@ func (p *cfnProvider) Configure(ctx context.Context, req *pulumirpc.ConfigureReq
}

p.cfn = cloudformation.NewFromConfig(cfg)
p.cctl = client.NewCloudControlApiClient(cloudcontrol.NewFromConfig(cfg), p.roleArn)
p.awaiter = client.NewCloudControlAwaiter(p.cctl)
p.client = client.NewClient(cloudcontrol.NewFromConfig(cfg), p.roleArn)
p.ec2 = ec2.NewFromConfig(cfg)
p.ssm = ssm.NewFromConfig(cfg)
p.sts = sts.NewFromConfig(cfg)
Expand Down Expand Up @@ -622,7 +619,7 @@ func (p *cfnProvider) getInvokeFunc(ctx context.Context, tok string) (invokeFunc
}
identifier := strings.Join(idParts, "|")
glog.V(9).Infof("%s invoking", cf.CfType)
outputs, err := p.cctl.GetResource(ctx, cf.CfType, identifier)
outputs, err := p.client.Read(ctx, cf.CfType, identifier)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -839,40 +836,16 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}
}

// Serialize inputs as a desired state JSON.
jsonBytes, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal as JSON")
}
desiredState := string(jsonBytes)

// Create the resource with Cloud API.
glog.V(9).Infof("%s.CreateResource %q state %q", label, cfType, desiredState)
res, err := p.cctl.CreateResource(ctx, cfType, desiredState)
if err != nil {
glog.V(9).Infof("%s.CreateResource %q", label, cfType)
id, resourceState, err := p.client.Create(ctx, cfType, payload)
if err != nil && (id == nil || resourceState == nil) {
return nil, errors.Wrapf(err, "creating resource")
}
pi, waitErr := p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res)

// Read the state - even if there was a creation error but the progress event contains a resource ID.
var id string
var outputs map[string]interface{}
var readErr error
if pi != nil && pi.Identifier != nil {
// Retrieve the resource state from AWS.
// Note that we do so even if creation hasn't succeeded but the identifier is assigned.
id = *pi.Identifier
glog.V(9).Infof("%s.GetResource %q id %q", label, cfType, id)
resourceState, err := p.cctl.GetResource(ctx, cfType, id)
if err != nil {
readErr = fmt.Errorf("reading resource state: %w", err)
} else {
outputs = schema.CfnToSdk(resourceState)
}
}

outputs := schema.CfnToSdk(resourceState)
// Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs.
if hasSpec && len(spec.WriteOnly) > 0 {
if hasSpec && len(spec.WriteOnly) > 0 && outputs != nil {
inputsMap := inputs.Mappable()
for _, writeOnlyProp := range spec.WriteOnly {
if _, ok := outputs[writeOnlyProp]; !ok {
Expand All @@ -884,16 +857,9 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}
}

if waitErr != nil {
if id == "" {
return nil, waitErr
}

if err != nil {
// Resource was created but failed to fully initialize.
// If it has some state, return a partial error.
if readErr != nil {
return nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr)
}
obj := checkpointObject(inputs, outputs)
checkpoint, err := plugin.MarshalProperties(
obj,
Expand All @@ -907,13 +873,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
if err != nil {
return nil, fmt.Errorf("marshalling currentResourceStateCheckpoint: %w", err)
}
return nil, partialError(id, waitErr, checkpoint, req.GetProperties())
}
if pi.Identifier == nil {
return nil, errors.New("received nil identifier while reading resource state")
}
if readErr != nil {
return nil, fmt.Errorf("reading resource state: %w", readErr)
return nil, partialError(*id, err, checkpoint, req.GetProperties())
}

switch resourceToken {
Expand All @@ -935,7 +895,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest)
}

return &pulumirpc.CreateResponse{
Id: id,
Id: *id,
Properties: checkpoint,
}, nil
}
Expand All @@ -960,7 +920,7 @@ func (p *cfnProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*pu
if !ok {
return nil, errors.Errorf("Resource type %s not found", resourceToken)
}
resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id)
resourceState, err := p.client.Read(p.canceler.context, spec.CfType, id)
if err != nil {
var oe *smithy.OperationError
if errors.As(err, &oe) {
Expand Down Expand Up @@ -1097,18 +1057,10 @@ func (p *cfnProvider) Update(ctx context.Context, req *pulumirpc.UpdateRequest)
}

glog.V(9).Infof("%s.UpdateResource %q id %q state %+v", label, spec.CfType, id, ops)
res, err := p.cctl.UpdateResource(p.canceler.context, spec.CfType, id, ops)
resourceState, err := p.client.Update(p.canceler.context, spec.CfType, id, ops)
if err != nil {
return nil, err
}
if _, err = p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res); err != nil {
return nil, err
}

resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id)
if err != nil {
return nil, errors.Wrapf(err, "reading resource state")
}
outputs := schema.CfnToSdk(resourceState)

// Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs.
Expand Down Expand Up @@ -1164,20 +1116,10 @@ func (p *cfnProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
id := req.GetId()

glog.V(9).Infof("%s.DeleteResource %q id %q", label, cfType, id)
res, err := p.cctl.DeleteResource(p.canceler.context, cfType, id)
err := p.client.Delete(p.canceler.context, cfType, id)
if err != nil {
return nil, err
}
if pi, err := p.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil {
if pi != nil {
errorCode := pi.ErrorCode
if errorCode == types.HandlerErrorCodeNotFound {
// NotFound means that the resource was already deleted, so the operation can succeed.
return &pbempty.Empty{}, nil
}
}
return nil, err
}

return &pbempty.Empty{}, nil
}
Expand Down

0 comments on commit efae159

Please sign in to comment.