diff --git a/provider/pkg/provider/client/client.go b/provider/pkg/provider/client/client.go new file mode 100644 index 0000000000..f1b666b568 --- /dev/null +++ b/provider/pkg/provider/client/client.go @@ -0,0 +1,130 @@ +// Copyright 2016-2024, Pulumi Corporation. + +package client + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/cloudcontrol" + "github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types" + "github.com/mattbaird/jsonpatch" + "github.com/pkg/errors" + "github.com/pulumi/pulumi-aws-native/provider/pkg/schema" +) + +type Client interface { + Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error) + Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error) + Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error) + Delete(ctx context.Context, typeName, identifier string) error +} + +type clientImpl struct { + api CloudControlApiClient + awaiter CloudControlAwaiter +} + +func NewClient(cctl *cloudcontrol.Client, roleArn *string) Client { + api := NewCloudControlApiClient(cctl, roleArn) + return &clientImpl{ + api: api, + awaiter: NewCloudControlAwaiter(api), + } +} + +func (c *clientImpl) Create(ctx context.Context, typeName string, desiredState map[string]any) (identifier *string, resourceState map[string]any, err error) { + // Serialize inputs as a desired state JSON. + jsonBytes, err := json.Marshal(desiredState) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal as JSON: %w", err) + } + payload := string(jsonBytes) + + res, err := c.api.CreateResource(ctx, typeName, payload) + if err != nil { + return nil, nil, fmt.Errorf("creating resource: %w", err) + } + + pi, waitErr := c.awaiter.WaitForResourceOpCompletion(ctx, res) + + // Read the state - even if there was a creation error but the progress event contains a resource ID. + var id string + var outputs map[string]interface{} + var readErr error + if pi != nil && pi.Identifier != nil { + // Retrieve the resource state from AWS. + // Note that we do so even if creation hasn't succeeded but the identifier is assigned. + id = *pi.Identifier + resourceState, err := c.api.GetResource(ctx, typeName, id) + if err != nil { + readErr = fmt.Errorf("reading resource state: %w", err) + } else { + outputs = schema.CfnToSdk(resourceState) + } + } + + if waitErr != nil { + if id == "" { + return nil, nil, waitErr + } + + if readErr != nil { + return nil, nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr) + } + + // Resource was created but failed to fully initialize. + // If it has some state, return a partial error. + return &id, outputs, waitErr + } + if pi.Identifier == nil { + return nil, nil, errors.New("received nil identifier while reading resource state") + } + if readErr != nil { + return nil, nil, fmt.Errorf("reading resource state: %w", readErr) + } + + return &id, outputs, nil +} + +func (c *clientImpl) Read(ctx context.Context, typeName, identifier string) (map[string]interface{}, error) { + return c.api.GetResource(ctx, typeName, identifier) +} + +func (c *clientImpl) Update(ctx context.Context, typeName, identifier string, patches []jsonpatch.JsonPatchOperation) (map[string]interface{}, error) { + res, err := c.api.UpdateResource(ctx, typeName, identifier, patches) + if err != nil { + return nil, err + } + + if _, err = c.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil { + return nil, err + } + + resourceState, err := c.api.GetResource(ctx, typeName, identifier) + if err != nil { + return nil, fmt.Errorf("reading resource state: %w", err) + } + + return resourceState, nil +} + +func (c *clientImpl) Delete(ctx context.Context, typeName, identifier string) error { + res, err := c.api.DeleteResource(ctx, typeName, identifier) + if err != nil { + return err + } + + pi, err := c.awaiter.WaitForResourceOpCompletion(ctx, res) + + if err != nil && pi != nil { + errorCode := pi.ErrorCode + if errorCode == types.HandlerErrorCodeNotFound { + // NotFound means that the resource was already deleted, so the operation can succeed. + return nil + } + } + + return err +} diff --git a/provider/pkg/provider/provider.go b/provider/pkg/provider/provider.go index ce1d725427..6a151e114e 100644 --- a/provider/pkg/provider/provider.go +++ b/provider/pkg/provider/provider.go @@ -38,7 +38,6 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials/stscreds" "github.com/aws/aws-sdk-go-v2/service/cloudcontrol" - "github.com/aws/aws-sdk-go-v2/service/cloudcontrol/types" "github.com/aws/aws-sdk-go-v2/service/cloudformation" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ssm" @@ -110,12 +109,11 @@ type cfnProvider struct { pulumiSchema []byte - cfn *cloudformation.Client - cctl client.CloudControlApiClient - awaiter client.CloudControlAwaiter - ec2 *ec2.Client - ssm *ssm.Client - sts *sts.Client + cfn *cloudformation.Client + client client.Client + ec2 *ec2.Client + ssm *ssm.Client + sts *sts.Client } var _ pulumirpc.ResourceProviderServer = (*cfnProvider)(nil) @@ -483,8 +481,7 @@ func (p *cfnProvider) Configure(ctx context.Context, req *pulumirpc.ConfigureReq } p.cfn = cloudformation.NewFromConfig(cfg) - p.cctl = client.NewCloudControlApiClient(cloudcontrol.NewFromConfig(cfg), p.roleArn) - p.awaiter = client.NewCloudControlAwaiter(p.cctl) + p.client = client.NewClient(cloudcontrol.NewFromConfig(cfg), p.roleArn) p.ec2 = ec2.NewFromConfig(cfg) p.ssm = ssm.NewFromConfig(cfg) p.sts = sts.NewFromConfig(cfg) @@ -622,7 +619,7 @@ func (p *cfnProvider) getInvokeFunc(ctx context.Context, tok string) (invokeFunc } identifier := strings.Join(idParts, "|") glog.V(9).Infof("%s invoking", cf.CfType) - outputs, err := p.cctl.GetResource(ctx, cf.CfType, identifier) + outputs, err := p.client.Read(ctx, cf.CfType, identifier) if err != nil { return nil, err } @@ -839,40 +836,16 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest) } } - // Serialize inputs as a desired state JSON. - jsonBytes, err := json.Marshal(payload) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal as JSON") - } - desiredState := string(jsonBytes) - // Create the resource with Cloud API. - glog.V(9).Infof("%s.CreateResource %q state %q", label, cfType, desiredState) - res, err := p.cctl.CreateResource(ctx, cfType, desiredState) - if err != nil { + glog.V(9).Infof("%s.CreateResource %q", label, cfType) + id, resourceState, err := p.client.Create(ctx, cfType, payload) + if err != nil && (id == nil || resourceState == nil) { return nil, errors.Wrapf(err, "creating resource") } - pi, waitErr := p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res) - - // Read the state - even if there was a creation error but the progress event contains a resource ID. - var id string - var outputs map[string]interface{} - var readErr error - if pi != nil && pi.Identifier != nil { - // Retrieve the resource state from AWS. - // Note that we do so even if creation hasn't succeeded but the identifier is assigned. - id = *pi.Identifier - glog.V(9).Infof("%s.GetResource %q id %q", label, cfType, id) - resourceState, err := p.cctl.GetResource(ctx, cfType, id) - if err != nil { - readErr = fmt.Errorf("reading resource state: %w", err) - } else { - outputs = schema.CfnToSdk(resourceState) - } - } + outputs := schema.CfnToSdk(resourceState) // Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs. - if hasSpec && len(spec.WriteOnly) > 0 { + if hasSpec && len(spec.WriteOnly) > 0 && outputs != nil { inputsMap := inputs.Mappable() for _, writeOnlyProp := range spec.WriteOnly { if _, ok := outputs[writeOnlyProp]; !ok { @@ -884,16 +857,9 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest) } } - if waitErr != nil { - if id == "" { - return nil, waitErr - } - + if err != nil { // Resource was created but failed to fully initialize. // If it has some state, return a partial error. - if readErr != nil { - return nil, fmt.Errorf("resource partially created but read failed. read error: %v, create error: %w", readErr, waitErr) - } obj := checkpointObject(inputs, outputs) checkpoint, err := plugin.MarshalProperties( obj, @@ -907,13 +873,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest) if err != nil { return nil, fmt.Errorf("marshalling currentResourceStateCheckpoint: %w", err) } - return nil, partialError(id, waitErr, checkpoint, req.GetProperties()) - } - if pi.Identifier == nil { - return nil, errors.New("received nil identifier while reading resource state") - } - if readErr != nil { - return nil, fmt.Errorf("reading resource state: %w", readErr) + return nil, partialError(*id, err, checkpoint, req.GetProperties()) } switch resourceToken { @@ -935,7 +895,7 @@ func (p *cfnProvider) Create(ctx context.Context, req *pulumirpc.CreateRequest) } return &pulumirpc.CreateResponse{ - Id: id, + Id: *id, Properties: checkpoint, }, nil } @@ -960,7 +920,7 @@ func (p *cfnProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*pu if !ok { return nil, errors.Errorf("Resource type %s not found", resourceToken) } - resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id) + resourceState, err := p.client.Read(p.canceler.context, spec.CfType, id) if err != nil { var oe *smithy.OperationError if errors.As(err, &oe) { @@ -1097,18 +1057,10 @@ func (p *cfnProvider) Update(ctx context.Context, req *pulumirpc.UpdateRequest) } glog.V(9).Infof("%s.UpdateResource %q id %q state %+v", label, spec.CfType, id, ops) - res, err := p.cctl.UpdateResource(p.canceler.context, spec.CfType, id, ops) + resourceState, err := p.client.Update(p.canceler.context, spec.CfType, id, ops) if err != nil { return nil, err } - if _, err = p.awaiter.WaitForResourceOpCompletion(p.canceler.context, res); err != nil { - return nil, err - } - - resourceState, err := p.cctl.GetResource(p.canceler.context, spec.CfType, id) - if err != nil { - return nil, errors.Wrapf(err, "reading resource state") - } outputs := schema.CfnToSdk(resourceState) // Write-only properties are not returned in the outputs, so we assume they should have the same value we sent from the inputs. @@ -1164,20 +1116,10 @@ func (p *cfnProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest) id := req.GetId() glog.V(9).Infof("%s.DeleteResource %q id %q", label, cfType, id) - res, err := p.cctl.DeleteResource(p.canceler.context, cfType, id) + err := p.client.Delete(p.canceler.context, cfType, id) if err != nil { return nil, err } - if pi, err := p.awaiter.WaitForResourceOpCompletion(ctx, res); err != nil { - if pi != nil { - errorCode := pi.ErrorCode - if errorCode == types.HandlerErrorCodeNotFound { - // NotFound means that the resource was already deleted, so the operation can succeed. - return &pbempty.Empty{}, nil - } - } - return nil, err - } return &pbempty.Empty{}, nil }