From 75e8568a11e7334d1ce2550fa0b809e354dca383 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:34:09 +0800 Subject: [PATCH 01/21] Add osis pipeline --- internal/service/osis/pipeline.go | 855 ++++++++++++++++++++++++++++++ 1 file changed, 855 insertions(+) create mode 100644 internal/service/osis/pipeline.go diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go new file mode 100644 index 000000000000..65f2a45203fe --- /dev/null +++ b/internal/service/osis/pipeline.go @@ -0,0 +1,855 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +import ( + "context" + "errors" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/osis" + awstypes "github.com/aws/aws-sdk-go-v2/service/osis/types" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-validators/int64validator" + "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" + "github.com/hashicorp/terraform-plugin-framework-validators/setvalidator" + "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/setplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/schema/validator" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "github.com/hashicorp/terraform-provider-aws/internal/create" + "github.com/hashicorp/terraform-provider-aws/internal/enum" + "github.com/hashicorp/terraform-provider-aws/internal/framework" + "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" + fwtypes "github.com/hashicorp/terraform-provider-aws/internal/framework/types" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/names" +) + +// @FrameworkResource(name="Pipeline") +// @Tags(identifierAttribute="arn") +func newResourcePipeline(_ context.Context) (resource.ResourceWithConfigure, error) { + r := &resourcePipeline{} + + r.SetDefaultCreateTimeout(30 * time.Minute) + r.SetDefaultUpdateTimeout(30 * time.Minute) + r.SetDefaultDeleteTimeout(30 * time.Minute) + + return r, nil +} + +const ( + ResNamePipeline = "Pipeline" + iamPropagationTimeout = time.Minute * 1 +) + +type resourcePipeline struct { + framework.ResourceWithConfigure + framework.WithTimeouts +} + +func (r *resourcePipeline) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = "aws_osis_pipeline" +} + +func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = schema.Schema{ + Attributes: map[string]schema.Attribute{ + "arn": framework.ARNAttributeComputedOnly(), + "id": framework.IDAttribute(), + "ingest_endpoint_urls": schema.SetAttribute{ + Computed: true, + ElementType: 
types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.UseStateForUnknown(), + }, + }, + "max_units": schema.Int64Attribute{ + Required: true, + Validators: []validator.Int64{ + int64validator.AtLeast(1), + }, + }, + "min_units": schema.Int64Attribute{ + Required: true, + Validators: []validator.Int64{ + int64validator.AtLeast(1), + }, + }, + "pipeline_configuration_body": schema.StringAttribute{ + Required: true, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 24000), + }, + }, + "pipeline_name": schema.StringAttribute{ + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + Validators: []validator.String{ + stringvalidator.LengthBetween(3, 28), + }, + }, + names.AttrTags: tftags.TagsAttribute(), + names.AttrTagsAll: tftags.TagsAttributeComputedOnly(), + }, + Blocks: map[string]schema.Block{ + "buffer_options": schema.ListNestedBlock{ + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "persistent_buffer_enabled": schema.BoolAttribute{ + Required: true, + }, + }, + }, + }, + "encryption_at_rest_options": schema.ListNestedBlock{ + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "kms_key_arn": schema.StringAttribute{ + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, + CustomType: fwtypes.ARNType, + Required: true, + Validators: []validator.String{ + stringvalidator.LengthBetween(7, 2048), + }, + }, + }, + }, + }, + "log_publishing_options": schema.ListNestedBlock{ + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "is_logging_enabled": schema.BoolAttribute{ + Optional: true, + }, + }, + Blocks: map[string]schema.Block{ + "cloudwatch_log_destination": schema.ListNestedBlock{ + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "log_group": schema.StringAttribute{ + Required: true, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 512), + }, + }, + }, + }, + }, + }, + }, + }, + "vpc_options": schema.ListNestedBlock{ + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "security_group_ids": schema.SetAttribute{ + Optional: true, + ElementType: types.StringType, + Validators: []validator.Set{ + setvalidator.SizeBetween(1, 12), + setvalidator.ValueStringsAre( + stringvalidator.All( + stringvalidator.LengthAtLeast(11), + stringvalidator.LengthAtMost(20), + ), + ), + }, + }, + "subnet_ids": schema.SetAttribute{ + Required: true, + ElementType: types.StringType, + Validators: []validator.Set{ + setvalidator.SizeBetween(1, 12), + setvalidator.ValueStringsAre( + stringvalidator.All( + stringvalidator.LengthAtLeast(15), + stringvalidator.LengthAtMost(24), + ), + ), + }, + }, + }, + }, + }, + "timeouts": timeouts.Block(ctx, timeouts.Opts{ + Create: true, + Update: true, + Delete: true, + }), + }, + } +} + +func (r *resourcePipeline) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + conn := r.Meta().OpenSearchIngestionClient(ctx) + + var plan resourcePipelineData + resp.Diagnostics.Append(req.Plan.Get(ctx, 
&plan)...) + if resp.Diagnostics.HasError() { + return + } + + in := &osis.CreatePipelineInput{ + MaxUnits: aws.Int32(int32(plan.MaxUnits.ValueInt64())), + MinUnits: aws.Int32(int32(plan.MinUnits.ValueInt64())), + PipelineConfigurationBody: aws.String(plan.PipelineConfigurationBody.ValueString()), + PipelineName: aws.String(plan.PipelineName.ValueString()), + Tags: getTagsIn(ctx), + } + + if !plan.BufferOptions.IsNull() { + var bufferOptions []bufferOptionsData + resp.Diagnostics.Append(plan.BufferOptions.ElementsAs(ctx, &bufferOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + + in.BufferOptions = expandBufferOptions(bufferOptions) + } + + if !plan.EncryptionAtRestOptions.IsNull() { + var encryptionAtRestOptions []encryptionAtRestOptionsData + resp.Diagnostics.Append(plan.EncryptionAtRestOptions.ElementsAs(ctx, &encryptionAtRestOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + + in.EncryptionAtRestOptions = expandEncryptionAtRestOptions(encryptionAtRestOptions) + } + + if !plan.LogPublishingOptions.IsNull() { + var logPublishingOptions []logPublishingOptionsData + resp.Diagnostics.Append(plan.LogPublishingOptions.ElementsAs(ctx, &logPublishingOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + + logPublishingOptionsInput, d := expandLogPublishingOptions(ctx, logPublishingOptions) + resp.Diagnostics.Append(d...) + if resp.Diagnostics.HasError() { + return + } + in.LogPublishingOptions = logPublishingOptionsInput + } + + if !plan.VpcOptions.IsNull() { + var vpcOptions []vpcOptionsData + resp.Diagnostics.Append(plan.VpcOptions.ElementsAs(ctx, &vpcOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + in.VpcOptions = expandVpcOptions(ctx, vpcOptions) + } + + // Retry for IAM eventual consistency + var out *osis.CreatePipelineOutput + err := tfresource.Retry(ctx, iamPropagationTimeout, func() *retry.RetryError { + var err error + out, err = conn.CreatePipeline(ctx, in) + if err != nil { + var ve *awstypes.ValidationException + if errors.As(err, &ve) { + return retry.RetryableError(err) + } + return retry.NonRetryableError(err) + } + + return nil + }) + + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionCreating, ResNamePipeline, plan.PipelineName.String(), err), + err.Error(), + ) + return + } + if out == nil || out.Pipeline == nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionCreating, ResNamePipeline, plan.PipelineName.String(), nil), + errors.New("empty output").Error(), + ) + return + } + + state := plan + state.ID = flex.StringToFramework(ctx, out.Pipeline.PipelineName) + + createTimeout := r.CreateTimeout(ctx, plan.Timeouts) + waitOut, err := waitPipelineCreated(ctx, conn, aws.ToString(out.Pipeline.PipelineName), createTimeout) + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForCreation, ResNamePipeline, plan.PipelineName.String(), err), + err.Error(), + ) + return + } + + resp.Diagnostics.Append(state.refreshFromOutput(ctx, waitOut)...) + resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) +} + +func (r *resourcePipeline) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + conn := r.Meta().OpenSearchIngestionClient(ctx) + + var state resourcePipelineData + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) 
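+	// The pipeline name doubles as the resource ID, so it is used directly in the GetPipeline lookup below.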
+ if resp.Diagnostics.HasError() { + return + } + + out, err := findPipelineByID(ctx, conn, state.ID.ValueString()) + if tfresource.NotFound(err) { + resp.State.RemoveResource(ctx) + return + } + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionSetting, ResNamePipeline, state.ID.String(), err), + err.Error(), + ) + return + } + + resp.Diagnostics.Append(state.refreshFromOutput(ctx, out)...) + resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) +} + +func (r *resourcePipeline) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + conn := r.Meta().OpenSearchIngestionClient(ctx) + + var plan, state resourcePipelineData + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + if !plan.BufferOptions.Equal(state.BufferOptions) || + !plan.EncryptionAtRestOptions.Equal(state.EncryptionAtRestOptions) || + !plan.LogPublishingOptions.Equal(state.LogPublishingOptions) || + !plan.MaxUnits.Equal(state.MaxUnits) || + !plan.MinUnits.Equal(state.MinUnits) || + !plan.PipelineConfigurationBody.Equal(state.PipelineConfigurationBody) { + in := &osis.UpdatePipelineInput{ + PipelineName: aws.String(plan.PipelineName.ValueString()), + } + + if !plan.MaxUnits.IsNull() { + in.MaxUnits = aws.Int32(int32(plan.MaxUnits.ValueInt64())) + } + + if !plan.MinUnits.IsNull() { + in.MinUnits = aws.Int32(int32(plan.MinUnits.ValueInt64())) + } + + if !plan.PipelineConfigurationBody.IsNull() { + in.PipelineConfigurationBody = aws.String(plan.PipelineConfigurationBody.ValueString()) + } + + if !plan.BufferOptions.IsNull() { + var bufferOptions []bufferOptionsData + resp.Diagnostics.Append(plan.BufferOptions.ElementsAs(ctx, &bufferOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + + in.BufferOptions = expandBufferOptions(bufferOptions) + } + + if !plan.EncryptionAtRestOptions.IsNull() { + var encryptionAtRestOptions []encryptionAtRestOptionsData + resp.Diagnostics.Append(plan.EncryptionAtRestOptions.ElementsAs(ctx, &encryptionAtRestOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + in.EncryptionAtRestOptions = expandEncryptionAtRestOptions(encryptionAtRestOptions) + } + if !plan.LogPublishingOptions.IsNull() { + var logPublishingOptions []logPublishingOptionsData + resp.Diagnostics.Append(plan.LogPublishingOptions.ElementsAs(ctx, &logPublishingOptions, false)...) + if resp.Diagnostics.HasError() { + return + } + + logPublishingOptionsInput, d := expandLogPublishingOptions(ctx, logPublishingOptions) + resp.Diagnostics.Append(d...) 
+ if resp.Diagnostics.HasError() { + return + } + in.LogPublishingOptions = logPublishingOptionsInput + } + + out, err := conn.UpdatePipeline(ctx, in) + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionUpdating, ResNamePipeline, plan.ID.String(), err), + err.Error(), + ) + return + } + if out == nil || out.Pipeline == nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionUpdating, ResNamePipeline, plan.ID.String(), nil), + errors.New("empty output").Error(), + ) + return + } + } + updateTimeout := r.UpdateTimeout(ctx, plan.Timeouts) + waitOut, err := waitPipelineUpdated(ctx, conn, plan.ID.ValueString(), updateTimeout) + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForUpdate, ResNamePipeline, plan.ID.String(), err), + err.Error(), + ) + return + } + + resp.Diagnostics.Append(plan.refreshFromOutput(ctx, waitOut)...) + resp.Diagnostics.Append(resp.State.Set(ctx, &plan)...) +} + +func (r *resourcePipeline) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + conn := r.Meta().OpenSearchIngestionClient(ctx) + + var state resourcePipelineData + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + in := &osis.DeletePipelineInput{ + PipelineName: aws.String(state.ID.ValueString()), + } + + _, err := conn.DeletePipeline(ctx, in) + + if err != nil { + var nfe *awstypes.ResourceNotFoundException + if errors.As(err, &nfe) { + return + } + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionDeleting, ResNamePipeline, state.ID.String(), err), + err.Error(), + ) + return + } + + deleteTimeout := r.DeleteTimeout(ctx, state.Timeouts) + _, err = waitPipelineDeleted(ctx, conn, state.ID.ValueString(), deleteTimeout) + if err != nil { + resp.Diagnostics.AddError( + create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForDeletion, ResNamePipeline, state.ID.String(), err), + err.Error(), + ) + return + } +} + +// refreshFromOutput writes state data from an AWS response object +func (pd *resourcePipelineData) refreshFromOutput(ctx context.Context, out *awstypes.Pipeline) diag.Diagnostics { + var diags diag.Diagnostics + + if out == nil { + return diags + } + + pd.ARN = flex.StringToFramework(ctx, out.PipelineArn) + pd.ID = flex.StringToFramework(ctx, out.PipelineName) + pd.PipelineName = flex.StringToFramework(ctx, out.PipelineName) + pd.PipelineConfigurationBody = flex.StringToFramework(ctx, out.PipelineConfigurationBody) + minUnits := int64(out.MinUnits) + pd.MinUnits = flex.Int64ToFramework(ctx, &minUnits) + maxUnits := int64(out.MaxUnits) + pd.MaxUnits = flex.Int64ToFramework(ctx, &maxUnits) + pd.IngestEndpointUrls = flex.FlattenFrameworkStringValueSet(ctx, out.IngestEndpointUrls) + + bufferOptions, d := flattenBufferOptions(ctx, out.BufferOptions) + diags.Append(d...) + pd.BufferOptions = bufferOptions + + encryptionAtRestOptions, d := flattenEncryptionAtRestOptions(ctx, out.EncryptionAtRestOptions) + diags.Append(d...) + pd.EncryptionAtRestOptions = encryptionAtRestOptions + + logPublishingOptions, d := flattenLogPublishingOptions(ctx, out.LogPublishingOptions) + diags.Append(d...) 
+ pd.LogPublishingOptions = logPublishingOptions + + setTagsOut(ctx, out.Tags) + return diags +} + +func (r *resourcePipeline) ModifyPlan(ctx context.Context, req resource.ModifyPlanRequest, resp *resource.ModifyPlanResponse) { + r.SetTagsAll(ctx, req, resp) +} + +func (r *resourcePipeline) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp) +} + +func waitPipelineCreated(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusCreating, awstypes.PipelineStatusStarting), + Target: enum.Slice(awstypes.PipelineStatusActive), + Refresh: statusPipeline(ctx, conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + if out, ok := outputRaw.(*awstypes.Pipeline); ok { + return out, err + } + + return nil, err +} + +func waitPipelineUpdated(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusUpdating), + Target: enum.Slice(awstypes.PipelineStatusActive), + Refresh: statusPipeline(ctx, conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + if out, ok := outputRaw.(*awstypes.Pipeline); ok { + return out, err + } + + return nil, err +} + +func waitPipelineDeleted(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusDeleting), + Target: []string{}, + Refresh: statusPipeline(ctx, conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + if out, ok := outputRaw.(*awstypes.Pipeline); ok { + return out, err + } + + return nil, err +} + +func statusPipeline(ctx context.Context, conn *osis.Client, id string) retry.StateRefreshFunc { + return func() (interface{}, string, error) { + out, err := findPipelineByID(ctx, conn, id) + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return out, string(out.Status), nil + } +} + +func findPipelineByID(ctx context.Context, conn *osis.Client, id string) (*awstypes.Pipeline, error) { + in := &osis.GetPipelineInput{ + PipelineName: aws.String(id), + } + + out, err := conn.GetPipeline(ctx, in) + if err != nil { + var nfe *awstypes.ResourceNotFoundException + if errors.As(err, &nfe) { + return nil, &retry.NotFoundError{ + LastError: err, + LastRequest: in, + } + } + + return nil, err + } + + if out == nil || out.Pipeline == nil { + return nil, tfresource.NewEmptyResultError(in) + } + + return out.Pipeline, nil +} + +func flattenBufferOptions(ctx context.Context, apiObject *awstypes.BufferOptions) (types.List, diag.Diagnostics) { + var diags diag.Diagnostics + elemType := types.ObjectType{AttrTypes: bufferOptionsAttrTypes} + + if apiObject == nil { + return types.ListValueMust(elemType, []attr.Value{}), diags + } + + obj := map[string]attr.Value{ + "persistent_buffer_enabled": flex.BoolToFramework(ctx, apiObject.PersistentBufferEnabled), + } + objVal, d := types.ObjectValue(bufferOptionsAttrTypes, obj) + diags.Append(d...) 
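+
+	// Wrap the single API object in a one-element list to match the list-of-one block schema.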
+	listVal, d := types.ListValue(elemType, []attr.Value{objVal})
+	diags.Append(d...)
+
+	return listVal, diags
+}
+
+func flattenEncryptionAtRestOptions(ctx context.Context, apiObject *awstypes.EncryptionAtRestOptions) (types.List, diag.Diagnostics) {
+	var diags diag.Diagnostics
+	elemType := types.ObjectType{AttrTypes: encryptionAtRestOptionsAttrTypes}
+
+	if apiObject == nil {
+		return types.ListValueMust(elemType, []attr.Value{}), diags
+	}
+
+	obj := map[string]attr.Value{
+		"kms_key_arn": flex.StringToFramework(ctx, apiObject.KmsKeyArn),
+	}
+	objVal, d := types.ObjectValue(encryptionAtRestOptionsAttrTypes, obj)
+	diags.Append(d...)
+
+	listVal, d := types.ListValue(elemType, []attr.Value{objVal})
+	diags.Append(d...)
+
+	return listVal, diags
+}
+
+func flattenLogPublishingOptions(ctx context.Context, apiObject *awstypes.LogPublishingOptions) (types.List, diag.Diagnostics) {
+	var diags diag.Diagnostics
+	elemType := types.ObjectType{AttrTypes: logPublishingOptionsAttrTypes}
+
+	if apiObject == nil {
+		return types.ListValueMust(elemType, []attr.Value{}), diags
+	}
+
+	cloudWatchLogDestination, d := flattenCloudWatchLogDestination(ctx, apiObject.CloudWatchLogDestination)
+	diags.Append(d...)
+
+	obj := map[string]attr.Value{
+		"is_logging_enabled":         flex.BoolToFramework(ctx, apiObject.IsLoggingEnabled),
+		"cloudwatch_log_destination": cloudWatchLogDestination,
+	}
+	objVal, d := types.ObjectValue(logPublishingOptionsAttrTypes, obj)
+	diags.Append(d...)
+
+	listVal, d := types.ListValue(elemType, []attr.Value{objVal})
+	diags.Append(d...)
+
+	return listVal, diags
+}
+
+func flattenCloudWatchLogDestination(ctx context.Context, apiObject *awstypes.CloudWatchLogDestination) (types.List, diag.Diagnostics) {
+	var diags diag.Diagnostics
+	elemType := types.ObjectType{AttrTypes: cloudWatchLogDestinationAttrTypes}
+
+	if apiObject == nil {
+		return types.ListValueMust(elemType, []attr.Value{}), diags
+	}
+
+	obj := map[string]attr.Value{
+		"log_group": flex.StringToFramework(ctx, apiObject.LogGroup),
+	}
+	objVal, d := types.ObjectValue(cloudWatchLogDestinationAttrTypes, obj)
+	diags.Append(d...)
+ + return listVal, diags +} + +func expandBufferOptions(tfList []bufferOptionsData) *awstypes.BufferOptions { + if len(tfList) == 0 { + return nil + } + bo := tfList[0] + return &awstypes.BufferOptions{ + PersistentBufferEnabled: aws.Bool(bo.PersistentBufferEnabled.ValueBool()), + } +} + +func expandEncryptionAtRestOptions(tfList []encryptionAtRestOptionsData) *awstypes.EncryptionAtRestOptions { + if len(tfList) == 0 { + return nil + } + earo := tfList[0] + return &awstypes.EncryptionAtRestOptions{ + KmsKeyArn: aws.String(earo.KmsKeyArn.ValueString()), + } +} + +func expandLogPublishingOptions(ctx context.Context, tfList []logPublishingOptionsData) (*awstypes.LogPublishingOptions, diag.Diagnostics) { + var diags diag.Diagnostics + + if len(tfList) == 0 { + return nil, diags + } + + lpo := tfList[0] + apiObject := &awstypes.LogPublishingOptions{} + if !lpo.IsLoggingEnabled.IsNull() { + apiObject.IsLoggingEnabled = aws.Bool(lpo.IsLoggingEnabled.ValueBool()) + } + + if !lpo.CloudWatchLogDestination.IsNull() { + var cloudWatchLogDestination []cloudWatchLogDestinationData + diags.Append(lpo.CloudWatchLogDestination.ElementsAs(ctx, &cloudWatchLogDestination, false)...) + apiObject.CloudWatchLogDestination = expandCloudWatchLogDestination(cloudWatchLogDestination) + } + + return apiObject, diags +} + +func expandCloudWatchLogDestination(tfList []cloudWatchLogDestinationData) *awstypes.CloudWatchLogDestination { + if len(tfList) == 0 { + return nil + } + cwld := tfList[0] + return &awstypes.CloudWatchLogDestination{ + LogGroup: aws.String(cwld.LogGroup.ValueString()), + } +} + +func expandVpcOptions(ctx context.Context, tfList []vpcOptionsData) *awstypes.VpcOptions { + if len(tfList) == 0 { + return nil + } + vo := tfList[0] + apiObject := &awstypes.VpcOptions{ + SubnetIds: flex.ExpandFrameworkStringValueSet(ctx, vo.SubnetIds), + } + + if !vo.SecurityGroupIds.IsNull() { + apiObject.SecurityGroupIds = flex.ExpandFrameworkStringValueSet(ctx, vo.SecurityGroupIds) + } + + return apiObject +} + +type resourcePipelineData struct { + ARN types.String `tfsdk:"arn"` + BufferOptions types.List `tfsdk:"buffer_options"` + EncryptionAtRestOptions types.List `tfsdk:"encryption_at_rest_options"` + ID types.String `tfsdk:"id"` + IngestEndpointUrls types.Set `tfsdk:"ingest_endpoint_urls"` + LogPublishingOptions types.List `tfsdk:"log_publishing_options"` + MaxUnits types.Int64 `tfsdk:"max_units"` + MinUnits types.Int64 `tfsdk:"min_units"` + PipelineConfigurationBody types.String `tfsdk:"pipeline_configuration_body"` + PipelineName types.String `tfsdk:"pipeline_name"` + Tags types.Map `tfsdk:"tags"` + TagsAll types.Map `tfsdk:"tags_all"` + Timeouts timeouts.Value `tfsdk:"timeouts"` + VpcOptions types.List `tfsdk:"vpc_options"` +} + +type bufferOptionsData struct { + PersistentBufferEnabled types.Bool `tfsdk:"persistent_buffer_enabled"` +} + +type encryptionAtRestOptionsData struct { + KmsKeyArn fwtypes.ARN `tfsdk:"kms_key_arn"` +} + +type logPublishingOptionsData struct { + CloudWatchLogDestination types.List `tfsdk:"cloudwatch_log_destination"` + IsLoggingEnabled types.Bool `tfsdk:"is_logging_enabled"` +} + +type cloudWatchLogDestinationData struct { + LogGroup types.String `tfsdk:"log_group"` +} + +type vpcOptionsData struct { + SecurityGroupIds types.Set `tfsdk:"security_group_ids"` + SubnetIds types.Set `tfsdk:"subnet_ids"` +} + +var ( + bufferOptionsAttrTypes = map[string]attr.Type{ + "persistent_buffer_enabled": types.BoolType, + } + + encryptionAtRestOptionsAttrTypes = map[string]attr.Type{ + "kms_key_arn": 
types.StringType, + } + + logPublishingOptionsAttrTypes = map[string]attr.Type{ + "cloudwatch_log_destination": types.ListType{ElemType: types.ObjectType{AttrTypes: cloudWatchLogDestinationAttrTypes}}, + "is_logging_enabled": types.BoolType, + } + + cloudWatchLogDestinationAttrTypes = map[string]attr.Type{ + "log_group": types.StringType, + } +) From 2450db3951c7fb8441605bc2734b068133ac1e16 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:34:37 +0800 Subject: [PATCH 02/21] Add osis exports --- internal/service/osis/exports_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 internal/service/osis/exports_test.go diff --git a/internal/service/osis/exports_test.go b/internal/service/osis/exports_test.go new file mode 100644 index 000000000000..29af5bf1238b --- /dev/null +++ b/internal/service/osis/exports_test.go @@ -0,0 +1,10 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +// Exports for use in tests only. +var ( + ResourcePipeline = newResourcePipeline + FindPipelineByID = findPipelineByID +) From 1d7c5cfbe40e6fcefce522b70bb258e9d5d2fb3f Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:35:32 +0800 Subject: [PATCH 03/21] Update osis gen --- internal/service/osis/generate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/service/osis/generate.go b/internal/service/osis/generate.go index c2281bfd620c..ae709ae0ffc8 100644 --- a/internal/service/osis/generate.go +++ b/internal/service/osis/generate.go @@ -1,6 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 +//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -UpdateTags //go:generate go run ../../generate/servicepackage/main.go // ONLY generate directives and package declaration! Do not add anything else to this file. 
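The `//go:generate` directive added above wires the osis package into the provider's tag-helper generator; the generated `tags_gen.go` added later in this series is its output. Assuming the repository's standard tooling, the helpers can be regenerated with something like:

```console
% go generate ./internal/service/osis/...
```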
From 0304c48ec7f6d8ee8ec08f32a088965cbadc55d7 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:35:52 +0800 Subject: [PATCH 04/21] Update osis svc gen --- internal/service/osis/service_package_gen.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/service/osis/service_package_gen.go b/internal/service/osis/service_package_gen.go index 75a8c07868f8..a8888e878a99 100644 --- a/internal/service/osis/service_package_gen.go +++ b/internal/service/osis/service_package_gen.go @@ -19,7 +19,15 @@ func (p *servicePackage) FrameworkDataSources(ctx context.Context) []*types.Serv } func (p *servicePackage) FrameworkResources(ctx context.Context) []*types.ServicePackageFrameworkResource { - return []*types.ServicePackageFrameworkResource{} + return []*types.ServicePackageFrameworkResource{ + { + Factory: newResourcePipeline, + Name: "Pipeline", + Tags: &types.ServicePackageResourceTags{ + IdentifierAttribute: "arn", + }, + }, + } } func (p *servicePackage) SDKDataSources(ctx context.Context) []*types.ServicePackageSDKDataSource { From 8341826625ac726422881a66537613293dd0ac74 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:36:18 +0800 Subject: [PATCH 05/21] Add osis to names --- names/names.go | 1 + 1 file changed, 1 insertion(+) diff --git a/names/names.go b/names/names.go index a3010eb67f97..bc731c7de1b2 100644 --- a/names/names.go +++ b/names/names.go @@ -74,6 +74,7 @@ const ( MQEndpointID = "mq" ObservabilityAccessManagerEndpointID = "oam" OpenSearchServerlessEndpointID = "aoss" + OpenSearchIngestionEndpointID = "osis" PipesEndpointID = "pipes" PollyEndpointID = "polly" PricingEndpointID = "pricing" From cc86df1fd5c9efe90967f86ae4028952dd76eb1e Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:36:39 +0800 Subject: [PATCH 06/21] Add osis tags gen --- internal/service/osis/tags_gen.go | 146 ++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 internal/service/osis/tags_gen.go diff --git a/internal/service/osis/tags_gen.go b/internal/service/osis/tags_gen.go new file mode 100644 index 000000000000..15e22819585c --- /dev/null +++ b/internal/service/osis/tags_gen.go @@ -0,0 +1,146 @@ +// Code generated by internal/generate/tags/main.go; DO NOT EDIT. +package osis + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/osis" + awstypes "github.com/aws/aws-sdk-go-v2/service/osis/types" + "github.com/hashicorp/terraform-plugin-log/tflog" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + "github.com/hashicorp/terraform-provider-aws/internal/logging" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/types/option" + "github.com/hashicorp/terraform-provider-aws/names" +) + +// listTags lists osis service tags. +// The identifier is typically the Amazon Resource Name (ARN), although +// it may also be a different identifier depending on the service. +func listTags(ctx context.Context, conn *osis.Client, identifier string, optFns ...func(*osis.Options)) (tftags.KeyValueTags, error) { + input := &osis.ListTagsForResourceInput{ + Arn: aws.String(identifier), + } + + output, err := conn.ListTagsForResource(ctx, input, optFns...) + + if err != nil { + return tftags.New(ctx, nil), err + } + + return KeyValueTags(ctx, output.Tags), nil +} + +// ListTags lists osis service tags and set them in Context. +// It is called from outside this package. 
+func (p *servicePackage) ListTags(ctx context.Context, meta any, identifier string) error { + tags, err := listTags(ctx, meta.(*conns.AWSClient).OpenSearchIngestionClient(ctx), identifier) + + if err != nil { + return err + } + + if inContext, ok := tftags.FromContext(ctx); ok { + inContext.TagsOut = option.Some(tags) + } + + return nil +} + +// []*SERVICE.Tag handling + +// Tags returns osis service tags. +func Tags(tags tftags.KeyValueTags) []awstypes.Tag { + result := make([]awstypes.Tag, 0, len(tags)) + + for k, v := range tags.Map() { + tag := awstypes.Tag{ + Key: aws.String(k), + Value: aws.String(v), + } + + result = append(result, tag) + } + + return result +} + +// KeyValueTags creates tftags.KeyValueTags from osis service tags. +func KeyValueTags(ctx context.Context, tags []awstypes.Tag) tftags.KeyValueTags { + m := make(map[string]*string, len(tags)) + + for _, tag := range tags { + m[aws.ToString(tag.Key)] = tag.Value + } + + return tftags.New(ctx, m) +} + +// getTagsIn returns osis service tags from Context. +// nil is returned if there are no input tags. +func getTagsIn(ctx context.Context) []awstypes.Tag { + if inContext, ok := tftags.FromContext(ctx); ok { + if tags := Tags(inContext.TagsIn.UnwrapOrDefault()); len(tags) > 0 { + return tags + } + } + + return nil +} + +// setTagsOut sets osis service tags in Context. +func setTagsOut(ctx context.Context, tags []awstypes.Tag) { + if inContext, ok := tftags.FromContext(ctx); ok { + inContext.TagsOut = option.Some(KeyValueTags(ctx, tags)) + } +} + +// updateTags updates osis service tags. +// The identifier is typically the Amazon Resource Name (ARN), although +// it may also be a different identifier depending on the service. +func updateTags(ctx context.Context, conn *osis.Client, identifier string, oldTagsMap, newTagsMap any, optFns ...func(*osis.Options)) error { + oldTags := tftags.New(ctx, oldTagsMap) + newTags := tftags.New(ctx, newTagsMap) + + ctx = tflog.SetField(ctx, logging.KeyResourceId, identifier) + + removedTags := oldTags.Removed(newTags) + removedTags = removedTags.IgnoreSystem(names.OpenSearchIngestion) + if len(removedTags) > 0 { + input := &osis.UntagResourceInput{ + Arn: aws.String(identifier), + TagKeys: removedTags.Keys(), + } + + _, err := conn.UntagResource(ctx, input, optFns...) + + if err != nil { + return fmt.Errorf("untagging resource (%s): %w", identifier, err) + } + } + + updatedTags := oldTags.Updated(newTags) + updatedTags = updatedTags.IgnoreSystem(names.OpenSearchIngestion) + if len(updatedTags) > 0 { + input := &osis.TagResourceInput{ + Arn: aws.String(identifier), + Tags: Tags(updatedTags), + } + + _, err := conn.TagResource(ctx, input, optFns...) + + if err != nil { + return fmt.Errorf("tagging resource (%s): %w", identifier, err) + } + } + + return nil +} + +// UpdateTags updates osis service tags. +// It is called from outside this package. 
+func (p *servicePackage) UpdateTags(ctx context.Context, meta any, identifier string, oldTags, newTags any) error { + return updateTags(ctx, meta.(*conns.AWSClient).OpenSearchIngestionClient(ctx), identifier, oldTags, newTags) +} From ad6c24f71bf26cb75b9af91813c73d2db3cba777 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:36:57 +0800 Subject: [PATCH 07/21] Add osis docs --- website/docs/r/osis_pipeline.html.markdown | 141 +++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 website/docs/r/osis_pipeline.html.markdown diff --git a/website/docs/r/osis_pipeline.html.markdown b/website/docs/r/osis_pipeline.html.markdown new file mode 100644 index 000000000000..d40b8b29070c --- /dev/null +++ b/website/docs/r/osis_pipeline.html.markdown @@ -0,0 +1,141 @@ +--- +subcategory: "OpenSearch Ingestion" +layout: "aws" +page_title: "AWS: aws_osis_pipeline" +description: |- + Terraform resource for managing an AWS OpenSearch Ingestion Pipeline. +--- + +# Resource: aws_osis_pipeline + +Terraform resource for managing an AWS OpenSearch Ingestion Pipeline. + +## Example Usage + +### Basic Usage + +```terraform +data "aws_region" "current" {} + +resource "aws_iam_role" "example" { + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "example" { + pipeline_name = "example" + pipeline_configuration_body = <<-EOT + version: "2" + example-pipeline: + source: + http: + path: "/example" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.example.arn}" + region: "${data.aws_region.current.name}" + bucket: "example" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 +} +``` + +### Using file function + +```terraform +resource "aws_osis_pipeline" "example" { + pipeline_name = "example" + pipeline_configuration_body = file("example.yaml") + max_units = 1 + min_units = 1 +} +``` + +## Argument Reference + +The following arguments are required: + +* `max_units` - (Required) The maximum pipeline capacity, in Ingestion Compute Units (ICUs). +* `min_units` - (Required) The minimum pipeline capacity, in Ingestion Compute Units (ICUs). +* `pipeline_configuration_body` - (Required) The pipeline configuration in YAML format. This argument accepts the pipeline configuration as a string or within a .yaml file. If you provide the configuration as a string, each new line must be escaped with \n. +* `pipeline_name` - (Required) The name of the OpenSearch Ingestion pipeline to create. Pipeline names are unique across the pipelines owned by an account within an AWS Region. + +The following arguments are optional: + +* `buffer_options` - (Optional) Key-value pairs to configure persistent buffering for the pipeline. See [`buffer_options`](#buffer_options) below. +* `encryption_at_rest_options` - (Optional) Key-value pairs to configure encryption for data that is written to a persistent buffer. See [`encryption_at_rest_options`](#encryption_at_rest_options) below. +* `log_publishing_options` - (Optional) Key-value pairs to configure log publishing. See [`log_publishing_options`](#log_publishing_options) below. +* `tags` - (Optional) A map of tags to assign to the pipeline. 
If configured with a provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level. +* `vpc_options` - (Optional) Container for the values required to configure VPC access for the pipeline. If you don't specify these values, OpenSearch Ingestion creates the pipeline with a public endpoint. See [`vpc_options`](#vpc_options) below. + +### buffer_options + +* `persistent_buffer_enabled` - (Required) Whether persistent buffering should be enabled. + +### encryption_at_rest_options + +* `kms_key_arn` - (Required) The ARN of the KMS key used to encrypt data-at-rest in OpenSearch Ingestion. By default, data is encrypted using an AWS owned key. + +### log_publishing_options + +* `cloudwatch_log_destination` - (Optional) The destination for OpenSearch Ingestion logs sent to Amazon CloudWatch Logs. This parameter is required if IsLoggingEnabled is set to true. See [`cloudwatch_log_destination`](#cloudwatch_log_destination) below. +* `is_logging_enabled` - (Optional) Whether logs should be published. + +### cloudwatch_log_destination + +* `log_group` - (Required) The name of the CloudWatch Logs group to send pipeline logs to. You can specify an existing log group or create a new one. For example, /aws/OpenSearchService/IngestionService/my-pipeline. + +### vpc_options + +* `subnet_ids` - (Required) A list of subnet IDs associated with the VPC endpoint. +* `security_group_ids` - (Optional) A list of security groups associated with the VPC endpoint. + +## Attribute Reference + +This resource exports the following attributes in addition to the arguments above: + +* `arn` - Amazon Resource Name (ARN) of the pipeline. +* `id` - Unique identifier for the pipeline. +* `ingest_endpoint_urls` - The list of ingestion endpoints for the pipeline, which you can send data to. + +## Timeouts + +[Configuration options](https://developer.hashicorp.com/terraform/language/resources/syntax#operation-timeouts): + +* `create` - (Default `30m`) +* `update` - (Default `30m`) +* `delete` - (Default `30m`) + +## Import + +In Terraform v1.5.0 and later, use an [`import` block](https://developer.hashicorp.com/terraform/language/import) to import OpenSearch Ingestion Pipeline using the `id`. For example: + +```terraform +import { + to = aws_osis_pipeline.example + id = "example" +} +``` + +Using `terraform import`, import OpenSearch Ingestion Pipeline using the `id`. For example: + +```console +% terraform import aws_osis_pipeline.example example +``` From 993dfa7e28ae594a1fae76f672aadc69f4b70a92 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:38:19 +0800 Subject: [PATCH 08/21] Add osis acc tests --- internal/service/osis/pipeline_test.go | 743 +++++++++++++++++++++++++ 1 file changed, 743 insertions(+) create mode 100644 internal/service/osis/pipeline_test.go diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go new file mode 100644 index 000000000000..f833c5ac937a --- /dev/null +++ b/internal/service/osis/pipeline_test.go @@ -0,0 +1,743 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: MPL-2.0 + +package osis_test + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/YakDriver/regexache" + "github.com/aws/aws-sdk-go-v2/service/osis" + "github.com/aws/aws-sdk-go-v2/service/osis/types" + sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" + "github.com/hashicorp/terraform-provider-aws/internal/acctest" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + "github.com/hashicorp/terraform-provider-aws/internal/create" + tfosis "github.com/hashicorp/terraform-provider-aws/internal/service/osis" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/names" +) + +func TestAccOpenSearchIngestionPipeline_basic(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "min_units", "1"), + resource.TestCheckResourceAttr(resourceName, "max_units", "1"), + resource.TestCheckResourceAttrSet(resourceName, "pipeline_configuration_body"), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "ingest_endpoint_urls.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "pipeline_name", "aws_osis_pipeline.test", "id"), + acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "osis", regexache.MustCompile(`pipeline/.+$`)), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_disappears(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + acctest.CheckFrameworkResourceDisappears(ctx, acctest.Provider, tfosis.ResourcePipeline, resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_buffer(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + 
resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_bufferOptions(rName, true), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "buffer_options.0.persistent_buffer_enabled", "true"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccPipelineConfig_bufferOptions(rName, false), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "buffer_options.0.persistent_buffer_enabled", "false"), + ), + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_encryption(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_encryption(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "encryption_at_rest_options.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "encryption_at_rest_options.0.kms_key_arn"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_logPublishing(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_logPublishing(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + 
resource.TestCheckResourceAttr(resourceName, "log_publishing_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.0.is_logging_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.0.cloudwatch_log_destination.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "log_publishing_options.0.cloudwatch_log_destination.0.log_group"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_vpc(t *testing.T) { + ctx := acctest.Context(t) + + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_vpc(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "vpc_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.0.security_group_ids.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.security_group_ids.0"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.0.subnet_ids.#", "2"), + resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.subnet_ids.0"), + resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.subnet_ids.1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_tags(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_tags1(rName, "key1", "value1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + Config: testAccPipelineConfig_tags2(rName, "key1", "value1", "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccPipelineConfig_tags1(rName, "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, 
&pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) +} + +func testAccCheckPipelineDestroy(ctx context.Context) resource.TestCheckFunc { + return func(s *terraform.State) error { + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_osis_pipeline" { + continue + } + + _, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) + + if tfresource.NotFound(err) { + continue + } + + if err != nil { + return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingDestroyed, tfosis.ResNamePipeline, rs.Primary.ID, err) + } + + return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingDestroyed, tfosis.ResNamePipeline, rs.Primary.ID, errors.New("not destroyed")) + } + + return nil + } +} + +func testAccCheckPipelineExists(ctx context.Context, name string, pipeline *types.Pipeline) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[name] + if !ok { + return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, name, errors.New("not found")) + } + + if rs.Primary.ID == "" { + return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, name, errors.New("not set")) + } + + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + resp, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) + + if err != nil { + return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, rs.Primary.ID, err) + } + + *pipeline = *resp + + return nil + } +} + +func testAccPreCheck(ctx context.Context, t *testing.T) { + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + + input := &osis.ListPipelinesInput{} + _, err := conn.ListPipelines(ctx, input) + + if acctest.PreCheckSkipError(err) { + t.Skipf("skipping acceptance testing: %s", err) + } + if err != nil { + t.Fatalf("unexpected PreCheck error: %s", err) + } +} + +func testAccPipelineConfig_basic(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 +} +`, rName) +} + +func testAccPipelineConfig_tags1(rName string, key1, value1 string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + 
http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + tags = { + %[2]q = %[3]q + } +} +`, rName, key1, value1) +} + +func testAccPipelineConfig_tags2(rName string, key1, value1, key2, value2 string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + tags = { + %[2]q = %[3]q + %[4]q = %[5]q + } +} +`, rName, key1, value1, key2, value2) +} + +func testAccPipelineConfig_bufferOptions(rName string, bufferEnabled bool) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 2 + min_units = 2 + + buffer_options { + persistent_buffer_enabled = %[2]t + } +} +`, rName, bufferEnabled) +} + +func testAccPipelineConfig_encryption(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + encryption_at_rest_options { + kms_key_arn = aws_kms_key.test.arn + } +} + +resource "aws_kms_key" "test" { + description = %[1]q + deletion_window_in_days = 7 +} +`, rName) +} + +func testAccPipelineConfig_logPublishing(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_cloudwatch_log_group" "test" { + name = 
"/aws/vendedlogs/OpenSearchIngestion/example-pipeline/test-logs" +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + log_publishing_options { + is_logging_enabled = true + cloudwatch_log_destination { + log_group = aws_cloudwatch_log_group.test.name + } + + } +} +`, rName) +} + +func testAccPipelineConfig_vpc(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_vpc" "test" { + cidr_block = "10.0.0.0/16" +} + +resource "aws_subnet" "test1" { + cidr_block = "10.0.1.0/24" + vpc_id = aws_vpc.test.id +} + +resource "aws_subnet" "test2" { + cidr_block = "10.0.2.0/24" + vpc_id = aws_vpc.test.id +} + +resource "aws_security_group" "test" { + name = %[1]q + vpc_id = aws_vpc.test.id +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + vpc_options { + security_group_ids = [aws_security_group.test.id] + subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id] + } +} +`, rName) +} From a1220c5383248e5b87782d58544b513370bdef7b Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 01:44:32 +0800 Subject: [PATCH 09/21] Update osis acc test --- internal/service/osis/pipeline_test.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go index f833c5ac937a..f55468ca5e01 100644 --- a/internal/service/osis/pipeline_test.go +++ b/internal/service/osis/pipeline_test.go @@ -232,9 +232,8 @@ func TestAccOpenSearchIngestionPipeline_vpc(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "vpc_options.#", "1"), resource.TestCheckResourceAttr(resourceName, "vpc_options.0.security_group_ids.#", "1"), resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.security_group_ids.0"), - resource.TestCheckResourceAttr(resourceName, "vpc_options.0.subnet_ids.#", "2"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.0.subnet_ids.#", "1"), resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.subnet_ids.0"), - resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.subnet_ids.1"), ), }, { @@ -697,16 +696,11 @@ resource "aws_vpc" "test" { cidr_block = "10.0.0.0/16" } -resource "aws_subnet" "test1" { +resource "aws_subnet" "test" { cidr_block = "10.0.1.0/24" vpc_id = aws_vpc.test.id } -resource "aws_subnet" "test2" { - cidr_block = "10.0.2.0/24" - vpc_id = aws_vpc.test.id -} - resource "aws_security_group" "test" { name = %[1]q vpc_id = aws_vpc.test.id @@ -736,7 +730,7 @@ resource "aws_osis_pipeline" "test" { vpc_options { security_group_ids = [aws_security_group.test.id] 
- subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id] + subnet_ids = [aws_subnet.test.id] } } `, rName) From 3fc3fb08b901247fa9acfc0cbfa9baa1c23ec689 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 02:38:25 +0800 Subject: [PATCH 10/21] fix osis acc test --- internal/service/osis/pipeline_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go index f55468ca5e01..cf3d0870b073 100644 --- a/internal/service/osis/pipeline_test.go +++ b/internal/service/osis/pipeline_test.go @@ -237,9 +237,10 @@ func TestAccOpenSearchIngestionPipeline_vpc(t *testing.T) { ), }, { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"vpc_options"}, }, }, }) From cff6c1dfbec897a765c8525d595dce08b2d7bbee Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 02:38:58 +0800 Subject: [PATCH 11/21] bump timeouts to 45m --- internal/service/osis/pipeline.go | 6 +++--- website/docs/r/osis_pipeline.html.markdown | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index 65f2a45203fe..b9c53a02e1e6 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -42,9 +42,9 @@ import ( func newResourcePipeline(_ context.Context) (resource.ResourceWithConfigure, error) { r := &resourcePipeline{} - r.SetDefaultCreateTimeout(30 * time.Minute) - r.SetDefaultUpdateTimeout(30 * time.Minute) - r.SetDefaultDeleteTimeout(30 * time.Minute) + r.SetDefaultCreateTimeout(45 * time.Minute) + r.SetDefaultUpdateTimeout(45 * time.Minute) + r.SetDefaultDeleteTimeout(45 * time.Minute) return r, nil } diff --git a/website/docs/r/osis_pipeline.html.markdown b/website/docs/r/osis_pipeline.html.markdown index d40b8b29070c..23bd32c2366e 100644 --- a/website/docs/r/osis_pipeline.html.markdown +++ b/website/docs/r/osis_pipeline.html.markdown @@ -119,9 +119,9 @@ This resource exports the following attributes in addition to the arguments abov [Configuration options](https://developer.hashicorp.com/terraform/language/resources/syntax#operation-timeouts): -* `create` - (Default `30m`) -* `update` - (Default `30m`) -* `delete` - (Default `30m`) +* `create` - (Default `45m`) +* `update` - (Default `45m`) +* `delete` - (Default `45m`) ## Import From e18fe19e569b8d7ec29db72f72ecb658072644ef Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 02:42:57 +0800 Subject: [PATCH 12/21] add changelog entry --- .changelog/35582.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/35582.txt diff --git a/.changelog/35582.txt b/.changelog/35582.txt new file mode 100644 index 000000000000..a92c0aa7b70d --- /dev/null +++ b/.changelog/35582.txt @@ -0,0 +1,3 @@ +```release-note:new-resource +aws_osis_pipeline +``` From 5bd5f9faf985a6246a79bde23952ee9ef805fbe7 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 02:55:45 +0800 Subject: [PATCH 13/21] fix caps semgrep --- internal/service/osis/pipeline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index b9c53a02e1e6..ef76a76baf72 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -268,7 +268,7 @@ func (r *resourcePipeline) Create(ctx 
context.Context, req resource.CreateReques if resp.Diagnostics.HasError() { return } - in.VpcOptions = expandVpcOptions(ctx, vpcOptions) + in.VpcOptions = expandVPCOptions(ctx, vpcOptions) } // Retry for IAM eventual consistency @@ -780,7 +780,7 @@ func expandCloudWatchLogDestination(tfList []cloudWatchLogDestinationData) *awst } } -func expandVpcOptions(ctx context.Context, tfList []vpcOptionsData) *awstypes.VpcOptions { +func expandVPCOptions(ctx context.Context, tfList []vpcOptionsData) *awstypes.VpcOptions { if len(tfList) == 0 { return nil } From 905fe98bf7f5021aa59297aecb89d2d0614d8564 Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 03:12:52 +0800 Subject: [PATCH 14/21] fix osis generate --- internal/service/osis/generate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/service/osis/generate.go b/internal/service/osis/generate.go index ae709ae0ffc8..b4346f0dd945 100644 --- a/internal/service/osis/generate.go +++ b/internal/service/osis/generate.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -UpdateTags +//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -TagInIDElem=Arn -UpdateTags //go:generate go run ../../generate/servicepackage/main.go // ONLY generate directives and package declaration! Do not add anything else to this file. From 577f494e1c19c46d50aec96e0f201501e1fb7a8e Mon Sep 17 00:00:00 2001 From: Farhan Angullia Date: Thu, 1 Feb 2024 03:19:11 +0800 Subject: [PATCH 15/21] clean up osis pipeline --- internal/service/osis/generate.go | 2 +- internal/service/osis/pipeline.go | 30 ------------------------------ 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/internal/service/osis/generate.go b/internal/service/osis/generate.go index b4346f0dd945..b88b4e3a96d1 100644 --- a/internal/service/osis/generate.go +++ b/internal/service/osis/generate.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -TagInIDElem=Arn -UpdateTags +//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -TagInIDElem=Arn -ListTagsInIDElem=Arn -UpdateTags //go:generate go run ../../generate/servicepackage/main.go // ONLY generate directives and package declaration! Do not add anything else to this file. 
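For context on the '//go:generate' directive above: '-TagInIDElem=Arn' and '-ListTagsInIDElem=Arn' tell the tags generator to put the resource identifier into the 'Arn' field of the OpenSearch Ingestion tagging API requests rather than the generator's default field name. The sketch below is illustrative only — the function name, helper signatures, and error handling are assumptions based on the provider's usual generated-tags conventions, not code contained in this patch series.

// Sketch (assumption, not generated output): roughly what the listTags
// helper is expected to look like for this service once
// -ListTagsInIDElem=Arn is set.
package osis

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/osis"
	tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
)

func listTags(ctx context.Context, conn *osis.Client, identifier string) (tftags.KeyValueTags, error) {
	// The identifier (the pipeline ARN) is sent in the Arn request field,
	// which is what -ListTagsInIDElem=Arn selects.
	input := &osis.ListTagsForResourceInput{
		Arn: aws.String(identifier),
	}

	output, err := conn.ListTagsForResource(ctx, input)
	if err != nil {
		return tftags.New(ctx, nil), err
	}

	// Convert the service's Tag slice into the provider's KeyValueTags map.
	m := make(map[string]string, len(output.Tags))
	for _, tag := range output.Tags {
		m[aws.ToString(tag.Key)] = aws.ToString(tag.Value)
	}

	return tftags.New(ctx, m), nil
}

Without these flags the generated requests would use the generator's default identifier field, which does not match this API — hence the two small generate.go fixes in this series.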
diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index ef76a76baf72..ea53c7c7073a 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -634,35 +634,6 @@ func flattenBufferOptions(ctx context.Context, apiObject *awstypes.BufferOptions return listVal, diags } -//func flattenEncryptionAtRestOptions(ctx context.Context, apiObject *awstypes.EncryptionAtRestOptions) (types.List, diag.Diagnostics) { -// var diags diag.Diagnostics -// elemType := fwtypes.NewObjectTypeOf[encryptionAtRestOptionsData](ctx).ObjectType -// -// if apiObject == nil { -// return types.ListValueMust(elemType, []attr.Value{}), diags -// } -// -// values := make([]attr.Value, len(apiObjects)) -// for i, o := range apiObjects { -// values[i] = flattenMonitorData(ctx, o).value(ctx) -// } -// -// objVal := &encryptionAtRestOptionsData{ -// KmsKeyArn: flex.StringToFrameworkARN(ctx, apiObject.KmsKeyArn), -// } -// -// obj := map[string]attr.Value{ -// "kms_key_arn": flex.StringToFrameworkARN(ctx, apiObject.KmsKeyArn), -// } -// //objVal, d := types.ObjectValue(encryptionAtRestOptionsAttrTypes, obj) -// //diags.Append(d...) -// -// listVal, d := types.ListValue(elemType, []attr.Value{objVal}) -// diags.Append(d...) -// -// return listVal, diags -//} - func flattenEncryptionAtRestOptions(ctx context.Context, apiObject *awstypes.EncryptionAtRestOptions) (types.List, diag.Diagnostics) { var diags diag.Diagnostics elemType := types.ObjectType{AttrTypes: encryptionAtRestOptionsAttrTypes} @@ -672,7 +643,6 @@ func flattenEncryptionAtRestOptions(ctx context.Context, apiObject *awstypes.Enc } obj := map[string]attr.Value{ - //"kms_key_arn": flex.StringToFrameworkARN(ctx, apiObject.KmsKeyArn), "kms_key_arn": flex.StringToFramework(ctx, apiObject.KmsKeyArn), } objVal, d := types.ObjectValue(encryptionAtRestOptionsAttrTypes, obj) From a6ba79f5b58be02f88641987e827d7490476492a Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 7 Feb 2024 17:41:17 -0500 Subject: [PATCH 16/21] r/aws_osis_pipeline: Use AutoFlEx. --- internal/service/osis/consts.go | 12 + internal/service/osis/exports_test.go | 5 +- internal/service/osis/pipeline.go | 704 ++++++------------- internal/service/osis/pipeline_test.go | 29 +- internal/service/osis/service_package_gen.go | 2 +- website/docs/r/osis_pipeline.html.markdown | 2 +- 6 files changed, 237 insertions(+), 517 deletions(-) create mode 100644 internal/service/osis/consts.go diff --git a/internal/service/osis/consts.go b/internal/service/osis/consts.go new file mode 100644 index 000000000000..6ddec039f747 --- /dev/null +++ b/internal/service/osis/consts.go @@ -0,0 +1,12 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +import ( + "time" +) + +const ( + propagationTimeout = 2 * time.Minute +) diff --git a/internal/service/osis/exports_test.go b/internal/service/osis/exports_test.go index 29af5bf1238b..2e3060b848df 100644 --- a/internal/service/osis/exports_test.go +++ b/internal/service/osis/exports_test.go @@ -5,6 +5,7 @@ package osis // Exports for use in tests only. 
var ( - ResourcePipeline = newResourcePipeline - FindPipelineByID = findPipelineByID + ResourcePipeline = newPipelineResource + + FindPipelineByID = findPipelineByName ) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index ea53c7c7073a..bd04d0c80be0 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -6,6 +6,7 @@ package osis import ( "context" "errors" + "fmt" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -17,8 +18,6 @@ import ( "github.com/hashicorp/terraform-plugin-framework-validators/setvalidator" "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" "github.com/hashicorp/terraform-plugin-framework/attr" - "github.com/hashicorp/terraform-plugin-framework/diag" - "github.com/hashicorp/terraform-plugin-framework/path" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/resource/schema" "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" @@ -27,10 +26,12 @@ import ( "github.com/hashicorp/terraform-plugin-framework/schema/validator" "github.com/hashicorp/terraform-plugin-framework/types" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" - "github.com/hashicorp/terraform-provider-aws/internal/create" "github.com/hashicorp/terraform-provider-aws/internal/enum" + "github.com/hashicorp/terraform-provider-aws/internal/errs" + "github.com/hashicorp/terraform-provider-aws/internal/errs/fwdiag" "github.com/hashicorp/terraform-provider-aws/internal/framework" "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" + fwflex "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" fwtypes "github.com/hashicorp/terraform-provider-aws/internal/framework/types" tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" "github.com/hashicorp/terraform-provider-aws/internal/tfresource" @@ -39,8 +40,8 @@ import ( // @FrameworkResource(name="Pipeline") // @Tags(identifierAttribute="arn") -func newResourcePipeline(_ context.Context) (resource.ResourceWithConfigure, error) { - r := &resourcePipeline{} +func newPipelineResource(_ context.Context) (resource.ResourceWithConfigure, error) { + r := &pipelineResource{} r.SetDefaultCreateTimeout(45 * time.Minute) r.SetDefaultUpdateTimeout(45 * time.Minute) @@ -49,26 +50,22 @@ func newResourcePipeline(_ context.Context) (resource.ResourceWithConfigure, err return r, nil } -const ( - ResNamePipeline = "Pipeline" - iamPropagationTimeout = time.Minute * 1 -) - -type resourcePipeline struct { +type pipelineResource struct { framework.ResourceWithConfigure + framework.WithImportByID framework.WithTimeouts } -func (r *resourcePipeline) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { - resp.TypeName = "aws_osis_pipeline" +func (r *pipelineResource) Metadata(_ context.Context, request resource.MetadataRequest, response *resource.MetadataResponse) { + response.TypeName = "aws_osis_pipeline" } -func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaRequest, resp *resource.SchemaResponse) { - resp.Schema = schema.Schema{ +func (r *pipelineResource) Schema(ctx context.Context, request resource.SchemaRequest, response *resource.SchemaResponse) { + response.Schema = schema.Schema{ Attributes: map[string]schema.Attribute{ - "arn": framework.ARNAttributeComputedOnly(), - "id": framework.IDAttribute(), + names.AttrID: framework.IDAttribute(), "ingest_endpoint_urls": schema.SetAttribute{ + CustomType: 
fwtypes.SetOfStringType, Computed: true, ElementType: types.StringType, PlanModifiers: []planmodifier.Set{ @@ -87,6 +84,7 @@ func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaReques int64validator.AtLeast(1), }, }, + "pipeline_arn": framework.ARNAttributeComputedOnly(), "pipeline_configuration_body": schema.StringAttribute{ Required: true, Validators: []validator.String{ @@ -107,6 +105,7 @@ func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaReques }, Blocks: map[string]schema.Block{ "buffer_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[bufferOptionsModel](ctx), Validators: []validator.List{ listvalidator.SizeAtMost(1), }, @@ -119,25 +118,21 @@ func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaReques }, }, "encryption_at_rest_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[encryptionAtRestOptionsModel](ctx), Validators: []validator.List{ listvalidator.SizeAtMost(1), }, NestedObject: schema.NestedBlockObject{ Attributes: map[string]schema.Attribute{ "kms_key_arn": schema.StringAttribute{ - PlanModifiers: []planmodifier.String{ - stringplanmodifier.UseStateForUnknown(), - }, CustomType: fwtypes.ARNType, Required: true, - Validators: []validator.String{ - stringvalidator.LengthBetween(7, 2048), - }, }, }, }, }, "log_publishing_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[logPublishingOptionsModel](ctx), Validators: []validator.List{ listvalidator.SizeAtMost(1), }, @@ -149,6 +144,7 @@ func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaReques }, Blocks: map[string]schema.Block{ "cloudwatch_log_destination": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[cloudWatchLogDestinationModel](ctx), Validators: []validator.List{ listvalidator.SizeAtMost(1), }, @@ -166,643 +162,365 @@ func (r *resourcePipeline) Schema(ctx context.Context, req resource.SchemaReques }, }, }, + "timeouts": timeouts.Block(ctx, timeouts.Opts{ + Create: true, + Update: true, + Delete: true, + }), "vpc_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[vpcOptionsModel](ctx), Validators: []validator.List{ listvalidator.SizeAtMost(1), }, NestedObject: schema.NestedBlockObject{ Attributes: map[string]schema.Attribute{ "security_group_ids": schema.SetAttribute{ + CustomType: fwtypes.SetOfStringType, Optional: true, ElementType: types.StringType, Validators: []validator.Set{ setvalidator.SizeBetween(1, 12), - setvalidator.ValueStringsAre( - stringvalidator.All( - stringvalidator.LengthAtLeast(11), - stringvalidator.LengthAtMost(20), - ), - ), }, }, "subnet_ids": schema.SetAttribute{ + CustomType: fwtypes.SetOfStringType, Required: true, ElementType: types.StringType, Validators: []validator.Set{ setvalidator.SizeBetween(1, 12), - setvalidator.ValueStringsAre( - stringvalidator.All( - stringvalidator.LengthAtLeast(15), - stringvalidator.LengthAtMost(24), - ), - ), }, }, }, }, }, - "timeouts": timeouts.Block(ctx, timeouts.Opts{ - Create: true, - Update: true, - Delete: true, - }), }, } } -func (r *resourcePipeline) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { - conn := r.Meta().OpenSearchIngestionClient(ctx) - - var plan resourcePipelineData - resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) 
- if resp.Diagnostics.HasError() { +func (r *pipelineResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.Plan.Get(ctx, &data)...) + if response.Diagnostics.HasError() { return } - in := &osis.CreatePipelineInput{ - MaxUnits: aws.Int32(int32(plan.MaxUnits.ValueInt64())), - MinUnits: aws.Int32(int32(plan.MinUnits.ValueInt64())), - PipelineConfigurationBody: aws.String(plan.PipelineConfigurationBody.ValueString()), - PipelineName: aws.String(plan.PipelineName.ValueString()), - Tags: getTagsIn(ctx), - } - - if !plan.BufferOptions.IsNull() { - var bufferOptions []bufferOptionsData - resp.Diagnostics.Append(plan.BufferOptions.ElementsAs(ctx, &bufferOptions, false)...) - if resp.Diagnostics.HasError() { - return - } + conn := r.Meta().OpenSearchIngestionClient(ctx) - in.BufferOptions = expandBufferOptions(bufferOptions) + name := data.PipelineName.ValueString() + input := &osis.CreatePipelineInput{} + response.Diagnostics.Append(fwflex.Expand(ctx, data, input)...) + if response.Diagnostics.HasError() { + return } - if !plan.EncryptionAtRestOptions.IsNull() { - var encryptionAtRestOptions []encryptionAtRestOptionsData - resp.Diagnostics.Append(plan.EncryptionAtRestOptions.ElementsAs(ctx, &encryptionAtRestOptions, false)...) - if resp.Diagnostics.HasError() { - return - } - - in.EncryptionAtRestOptions = expandEncryptionAtRestOptions(encryptionAtRestOptions) - } + // Additional fields. + input.Tags = getTagsIn(ctx) - if !plan.LogPublishingOptions.IsNull() { - var logPublishingOptions []logPublishingOptionsData - resp.Diagnostics.Append(plan.LogPublishingOptions.ElementsAs(ctx, &logPublishingOptions, false)...) - if resp.Diagnostics.HasError() { - return - } + // Retry for IAM eventual consistency. + _, err := tfresource.RetryWhenIsA[*awstypes.ValidationException](ctx, propagationTimeout, func() (interface{}, error) { + return conn.CreatePipeline(ctx, input) + }) - logPublishingOptionsInput, d := expandLogPublishingOptions(ctx, logPublishingOptions) - resp.Diagnostics.Append(d...) - if resp.Diagnostics.HasError() { - return - } - in.LogPublishingOptions = logPublishingOptionsInput - } + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("creating OpenSearch Ingestion Pipeline (%s)", name), err.Error()) - if !plan.VpcOptions.IsNull() { - var vpcOptions []vpcOptionsData - resp.Diagnostics.Append(plan.VpcOptions.ElementsAs(ctx, &vpcOptions, false)...) 
- if resp.Diagnostics.HasError() { - return - } - in.VpcOptions = expandVPCOptions(ctx, vpcOptions) + return } - // Retry for IAM eventual consistency - var out *osis.CreatePipelineOutput - err := tfresource.Retry(ctx, iamPropagationTimeout, func() *retry.RetryError { - var err error - out, err = conn.CreatePipeline(ctx, in) - if err != nil { - var ve *awstypes.ValidationException - if errors.As(err, &ve) { - return retry.RetryableError(err) - } - return retry.NonRetryableError(err) - } + data.setID() - return nil - }) + pipeline, err := waitPipelineCreated(ctx, conn, name, r.CreateTimeout(ctx, data.Timeouts)) if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionCreating, ResNamePipeline, plan.PipelineName.String(), err), - err.Error(), - ) + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) create", name), err.Error()) + return } - if out == nil || out.Pipeline == nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionCreating, ResNamePipeline, plan.PipelineName.String(), nil), - errors.New("empty output").Error(), - ) + + // Set values for unknowns. + data.IngestEndpointUrls.SetValue = fwflex.FlattenFrameworkStringValueSet(ctx, pipeline.IngestEndpointUrls) + data.PipelineARN = flex.StringToFramework(ctx, pipeline.PipelineArn) + + response.Diagnostics.Append(response.State.Set(ctx, &data)...) +} + +func (r *pipelineResource) Read(ctx context.Context, request resource.ReadRequest, response *resource.ReadResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &data)...) + if response.Diagnostics.HasError() { return } - state := plan - state.ID = flex.StringToFramework(ctx, out.Pipeline.PipelineName) + if err := data.InitFromID(); err != nil { + response.Diagnostics.AddError("parsing resource ID", err.Error()) - createTimeout := r.CreateTimeout(ctx, plan.Timeouts) - waitOut, err := waitPipelineCreated(ctx, conn, aws.ToString(out.Pipeline.PipelineName), createTimeout) - if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForCreation, ResNamePipeline, plan.PipelineName.String(), err), - err.Error(), - ) return } - resp.Diagnostics.Append(state.refreshFromOutput(ctx, waitOut)...) - resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) -} - -func (r *resourcePipeline) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { conn := r.Meta().OpenSearchIngestionClient(ctx) - var state resourcePipelineData - resp.Diagnostics.Append(req.State.Get(ctx, &state)...) - if resp.Diagnostics.HasError() { - return - } + name := data.PipelineName.ValueString() + pipeline, err := findPipelineByName(ctx, conn, name) - out, err := findPipelineByID(ctx, conn, state.ID.ValueString()) if tfresource.NotFound(err) { - resp.State.RemoveResource(ctx) + response.Diagnostics.Append(fwdiag.NewResourceNotFoundWarningDiagnostic(err)) + response.State.RemoveResource(ctx) + return } + if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionSetting, ResNamePipeline, state.ID.String(), err), - err.Error(), - ) + response.Diagnostics.AddError(fmt.Sprintf("reading OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + return } - resp.Diagnostics.Append(state.refreshFromOutput(ctx, out)...) - resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) 
-} + response.Diagnostics.Append(fwflex.Flatten(ctx, pipeline, &data)...) + if response.Diagnostics.HasError() { + return + } -func (r *resourcePipeline) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { - conn := r.Meta().OpenSearchIngestionClient(ctx) + response.Diagnostics.Append(response.State.Set(ctx, &data)...) +} - var plan, state resourcePipelineData - resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) - resp.Diagnostics.Append(req.State.Get(ctx, &state)...) - if resp.Diagnostics.HasError() { +func (r *pipelineResource) Update(ctx context.Context, request resource.UpdateRequest, response *resource.UpdateResponse) { + var old, new pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &old)...) + if response.Diagnostics.HasError() { + return + } + response.Diagnostics.Append(request.Plan.Get(ctx, &new)...) + if response.Diagnostics.HasError() { return } - if !plan.BufferOptions.Equal(state.BufferOptions) || - !plan.EncryptionAtRestOptions.Equal(state.EncryptionAtRestOptions) || - !plan.LogPublishingOptions.Equal(state.LogPublishingOptions) || - !plan.MaxUnits.Equal(state.MaxUnits) || - !plan.MinUnits.Equal(state.MinUnits) || - !plan.PipelineConfigurationBody.Equal(state.PipelineConfigurationBody) { - in := &osis.UpdatePipelineInput{ - PipelineName: aws.String(plan.PipelineName.ValueString()), - } - - if !plan.MaxUnits.IsNull() { - in.MaxUnits = aws.Int32(int32(plan.MaxUnits.ValueInt64())) - } + conn := r.Meta().OpenSearchIngestionClient(ctx) - if !plan.MinUnits.IsNull() { - in.MinUnits = aws.Int32(int32(plan.MinUnits.ValueInt64())) + if !new.BufferOptions.Equal(old.BufferOptions) || + !new.EncryptionAtRestOptions.Equal(old.EncryptionAtRestOptions) || + !new.LogPublishingOptions.Equal(old.LogPublishingOptions) || + !new.MaxUnits.Equal(old.MaxUnits) || + !new.MinUnits.Equal(old.MinUnits) || + !new.PipelineConfigurationBody.Equal(old.PipelineConfigurationBody) { + input := &osis.UpdatePipelineInput{} + response.Diagnostics.Append(fwflex.Expand(ctx, new, input)...) + if response.Diagnostics.HasError() { + return } - if !plan.PipelineConfigurationBody.IsNull() { - in.PipelineConfigurationBody = aws.String(plan.PipelineConfigurationBody.ValueString()) - } + name := new.PipelineName.ValueString() + _, err := conn.UpdatePipeline(ctx, input) - if !plan.BufferOptions.IsNull() { - var bufferOptions []bufferOptionsData - resp.Diagnostics.Append(plan.BufferOptions.ElementsAs(ctx, &bufferOptions, false)...) - if resp.Diagnostics.HasError() { - return - } + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("updating OpenSearch Ingestion Pipeline (%s)", name), err.Error()) - in.BufferOptions = expandBufferOptions(bufferOptions) + return } - if !plan.EncryptionAtRestOptions.IsNull() { - var encryptionAtRestOptions []encryptionAtRestOptionsData - resp.Diagnostics.Append(plan.EncryptionAtRestOptions.ElementsAs(ctx, &encryptionAtRestOptions, false)...) - if resp.Diagnostics.HasError() { - return - } - in.EncryptionAtRestOptions = expandEncryptionAtRestOptions(encryptionAtRestOptions) - } - if !plan.LogPublishingOptions.IsNull() { - var logPublishingOptions []logPublishingOptionsData - resp.Diagnostics.Append(plan.LogPublishingOptions.ElementsAs(ctx, &logPublishingOptions, false)...) - if resp.Diagnostics.HasError() { - return - } - - logPublishingOptionsInput, d := expandLogPublishingOptions(ctx, logPublishingOptions) - resp.Diagnostics.Append(d...) 
- if resp.Diagnostics.HasError() { - return - } - in.LogPublishingOptions = logPublishingOptionsInput - } + if _, err := waitPipelineUpdated(ctx, conn, name, r.UpdateTimeout(ctx, new.Timeouts)); err != nil { + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) update", name), err.Error()) - out, err := conn.UpdatePipeline(ctx, in) - if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionUpdating, ResNamePipeline, plan.ID.String(), err), - err.Error(), - ) - return - } - if out == nil || out.Pipeline == nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionUpdating, ResNamePipeline, plan.ID.String(), nil), - errors.New("empty output").Error(), - ) return } } - updateTimeout := r.UpdateTimeout(ctx, plan.Timeouts) - waitOut, err := waitPipelineUpdated(ctx, conn, plan.ID.ValueString(), updateTimeout) - if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForUpdate, ResNamePipeline, plan.ID.String(), err), - err.Error(), - ) - return - } - resp.Diagnostics.Append(plan.refreshFromOutput(ctx, waitOut)...) - resp.Diagnostics.Append(resp.State.Set(ctx, &plan)...) + response.Diagnostics.Append(response.State.Set(ctx, &new)...) } -func (r *resourcePipeline) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { - conn := r.Meta().OpenSearchIngestionClient(ctx) - - var state resourcePipelineData - resp.Diagnostics.Append(req.State.Get(ctx, &state)...) - if resp.Diagnostics.HasError() { +func (r *pipelineResource) Delete(ctx context.Context, request resource.DeleteRequest, response *resource.DeleteResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &data)...) 
+ if response.Diagnostics.HasError() { return } - in := &osis.DeletePipelineInput{ - PipelineName: aws.String(state.ID.ValueString()), + conn := r.Meta().OpenSearchIngestionClient(ctx) + + name := data.PipelineName.ValueString() + input := &osis.DeletePipelineInput{ + PipelineName: aws.String(name), } - _, err := conn.DeletePipeline(ctx, in) + _, err := conn.DeletePipeline(ctx, input) - if err != nil { - var nfe *awstypes.ResourceNotFoundException - if errors.As(err, &nfe) { - return - } - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionDeleting, ResNamePipeline, state.ID.String(), err), - err.Error(), - ) + if errs.IsA[*awstypes.ResourceNotFoundException](err) { return } - deleteTimeout := r.DeleteTimeout(ctx, state.Timeouts) - _, err = waitPipelineDeleted(ctx, conn, state.ID.ValueString(), deleteTimeout) if err != nil { - resp.Diagnostics.AddError( - create.ProblemStandardMessage(names.OpenSearchIngestion, create.ErrActionWaitingForDeletion, ResNamePipeline, state.ID.String(), err), - err.Error(), - ) + response.Diagnostics.AddError(fmt.Sprintf("deleting OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + + return + } + + if _, err := waitPipelineDeleted(ctx, conn, name, r.DeleteTimeout(ctx, data.Timeouts)); err != nil { + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) delete", name), err.Error()) + return } } -// refreshFromOutput writes state data from an AWS response object -func (pd *resourcePipelineData) refreshFromOutput(ctx context.Context, out *awstypes.Pipeline) diag.Diagnostics { - var diags diag.Diagnostics +func (r *pipelineResource) ModifyPlan(ctx context.Context, request resource.ModifyPlanRequest, response *resource.ModifyPlanResponse) { + r.SetTagsAll(ctx, request, response) +} - if out == nil { - return diags +func findPipelineByName(ctx context.Context, conn *osis.Client, name string) (*awstypes.Pipeline, error) { + input := &osis.GetPipelineInput{ + PipelineName: aws.String(name), } - pd.ARN = flex.StringToFramework(ctx, out.PipelineArn) - pd.ID = flex.StringToFramework(ctx, out.PipelineName) - pd.PipelineName = flex.StringToFramework(ctx, out.PipelineName) - pd.PipelineConfigurationBody = flex.StringToFramework(ctx, out.PipelineConfigurationBody) - minUnits := int64(out.MinUnits) - pd.MinUnits = flex.Int64ToFramework(ctx, &minUnits) - maxUnits := int64(out.MaxUnits) - pd.MaxUnits = flex.Int64ToFramework(ctx, &maxUnits) - pd.IngestEndpointUrls = flex.FlattenFrameworkStringValueSet(ctx, out.IngestEndpointUrls) + output, err := conn.GetPipeline(ctx, input) - bufferOptions, d := flattenBufferOptions(ctx, out.BufferOptions) - diags.Append(d...) - pd.BufferOptions = bufferOptions + if errs.IsA[*awstypes.ResourceNotFoundException](err) { + return nil, &retry.NotFoundError{ + LastError: err, + LastRequest: input, + } + } - encryptionAtRestOptions, d := flattenEncryptionAtRestOptions(ctx, out.EncryptionAtRestOptions) - diags.Append(d...) - pd.EncryptionAtRestOptions = encryptionAtRestOptions + if err != nil { + return nil, err + } - logPublishingOptions, d := flattenLogPublishingOptions(ctx, out.LogPublishingOptions) - diags.Append(d...) 
- pd.LogPublishingOptions = logPublishingOptions + if output == nil || output.Pipeline == nil { + return nil, tfresource.NewEmptyResultError(input) + } - setTagsOut(ctx, out.Tags) - return diags + return output.Pipeline, nil } -func (r *resourcePipeline) ModifyPlan(ctx context.Context, req resource.ModifyPlanRequest, resp *resource.ModifyPlanResponse) { - r.SetTagsAll(ctx, req, resp) -} +func statusPipeline(ctx context.Context, conn *osis.Client, name string) retry.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := findPipelineByName(ctx, conn, name) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } -func (r *resourcePipeline) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { - resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp) + return output, string(output.Status), nil + } } -func waitPipelineCreated(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { +func waitPipelineCreated(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { stateConf := &retry.StateChangeConf{ Pending: enum.Slice(awstypes.PipelineStatusCreating, awstypes.PipelineStatusStarting), Target: enum.Slice(awstypes.PipelineStatusActive), - Refresh: statusPipeline(ctx, conn, id), + Refresh: statusPipeline(ctx, conn, name), Timeout: timeout, MinTimeout: 10 * time.Second, Delay: 30 * time.Second, } outputRaw, err := stateConf.WaitForStateContext(ctx) - if out, ok := outputRaw.(*awstypes.Pipeline); ok { - return out, err + + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) + } + + return output, err } return nil, err } -func waitPipelineUpdated(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { +func waitPipelineUpdated(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { stateConf := &retry.StateChangeConf{ Pending: enum.Slice(awstypes.PipelineStatusUpdating), Target: enum.Slice(awstypes.PipelineStatusActive), - Refresh: statusPipeline(ctx, conn, id), + Refresh: statusPipeline(ctx, conn, name), Timeout: timeout, MinTimeout: 10 * time.Second, Delay: 30 * time.Second, } outputRaw, err := stateConf.WaitForStateContext(ctx) - if out, ok := outputRaw.(*awstypes.Pipeline); ok { - return out, err + + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) + } + + return output, err } return nil, err } -func waitPipelineDeleted(ctx context.Context, conn *osis.Client, id string, timeout time.Duration) (*awstypes.Pipeline, error) { +func waitPipelineDeleted(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { stateConf := &retry.StateChangeConf{ Pending: enum.Slice(awstypes.PipelineStatusDeleting), Target: []string{}, - Refresh: statusPipeline(ctx, conn, id), + Refresh: statusPipeline(ctx, conn, name), Timeout: timeout, MinTimeout: 10 * time.Second, Delay: 30 * time.Second, } outputRaw, err := stateConf.WaitForStateContext(ctx) - if out, ok := outputRaw.(*awstypes.Pipeline); ok { - return out, err - } - - return nil, err -} - -func statusPipeline(ctx context.Context, 
conn *osis.Client, id string) retry.StateRefreshFunc { - return func() (interface{}, string, error) { - out, err := findPipelineByID(ctx, conn, id) - if tfresource.NotFound(err) { - return nil, "", nil - } - - if err != nil { - return nil, "", err - } - - return out, string(out.Status), nil - } -} - -func findPipelineByID(ctx context.Context, conn *osis.Client, id string) (*awstypes.Pipeline, error) { - in := &osis.GetPipelineInput{ - PipelineName: aws.String(id), - } - out, err := conn.GetPipeline(ctx, in) - if err != nil { - var nfe *awstypes.ResourceNotFoundException - if errors.As(err, &nfe) { - return nil, &retry.NotFoundError{ - LastError: err, - LastRequest: in, - } + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) } - return nil, err - } - - if out == nil || out.Pipeline == nil { - return nil, tfresource.NewEmptyResultError(in) - } - - return out.Pipeline, nil -} - -func flattenBufferOptions(ctx context.Context, apiObject *awstypes.BufferOptions) (types.List, diag.Diagnostics) { - var diags diag.Diagnostics - elemType := types.ObjectType{AttrTypes: bufferOptionsAttrTypes} - - if apiObject == nil { - return types.ListValueMust(elemType, []attr.Value{}), diags - } - - obj := map[string]attr.Value{ - "persistent_buffer_enabled": flex.BoolToFramework(ctx, apiObject.PersistentBufferEnabled), + return output, err } - objVal, d := types.ObjectValue(bufferOptionsAttrTypes, obj) - diags.Append(d...) - - listVal, d := types.ListValue(elemType, []attr.Value{objVal}) - diags.Append(d...) - - return listVal, diags -} -func flattenEncryptionAtRestOptions(ctx context.Context, apiObject *awstypes.EncryptionAtRestOptions) (types.List, diag.Diagnostics) { - var diags diag.Diagnostics - elemType := types.ObjectType{AttrTypes: encryptionAtRestOptionsAttrTypes} - - if apiObject == nil { - return types.ListValueMust(elemType, []attr.Value{}), diags - } - - obj := map[string]attr.Value{ - "kms_key_arn": flex.StringToFramework(ctx, apiObject.KmsKeyArn), - } - objVal, d := types.ObjectValue(encryptionAtRestOptionsAttrTypes, obj) - diags.Append(d...) - - listVal, d := types.ListValue(elemType, []attr.Value{objVal}) - diags.Append(d...) - - return listVal, diags -} - -func flattenLogPublishingOptions(ctx context.Context, apiObject *awstypes.LogPublishingOptions) (types.List, diag.Diagnostics) { - var diags diag.Diagnostics - elemType := types.ObjectType{AttrTypes: logPublishingOptionsAttrTypes} - - if apiObject == nil { - return types.ListValueMust(elemType, []attr.Value{}), diags - } - - cloudWatchLogDestination, d := flattenCloudWatchLogDestination(ctx, apiObject.CloudWatchLogDestination) - diags.Append(d...) - - obj := map[string]attr.Value{ - "is_logging_enabled": flex.BoolToFramework(ctx, apiObject.IsLoggingEnabled), - "cloudwatch_log_destination": cloudWatchLogDestination, - } - objVal, d := types.ObjectValue(logPublishingOptionsAttrTypes, obj) - diags.Append(d...) - - listVal, d := types.ListValue(elemType, []attr.Value{objVal}) - diags.Append(d...) 
- - return listVal, diags -} - -func flattenCloudWatchLogDestination(ctx context.Context, apiObject *awstypes.CloudWatchLogDestination) (types.List, diag.Diagnostics) { - var diags diag.Diagnostics - elemType := types.ObjectType{AttrTypes: cloudWatchLogDestinationAttrTypes} - - if apiObject == nil { - return types.ListValueMust(elemType, []attr.Value{}), diags - } - - obj := map[string]attr.Value{ - "log_group": flex.StringToFramework(ctx, apiObject.LogGroup), - } - objVal, d := types.ObjectValue(cloudWatchLogDestinationAttrTypes, obj) - diags.Append(d...) - - listVal, d := types.ListValue(elemType, []attr.Value{objVal}) - diags.Append(d...) - - return listVal, diags -} - -func expandBufferOptions(tfList []bufferOptionsData) *awstypes.BufferOptions { - if len(tfList) == 0 { - return nil - } - bo := tfList[0] - return &awstypes.BufferOptions{ - PersistentBufferEnabled: aws.Bool(bo.PersistentBufferEnabled.ValueBool()), - } -} - -func expandEncryptionAtRestOptions(tfList []encryptionAtRestOptionsData) *awstypes.EncryptionAtRestOptions { - if len(tfList) == 0 { - return nil - } - earo := tfList[0] - return &awstypes.EncryptionAtRestOptions{ - KmsKeyArn: aws.String(earo.KmsKeyArn.ValueString()), - } -} - -func expandLogPublishingOptions(ctx context.Context, tfList []logPublishingOptionsData) (*awstypes.LogPublishingOptions, diag.Diagnostics) { - var diags diag.Diagnostics - - if len(tfList) == 0 { - return nil, diags - } - - lpo := tfList[0] - apiObject := &awstypes.LogPublishingOptions{} - if !lpo.IsLoggingEnabled.IsNull() { - apiObject.IsLoggingEnabled = aws.Bool(lpo.IsLoggingEnabled.ValueBool()) - } - - if !lpo.CloudWatchLogDestination.IsNull() { - var cloudWatchLogDestination []cloudWatchLogDestinationData - diags.Append(lpo.CloudWatchLogDestination.ElementsAs(ctx, &cloudWatchLogDestination, false)...) 
- apiObject.CloudWatchLogDestination = expandCloudWatchLogDestination(cloudWatchLogDestination) - } - - return apiObject, diags + return nil, err } -func expandCloudWatchLogDestination(tfList []cloudWatchLogDestinationData) *awstypes.CloudWatchLogDestination { - if len(tfList) == 0 { - return nil - } - cwld := tfList[0] - return &awstypes.CloudWatchLogDestination{ - LogGroup: aws.String(cwld.LogGroup.ValueString()), - } +type pipelineResourceModel struct { + BufferOptions fwtypes.ListNestedObjectValueOf[bufferOptionsModel] `tfsdk:"buffer_options"` + EncryptionAtRestOptions fwtypes.ListNestedObjectValueOf[encryptionAtRestOptionsModel] `tfsdk:"encryption_at_rest_options"` + ID types.String `tfsdk:"id"` + IngestEndpointUrls fwtypes.SetValueOf[types.String] `tfsdk:"ingest_endpoint_urls"` + LogPublishingOptions fwtypes.ListNestedObjectValueOf[logPublishingOptionsModel] `tfsdk:"log_publishing_options"` + MaxUnits types.Int64 `tfsdk:"max_units"` + MinUnits types.Int64 `tfsdk:"min_units"` + PipelineARN types.String `tfsdk:"pipeline_arn"` + PipelineConfigurationBody types.String `tfsdk:"pipeline_configuration_body"` + PipelineName types.String `tfsdk:"pipeline_name"` + Tags types.Map `tfsdk:"tags"` + TagsAll types.Map `tfsdk:"tags_all"` + Timeouts timeouts.Value `tfsdk:"timeouts"` + VPCOptions fwtypes.ListNestedObjectValueOf[vpcOptionsModel] `tfsdk:"vpc_options"` } -func expandVPCOptions(ctx context.Context, tfList []vpcOptionsData) *awstypes.VpcOptions { - if len(tfList) == 0 { - return nil - } - vo := tfList[0] - apiObject := &awstypes.VpcOptions{ - SubnetIds: flex.ExpandFrameworkStringValueSet(ctx, vo.SubnetIds), - } - - if !vo.SecurityGroupIds.IsNull() { - apiObject.SecurityGroupIds = flex.ExpandFrameworkStringValueSet(ctx, vo.SecurityGroupIds) - } +func (data *pipelineResourceModel) InitFromID() error { + data.PipelineName = data.ID - return apiObject + return nil } -type resourcePipelineData struct { - ARN types.String `tfsdk:"arn"` - BufferOptions types.List `tfsdk:"buffer_options"` - EncryptionAtRestOptions types.List `tfsdk:"encryption_at_rest_options"` - ID types.String `tfsdk:"id"` - IngestEndpointUrls types.Set `tfsdk:"ingest_endpoint_urls"` - LogPublishingOptions types.List `tfsdk:"log_publishing_options"` - MaxUnits types.Int64 `tfsdk:"max_units"` - MinUnits types.Int64 `tfsdk:"min_units"` - PipelineConfigurationBody types.String `tfsdk:"pipeline_configuration_body"` - PipelineName types.String `tfsdk:"pipeline_name"` - Tags types.Map `tfsdk:"tags"` - TagsAll types.Map `tfsdk:"tags_all"` - Timeouts timeouts.Value `tfsdk:"timeouts"` - VpcOptions types.List `tfsdk:"vpc_options"` +func (data *pipelineResourceModel) setID() { + data.ID = data.PipelineName } -type bufferOptionsData struct { +type bufferOptionsModel struct { PersistentBufferEnabled types.Bool `tfsdk:"persistent_buffer_enabled"` } -type encryptionAtRestOptionsData struct { +type encryptionAtRestOptionsModel struct { KmsKeyArn fwtypes.ARN `tfsdk:"kms_key_arn"` } -type logPublishingOptionsData struct { - CloudWatchLogDestination types.List `tfsdk:"cloudwatch_log_destination"` - IsLoggingEnabled types.Bool `tfsdk:"is_logging_enabled"` +type logPublishingOptionsModel struct { + CloudWatchLogDestination fwtypes.ListNestedObjectValueOf[cloudWatchLogDestinationModel] `tfsdk:"cloudwatch_log_destination"` + IsLoggingEnabled types.Bool `tfsdk:"is_logging_enabled"` } -type cloudWatchLogDestinationData struct { +type cloudWatchLogDestinationModel struct { LogGroup types.String `tfsdk:"log_group"` } -type vpcOptionsData struct { - 
SecurityGroupIds types.Set `tfsdk:"security_group_ids"` - SubnetIds types.Set `tfsdk:"subnet_ids"` +type vpcOptionsModel struct { + SecurityGroupIDs fwtypes.SetValueOf[types.String] `tfsdk:"security_group_ids"` + SubnetIDs fwtypes.SetValueOf[types.String] `tfsdk:"subnet_ids"` } var ( diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go index cf3d0870b073..014c125c5ebf 100644 --- a/internal/service/osis/pipeline_test.go +++ b/internal/service/osis/pipeline_test.go @@ -5,7 +5,6 @@ package osis_test import ( "context" - "errors" "fmt" "testing" @@ -17,7 +16,6 @@ import ( "github.com/hashicorp/terraform-plugin-testing/terraform" "github.com/hashicorp/terraform-provider-aws/internal/acctest" "github.com/hashicorp/terraform-provider-aws/internal/conns" - "github.com/hashicorp/terraform-provider-aws/internal/create" tfosis "github.com/hashicorp/terraform-provider-aws/internal/service/osis" "github.com/hashicorp/terraform-provider-aws/internal/tfresource" "github.com/hashicorp/terraform-provider-aws/names" @@ -25,7 +23,6 @@ import ( func TestAccOpenSearchIngestionPipeline_basic(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -64,7 +61,6 @@ func TestAccOpenSearchIngestionPipeline_basic(t *testing.T) { func TestAccOpenSearchIngestionPipeline_disappears(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -93,7 +89,6 @@ func TestAccOpenSearchIngestionPipeline_disappears(t *testing.T) { func TestAccOpenSearchIngestionPipeline_buffer(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -137,7 +132,6 @@ func TestAccOpenSearchIngestionPipeline_buffer(t *testing.T) { func TestAccOpenSearchIngestionPipeline_encryption(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -172,7 +166,6 @@ func TestAccOpenSearchIngestionPipeline_encryption(t *testing.T) { func TestAccOpenSearchIngestionPipeline_logPublishing(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -209,7 +202,6 @@ func TestAccOpenSearchIngestionPipeline_logPublishing(t *testing.T) { func TestAccOpenSearchIngestionPipeline_vpc(t *testing.T) { ctx := acctest.Context(t) - var pipeline types.Pipeline rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) resourceName := "aws_osis_pipeline.test" @@ -307,35 +299,32 @@ func testAccCheckPipelineDestroy(ctx context.Context) resource.TestCheckFunc { } if err != nil { - return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingDestroyed, tfosis.ResNamePipeline, rs.Primary.ID, err) + return err } - return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingDestroyed, tfosis.ResNamePipeline, rs.Primary.ID, errors.New("not destroyed")) + return fmt.Errorf("OpenSearch Ingestion Pipeline %s still exists", rs.Primary.ID) } return nil } } -func testAccCheckPipelineExists(ctx context.Context, name string, pipeline 
*types.Pipeline) resource.TestCheckFunc { +func testAccCheckPipelineExists(ctx context.Context, n string, v *types.Pipeline) resource.TestCheckFunc { return func(s *terraform.State) error { - rs, ok := s.RootModule().Resources[name] + rs, ok := s.RootModule().Resources[n] if !ok { - return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, name, errors.New("not found")) - } - - if rs.Primary.ID == "" { - return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, name, errors.New("not set")) + return fmt.Errorf("Not found: %s", n) } conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) - resp, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) + + output, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) if err != nil { - return create.Error(names.OpenSearchIngestion, create.ErrActionCheckingExistence, tfosis.ResNamePipeline, rs.Primary.ID, err) + return err } - *pipeline = *resp + *v = *output return nil } diff --git a/internal/service/osis/service_package_gen.go b/internal/service/osis/service_package_gen.go index a8888e878a99..fd0e0860ac96 100644 --- a/internal/service/osis/service_package_gen.go +++ b/internal/service/osis/service_package_gen.go @@ -21,7 +21,7 @@ func (p *servicePackage) FrameworkDataSources(ctx context.Context) []*types.Serv func (p *servicePackage) FrameworkResources(ctx context.Context) []*types.ServicePackageFrameworkResource { return []*types.ServicePackageFrameworkResource{ { - Factory: newResourcePipeline, + Factory: newPipelineResource, Name: "Pipeline", Tags: &types.ServicePackageResourceTags{ IdentifierAttribute: "arn", diff --git a/website/docs/r/osis_pipeline.html.markdown b/website/docs/r/osis_pipeline.html.markdown index 23bd32c2366e..00cb03435e62 100644 --- a/website/docs/r/osis_pipeline.html.markdown +++ b/website/docs/r/osis_pipeline.html.markdown @@ -111,9 +111,9 @@ The following arguments are optional: This resource exports the following attributes in addition to the arguments above: -* `arn` - Amazon Resource Name (ARN) of the pipeline. * `id` - Unique identifier for the pipeline. * `ingest_endpoint_urls` - The list of ingestion endpoints for the pipeline, which you can send data to. +* `pipeline_arn` - Amazon Resource Name (ARN) of the pipeline. ## Timeouts From 3623934b7869fa6f9f770ded4065d27b6682a98c Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 8 Feb 2024 07:40:33 -0500 Subject: [PATCH 17/21] r/aws_osis_pipeline: 'vpc_config' is ForceNew. 
--- internal/service/osis/pipeline.go | 12 +++++++++++- internal/service/osis/service_package_gen.go | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index bd04d0c80be0..4eba5846d198 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/terraform-plugin-framework/attr" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/listplanmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/setplanmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" @@ -39,7 +40,7 @@ import ( ) // @FrameworkResource(name="Pipeline") -// @Tags(identifierAttribute="arn") +// @Tags(identifierAttribute="pipeline_arn") func newPipelineResource(_ context.Context) (resource.ResourceWithConfigure, error) { r := &pipelineResource{} @@ -169,6 +170,9 @@ func (r *pipelineResource) Schema(ctx context.Context, request resource.SchemaRe }), "vpc_options": schema.ListNestedBlock{ CustomType: fwtypes.NewListNestedObjectTypeOf[vpcOptionsModel](ctx), + PlanModifiers: []planmodifier.List{ + listplanmodifier.RequiresReplace(), + }, Validators: []validator.List{ listvalidator.SizeAtMost(1), }, @@ -178,6 +182,9 @@ func (r *pipelineResource) Schema(ctx context.Context, request resource.SchemaRe CustomType: fwtypes.SetOfStringType, Optional: true, ElementType: types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.RequiresReplace(), + }, Validators: []validator.Set{ setvalidator.SizeBetween(1, 12), }, @@ -186,6 +193,9 @@ func (r *pipelineResource) Schema(ctx context.Context, request resource.SchemaRe CustomType: fwtypes.SetOfStringType, Required: true, ElementType: types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.RequiresReplace(), + }, Validators: []validator.Set{ setvalidator.SizeBetween(1, 12), }, diff --git a/internal/service/osis/service_package_gen.go b/internal/service/osis/service_package_gen.go index fd0e0860ac96..d894ebf0cb2d 100644 --- a/internal/service/osis/service_package_gen.go +++ b/internal/service/osis/service_package_gen.go @@ -24,7 +24,7 @@ func (p *servicePackage) FrameworkResources(ctx context.Context) []*types.Servic Factory: newPipelineResource, Name: "Pipeline", Tags: &types.ServicePackageResourceTags{ - IdentifierAttribute: "arn", + IdentifierAttribute: "pipeline_arn", }, }, } From 53a7415e01ba3737a644cda02b6a7c6f16e216c4 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 8 Feb 2024 08:33:53 -0500 Subject: [PATCH 18/21] Fixup 'TestAccOpenSearchIngestionPipeline_basic'. 
--- internal/service/osis/exports_test.go | 2 +- internal/service/osis/pipeline_test.go | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/service/osis/exports_test.go b/internal/service/osis/exports_test.go index 2e3060b848df..83496ede5f26 100644 --- a/internal/service/osis/exports_test.go +++ b/internal/service/osis/exports_test.go @@ -7,5 +7,5 @@ package osis var ( ResourcePipeline = newPipelineResource - FindPipelineByID = findPipelineByName + FindPipelineByName = findPipelineByName ) diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go index 014c125c5ebf..8c58ac2039ed 100644 --- a/internal/service/osis/pipeline_test.go +++ b/internal/service/osis/pipeline_test.go @@ -39,15 +39,19 @@ func TestAccOpenSearchIngestionPipeline_basic(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccPipelineConfig_basic(rName), - Check: resource.ComposeTestCheckFunc( + Check: resource.ComposeAggregateTestCheckFunc( testAccCheckPipelineExists(ctx, resourceName, &pipeline), - resource.TestCheckResourceAttr(resourceName, "min_units", "1"), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "encryption_at_rest_options.#", "0"), + acctest.CheckResourceAttrGreaterThanOrEqualValue(resourceName, "ingest_endpoint_urls.#", 1), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.#", "0"), resource.TestCheckResourceAttr(resourceName, "max_units", "1"), + resource.TestCheckResourceAttr(resourceName, "min_units", "1"), + acctest.MatchResourceAttrRegionalARN(resourceName, "pipeline_arn", "osis", regexache.MustCompile(`pipeline/.+$`)), resource.TestCheckResourceAttrSet(resourceName, "pipeline_configuration_body"), resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), - resource.TestCheckResourceAttr(resourceName, "ingest_endpoint_urls.#", "1"), - resource.TestCheckResourceAttrPair(resourceName, "pipeline_name", "aws_osis_pipeline.test", "id"), - acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "osis", regexache.MustCompile(`pipeline/.+$`)), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.#", "0"), ), }, { @@ -292,7 +296,7 @@ func testAccCheckPipelineDestroy(ctx context.Context) resource.TestCheckFunc { continue } - _, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) + _, err := tfosis.FindPipelineByName(ctx, conn, rs.Primary.ID) if tfresource.NotFound(err) { continue @@ -318,7 +322,7 @@ func testAccCheckPipelineExists(ctx context.Context, n string, v *types.Pipeline conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) - output, err := tfosis.FindPipelineByID(ctx, conn, rs.Primary.ID) + output, err := tfosis.FindPipelineByName(ctx, conn, rs.Primary.ID) if err != nil { return err From 10a3ec320608c2f5efe24a007e6aa9045f745167 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 8 Feb 2024 08:34:07 -0500 Subject: [PATCH 19/21] Acceptance test output: % make testacc TESTARGS='-run=TestAccOpenSearchIngestionPipeline_basic' PKG=osis ==> Checking that code complies with gofmt requirements... TF_ACC=1 go test ./internal/service/osis/... 
-v -count 1 -parallel 20 -run=TestAccOpenSearchIngestionPipeline_basic -timeout 360m === RUN TestAccOpenSearchIngestionPipeline_basic === PAUSE TestAccOpenSearchIngestionPipeline_basic === CONT TestAccOpenSearchIngestionPipeline_basic --- PASS: TestAccOpenSearchIngestionPipeline_basic (602.09s) PASS ok github.com/hashicorp/terraform-provider-aws/internal/service/osis 630.121s From d6d99b27fc68a96c9821769a07b2702510a8a828 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 8 Feb 2024 10:28:01 -0500 Subject: [PATCH 20/21] Fix golangci-lint 'unused'. --- internal/service/osis/pipeline.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index 4eba5846d198..53f8f176e73d 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" "github.com/hashicorp/terraform-plugin-framework-validators/setvalidator" "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" - "github.com/hashicorp/terraform-plugin-framework/attr" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/resource/schema" "github.com/hashicorp/terraform-plugin-framework/resource/schema/listplanmodifier" @@ -532,22 +531,3 @@ type vpcOptionsModel struct { SecurityGroupIDs fwtypes.SetValueOf[types.String] `tfsdk:"security_group_ids"` SubnetIDs fwtypes.SetValueOf[types.String] `tfsdk:"subnet_ids"` } - -var ( - bufferOptionsAttrTypes = map[string]attr.Type{ - "persistent_buffer_enabled": types.BoolType, - } - - encryptionAtRestOptionsAttrTypes = map[string]attr.Type{ - "kms_key_arn": types.StringType, - } - - logPublishingOptionsAttrTypes = map[string]attr.Type{ - "cloudwatch_log_destination": types.ListType{ElemType: types.ObjectType{AttrTypes: cloudWatchLogDestinationAttrTypes}}, - "is_logging_enabled": types.BoolType, - } - - cloudWatchLogDestinationAttrTypes = map[string]attr.Type{ - "log_group": types.StringType, - } -) From 67b6f1e94619ef3794e56f6a78707ed92da39cbc Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 8 Feb 2024 10:29:16 -0500 Subject: [PATCH 21/21] Fix golangci-lint 'stylecheck'. --- internal/service/osis/pipeline.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go index 53f8f176e73d..1ea6d661ddcf 100644 --- a/internal/service/osis/pipeline.go +++ b/internal/service/osis/pipeline.go @@ -30,7 +30,6 @@ import ( "github.com/hashicorp/terraform-provider-aws/internal/errs" "github.com/hashicorp/terraform-provider-aws/internal/errs/fwdiag" "github.com/hashicorp/terraform-provider-aws/internal/framework" - "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" fwflex "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" fwtypes "github.com/hashicorp/terraform-provider-aws/internal/framework/types" tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" @@ -248,7 +247,7 @@ func (r *pipelineResource) Create(ctx context.Context, request resource.CreateRe // Set values for unknowns. data.IngestEndpointUrls.SetValue = fwflex.FlattenFrameworkStringValueSet(ctx, pipeline.IngestEndpointUrls) - data.PipelineARN = flex.StringToFramework(ctx, pipeline.PipelineArn) + data.PipelineARN = fwflex.StringToFramework(ctx, pipeline.PipelineArn) response.Diagnostics.Append(response.State.Set(ctx, &data)...) }
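Taken together, the series ships 'aws_osis_pipeline' with the ARN exposed as 'pipeline_arn' and 45-minute default create/update/delete timeouts. The configuration below is a minimal usage sketch assembled from the acceptance-test configurations and schema in these patches; the bucket name and resource names are placeholders, not values taken from the patch.

data "aws_region" "current" {}

resource "aws_iam_role" "example" {
  name = "example-osis-pipeline"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "osis-pipelines.amazonaws.com" }
    }]
  })
}

resource "aws_osis_pipeline" "example" {
  pipeline_name = "example-pipeline"
  min_units     = 1
  max_units     = 1

  pipeline_configuration_body = <<-EOT
    version: "2"
    example-pipeline:
      source:
        http:
          path: "/example"
      sink:
        - s3:
            aws:
              sts_role_arn: "${aws_iam_role.example.arn}"
              region: "${data.aws_region.current.name}"
            bucket: "example-bucket"
            threshold:
              event_collect_timeout: "60s"
            codec:
              ndjson:
  EOT
}

output "pipeline_arn" {
  value = aws_osis_pipeline.example.pipeline_arn
}

Note that the later commits make 'vpc_options' (and its nested security group and subnet sets) RequiresReplace, so VPC placement cannot be changed in place on an existing pipeline.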