diff --git a/.changelog/35582.txt b/.changelog/35582.txt new file mode 100644 index 00000000000..a92c0aa7b70 --- /dev/null +++ b/.changelog/35582.txt @@ -0,0 +1,3 @@ +```release-note:new-resource +aws_osis_pipeline +``` diff --git a/internal/service/osis/consts.go b/internal/service/osis/consts.go new file mode 100644 index 00000000000..6ddec039f74 --- /dev/null +++ b/internal/service/osis/consts.go @@ -0,0 +1,12 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +import ( + "time" +) + +const ( + propagationTimeout = 2 * time.Minute +) diff --git a/internal/service/osis/exports_test.go b/internal/service/osis/exports_test.go new file mode 100644 index 00000000000..83496ede5f2 --- /dev/null +++ b/internal/service/osis/exports_test.go @@ -0,0 +1,11 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +// Exports for use in tests only. +var ( + ResourcePipeline = newPipelineResource + + FindPipelineByName = findPipelineByName +) diff --git a/internal/service/osis/generate.go b/internal/service/osis/generate.go index c2281bfd620..b88b4e3a96d 100644 --- a/internal/service/osis/generate.go +++ b/internal/service/osis/generate.go @@ -1,6 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 +//go:generate go run ../../generate/tags/main.go -AWSSDKVersion=2 -ListTags -ServiceTagsSlice -TagInIDElem=Arn -ListTagsInIDElem=Arn -UpdateTags //go:generate go run ../../generate/servicepackage/main.go // ONLY generate directives and package declaration! Do not add anything else to this file. diff --git a/internal/service/osis/pipeline.go b/internal/service/osis/pipeline.go new file mode 100644 index 00000000000..1ea6d661ddc --- /dev/null +++ b/internal/service/osis/pipeline.go @@ -0,0 +1,532 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/osis" + awstypes "github.com/aws/aws-sdk-go-v2/service/osis/types" + "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" + "github.com/hashicorp/terraform-plugin-framework-validators/int64validator" + "github.com/hashicorp/terraform-plugin-framework-validators/listvalidator" + "github.com/hashicorp/terraform-plugin-framework-validators/setvalidator" + "github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/listplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/setplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/schema/validator" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "github.com/hashicorp/terraform-provider-aws/internal/enum" + "github.com/hashicorp/terraform-provider-aws/internal/errs" + "github.com/hashicorp/terraform-provider-aws/internal/errs/fwdiag" + "github.com/hashicorp/terraform-provider-aws/internal/framework" + fwflex "github.com/hashicorp/terraform-provider-aws/internal/framework/flex" + fwtypes "github.com/hashicorp/terraform-provider-aws/internal/framework/types" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/names" +) + +// @FrameworkResource(name="Pipeline") +// @Tags(identifierAttribute="pipeline_arn") +func newPipelineResource(_ context.Context) (resource.ResourceWithConfigure, error) { + r := &pipelineResource{} + + r.SetDefaultCreateTimeout(45 * time.Minute) + r.SetDefaultUpdateTimeout(45 * time.Minute) + r.SetDefaultDeleteTimeout(45 * time.Minute) + + return r, nil +} + +type pipelineResource struct { + framework.ResourceWithConfigure + framework.WithImportByID + framework.WithTimeouts +} + +func (r *pipelineResource) Metadata(_ context.Context, request resource.MetadataRequest, response *resource.MetadataResponse) { + response.TypeName = "aws_osis_pipeline" +} + +func (r *pipelineResource) Schema(ctx context.Context, request resource.SchemaRequest, response *resource.SchemaResponse) { + response.Schema = schema.Schema{ + Attributes: map[string]schema.Attribute{ + names.AttrID: framework.IDAttribute(), + "ingest_endpoint_urls": schema.SetAttribute{ + CustomType: fwtypes.SetOfStringType, + Computed: true, + ElementType: types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.UseStateForUnknown(), + }, + }, + "max_units": schema.Int64Attribute{ + Required: true, + Validators: []validator.Int64{ + int64validator.AtLeast(1), + }, + }, + "min_units": schema.Int64Attribute{ + Required: true, + Validators: []validator.Int64{ + int64validator.AtLeast(1), + }, + }, + "pipeline_arn": framework.ARNAttributeComputedOnly(), + "pipeline_configuration_body": schema.StringAttribute{ + Required: true, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 24000), + }, + }, + "pipeline_name": schema.StringAttribute{ + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + Validators: []validator.String{ + stringvalidator.LengthBetween(3, 28), + }, + }, + names.AttrTags: tftags.TagsAttribute(), + names.AttrTagsAll: tftags.TagsAttributeComputedOnly(), + }, + Blocks: map[string]schema.Block{ + "buffer_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[bufferOptionsModel](ctx), + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "persistent_buffer_enabled": schema.BoolAttribute{ + Required: true, + }, + }, + }, + }, + "encryption_at_rest_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[encryptionAtRestOptionsModel](ctx), + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "kms_key_arn": schema.StringAttribute{ + CustomType: fwtypes.ARNType, + Required: true, + }, + }, + }, + }, + "log_publishing_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[logPublishingOptionsModel](ctx), + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "is_logging_enabled": schema.BoolAttribute{ + Optional: true, + }, + }, + Blocks: map[string]schema.Block{ + "cloudwatch_log_destination": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[cloudWatchLogDestinationModel](ctx), + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "log_group": schema.StringAttribute{ + Required: true, + Validators: []validator.String{ + stringvalidator.LengthBetween(1, 512), + }, + }, + }, + }, + }, + }, + }, + }, + "timeouts": timeouts.Block(ctx, timeouts.Opts{ + Create: true, + Update: true, + Delete: true, + }), + "vpc_options": schema.ListNestedBlock{ + CustomType: fwtypes.NewListNestedObjectTypeOf[vpcOptionsModel](ctx), + PlanModifiers: []planmodifier.List{ + listplanmodifier.RequiresReplace(), + }, + Validators: []validator.List{ + listvalidator.SizeAtMost(1), + }, + NestedObject: schema.NestedBlockObject{ + Attributes: map[string]schema.Attribute{ + "security_group_ids": schema.SetAttribute{ + CustomType: fwtypes.SetOfStringType, + Optional: true, + ElementType: types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.RequiresReplace(), + }, + Validators: []validator.Set{ + setvalidator.SizeBetween(1, 12), + }, + }, + "subnet_ids": schema.SetAttribute{ + CustomType: fwtypes.SetOfStringType, + Required: true, + ElementType: types.StringType, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.RequiresReplace(), + }, + Validators: []validator.Set{ + setvalidator.SizeBetween(1, 12), + }, + }, + }, + }, + }, + }, + } +} + +func (r *pipelineResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.Plan.Get(ctx, &data)...) + if response.Diagnostics.HasError() { + return + } + + conn := r.Meta().OpenSearchIngestionClient(ctx) + + name := data.PipelineName.ValueString() + input := &osis.CreatePipelineInput{} + response.Diagnostics.Append(fwflex.Expand(ctx, data, input)...) + if response.Diagnostics.HasError() { + return + } + + // Additional fields. + input.Tags = getTagsIn(ctx) + + // Retry for IAM eventual consistency. + _, err := tfresource.RetryWhenIsA[*awstypes.ValidationException](ctx, propagationTimeout, func() (interface{}, error) { + return conn.CreatePipeline(ctx, input) + }) + + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("creating OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + + return + } + + data.setID() + + pipeline, err := waitPipelineCreated(ctx, conn, name, r.CreateTimeout(ctx, data.Timeouts)) + + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) create", name), err.Error()) + + return + } + + // Set values for unknowns. + data.IngestEndpointUrls.SetValue = fwflex.FlattenFrameworkStringValueSet(ctx, pipeline.IngestEndpointUrls) + data.PipelineARN = fwflex.StringToFramework(ctx, pipeline.PipelineArn) + + response.Diagnostics.Append(response.State.Set(ctx, &data)...) +} + +func (r *pipelineResource) Read(ctx context.Context, request resource.ReadRequest, response *resource.ReadResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &data)...) + if response.Diagnostics.HasError() { + return + } + + if err := data.InitFromID(); err != nil { + response.Diagnostics.AddError("parsing resource ID", err.Error()) + + return + } + + conn := r.Meta().OpenSearchIngestionClient(ctx) + + name := data.PipelineName.ValueString() + pipeline, err := findPipelineByName(ctx, conn, name) + + if tfresource.NotFound(err) { + response.Diagnostics.Append(fwdiag.NewResourceNotFoundWarningDiagnostic(err)) + response.State.RemoveResource(ctx) + + return + } + + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("reading OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + + return + } + + response.Diagnostics.Append(fwflex.Flatten(ctx, pipeline, &data)...) + if response.Diagnostics.HasError() { + return + } + + response.Diagnostics.Append(response.State.Set(ctx, &data)...) +} + +func (r *pipelineResource) Update(ctx context.Context, request resource.UpdateRequest, response *resource.UpdateResponse) { + var old, new pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &old)...) + if response.Diagnostics.HasError() { + return + } + response.Diagnostics.Append(request.Plan.Get(ctx, &new)...) + if response.Diagnostics.HasError() { + return + } + + conn := r.Meta().OpenSearchIngestionClient(ctx) + + if !new.BufferOptions.Equal(old.BufferOptions) || + !new.EncryptionAtRestOptions.Equal(old.EncryptionAtRestOptions) || + !new.LogPublishingOptions.Equal(old.LogPublishingOptions) || + !new.MaxUnits.Equal(old.MaxUnits) || + !new.MinUnits.Equal(old.MinUnits) || + !new.PipelineConfigurationBody.Equal(old.PipelineConfigurationBody) { + input := &osis.UpdatePipelineInput{} + response.Diagnostics.Append(fwflex.Expand(ctx, new, input)...) + if response.Diagnostics.HasError() { + return + } + + name := new.PipelineName.ValueString() + _, err := conn.UpdatePipeline(ctx, input) + + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("updating OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + + return + } + + if _, err := waitPipelineUpdated(ctx, conn, name, r.UpdateTimeout(ctx, new.Timeouts)); err != nil { + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) update", name), err.Error()) + + return + } + } + + response.Diagnostics.Append(response.State.Set(ctx, &new)...) +} + +func (r *pipelineResource) Delete(ctx context.Context, request resource.DeleteRequest, response *resource.DeleteResponse) { + var data pipelineResourceModel + response.Diagnostics.Append(request.State.Get(ctx, &data)...) + if response.Diagnostics.HasError() { + return + } + + conn := r.Meta().OpenSearchIngestionClient(ctx) + + name := data.PipelineName.ValueString() + input := &osis.DeletePipelineInput{ + PipelineName: aws.String(name), + } + + _, err := conn.DeletePipeline(ctx, input) + + if errs.IsA[*awstypes.ResourceNotFoundException](err) { + return + } + + if err != nil { + response.Diagnostics.AddError(fmt.Sprintf("deleting OpenSearch Ingestion Pipeline (%s)", name), err.Error()) + + return + } + + if _, err := waitPipelineDeleted(ctx, conn, name, r.DeleteTimeout(ctx, data.Timeouts)); err != nil { + response.Diagnostics.AddError(fmt.Sprintf("waiting for OpenSearch Ingestion Pipeline (%s) delete", name), err.Error()) + + return + } +} + +func (r *pipelineResource) ModifyPlan(ctx context.Context, request resource.ModifyPlanRequest, response *resource.ModifyPlanResponse) { + r.SetTagsAll(ctx, request, response) +} + +func findPipelineByName(ctx context.Context, conn *osis.Client, name string) (*awstypes.Pipeline, error) { + input := &osis.GetPipelineInput{ + PipelineName: aws.String(name), + } + + output, err := conn.GetPipeline(ctx, input) + + if errs.IsA[*awstypes.ResourceNotFoundException](err) { + return nil, &retry.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if output == nil || output.Pipeline == nil { + return nil, tfresource.NewEmptyResultError(input) + } + + return output.Pipeline, nil +} + +func statusPipeline(ctx context.Context, conn *osis.Client, name string) retry.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := findPipelineByName(ctx, conn, name) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, string(output.Status), nil + } +} + +func waitPipelineCreated(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusCreating, awstypes.PipelineStatusStarting), + Target: enum.Slice(awstypes.PipelineStatusActive), + Refresh: statusPipeline(ctx, conn, name), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) + } + + return output, err + } + + return nil, err +} + +func waitPipelineUpdated(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusUpdating), + Target: enum.Slice(awstypes.PipelineStatusActive), + Refresh: statusPipeline(ctx, conn, name), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) + } + + return output, err + } + + return nil, err +} + +func waitPipelineDeleted(ctx context.Context, conn *osis.Client, name string, timeout time.Duration) (*awstypes.Pipeline, error) { + stateConf := &retry.StateChangeConf{ + Pending: enum.Slice(awstypes.PipelineStatusDeleting), + Target: []string{}, + Refresh: statusPipeline(ctx, conn, name), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*awstypes.Pipeline); ok { + if reason := output.StatusReason; reason != nil { + tfresource.SetLastError(err, errors.New(aws.ToString(reason.Description))) + } + + return output, err + } + + return nil, err +} + +type pipelineResourceModel struct { + BufferOptions fwtypes.ListNestedObjectValueOf[bufferOptionsModel] `tfsdk:"buffer_options"` + EncryptionAtRestOptions fwtypes.ListNestedObjectValueOf[encryptionAtRestOptionsModel] `tfsdk:"encryption_at_rest_options"` + ID types.String `tfsdk:"id"` + IngestEndpointUrls fwtypes.SetValueOf[types.String] `tfsdk:"ingest_endpoint_urls"` + LogPublishingOptions fwtypes.ListNestedObjectValueOf[logPublishingOptionsModel] `tfsdk:"log_publishing_options"` + MaxUnits types.Int64 `tfsdk:"max_units"` + MinUnits types.Int64 `tfsdk:"min_units"` + PipelineARN types.String `tfsdk:"pipeline_arn"` + PipelineConfigurationBody types.String `tfsdk:"pipeline_configuration_body"` + PipelineName types.String `tfsdk:"pipeline_name"` + Tags types.Map `tfsdk:"tags"` + TagsAll types.Map `tfsdk:"tags_all"` + Timeouts timeouts.Value `tfsdk:"timeouts"` + VPCOptions fwtypes.ListNestedObjectValueOf[vpcOptionsModel] `tfsdk:"vpc_options"` +} + +func (data *pipelineResourceModel) InitFromID() error { + data.PipelineName = data.ID + + return nil +} + +func (data *pipelineResourceModel) setID() { + data.ID = data.PipelineName +} + +type bufferOptionsModel struct { + PersistentBufferEnabled types.Bool `tfsdk:"persistent_buffer_enabled"` +} + +type encryptionAtRestOptionsModel struct { + KmsKeyArn fwtypes.ARN `tfsdk:"kms_key_arn"` +} + +type logPublishingOptionsModel struct { + CloudWatchLogDestination fwtypes.ListNestedObjectValueOf[cloudWatchLogDestinationModel] `tfsdk:"cloudwatch_log_destination"` + IsLoggingEnabled types.Bool `tfsdk:"is_logging_enabled"` +} + +type cloudWatchLogDestinationModel struct { + LogGroup types.String `tfsdk:"log_group"` +} + +type vpcOptionsModel struct { + SecurityGroupIDs fwtypes.SetValueOf[types.String] `tfsdk:"security_group_ids"` + SubnetIDs fwtypes.SetValueOf[types.String] `tfsdk:"subnet_ids"` +} diff --git a/internal/service/osis/pipeline_test.go b/internal/service/osis/pipeline_test.go new file mode 100644 index 00000000000..8c58ac2039e --- /dev/null +++ b/internal/service/osis/pipeline_test.go @@ -0,0 +1,731 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package osis_test + +import ( + "context" + "fmt" + "testing" + + "github.com/YakDriver/regexache" + "github.com/aws/aws-sdk-go-v2/service/osis" + "github.com/aws/aws-sdk-go-v2/service/osis/types" + sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" + "github.com/hashicorp/terraform-provider-aws/internal/acctest" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + tfosis "github.com/hashicorp/terraform-provider-aws/internal/service/osis" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/names" +) + +func TestAccOpenSearchIngestionPipeline_basic(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_basic(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "encryption_at_rest_options.#", "0"), + acctest.CheckResourceAttrGreaterThanOrEqualValue(resourceName, "ingest_endpoint_urls.#", 1), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.#", "0"), + resource.TestCheckResourceAttr(resourceName, "max_units", "1"), + resource.TestCheckResourceAttr(resourceName, "min_units", "1"), + acctest.MatchResourceAttrRegionalARN(resourceName, "pipeline_arn", "osis", regexache.MustCompile(`pipeline/.+$`)), + resource.TestCheckResourceAttrSet(resourceName, "pipeline_configuration_body"), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.#", "0"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_disappears(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + acctest.CheckFrameworkResourceDisappears(ctx, acctest.Provider, tfosis.ResourcePipeline, resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_buffer(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_bufferOptions(rName, true), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "buffer_options.0.persistent_buffer_enabled", "true"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccPipelineConfig_bufferOptions(rName, false), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "buffer_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "buffer_options.0.persistent_buffer_enabled", "false"), + ), + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_encryption(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_encryption(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "encryption_at_rest_options.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "encryption_at_rest_options.0.kms_key_arn"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_logPublishing(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_logPublishing(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.0.is_logging_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "log_publishing_options.0.cloudwatch_log_destination.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "log_publishing_options.0.cloudwatch_log_destination.0.log_group"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_vpc(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_vpc(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "pipeline_name", rName), + resource.TestCheckResourceAttr(resourceName, "vpc_options.#", "1"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.0.security_group_ids.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.security_group_ids.0"), + resource.TestCheckResourceAttr(resourceName, "vpc_options.0.subnet_ids.#", "1"), + resource.TestCheckResourceAttrSet(resourceName, "vpc_options.0.subnet_ids.0"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"vpc_options"}, + }, + }, + }) +} + +func TestAccOpenSearchIngestionPipeline_tags(t *testing.T) { + ctx := acctest.Context(t) + var pipeline types.Pipeline + rName := fmt.Sprintf("%s-%s", acctest.ResourcePrefix, sdkacctest.RandString(10)) + resourceName := "aws_osis_pipeline.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckPartitionHasService(t, names.OpenSearchIngestionEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.OpenSearchIngestionEndpointID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckPipelineDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccPipelineConfig_tags1(rName, "key1", "value1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + Config: testAccPipelineConfig_tags2(rName, "key1", "value1", "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccPipelineConfig_tags1(rName, "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckPipelineExists(ctx, resourceName, &pipeline), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) +} + +func testAccCheckPipelineDestroy(ctx context.Context) resource.TestCheckFunc { + return func(s *terraform.State) error { + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_osis_pipeline" { + continue + } + + _, err := tfosis.FindPipelineByName(ctx, conn, rs.Primary.ID) + + if tfresource.NotFound(err) { + continue + } + + if err != nil { + return err + } + + return fmt.Errorf("OpenSearch Ingestion Pipeline %s still exists", rs.Primary.ID) + } + + return nil + } +} + +func testAccCheckPipelineExists(ctx context.Context, n string, v *types.Pipeline) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + + output, err := tfosis.FindPipelineByName(ctx, conn, rs.Primary.ID) + + if err != nil { + return err + } + + *v = *output + + return nil + } +} + +func testAccPreCheck(ctx context.Context, t *testing.T) { + conn := acctest.Provider.Meta().(*conns.AWSClient).OpenSearchIngestionClient(ctx) + + input := &osis.ListPipelinesInput{} + _, err := conn.ListPipelines(ctx, input) + + if acctest.PreCheckSkipError(err) { + t.Skipf("skipping acceptance testing: %s", err) + } + if err != nil { + t.Fatalf("unexpected PreCheck error: %s", err) + } +} + +func testAccPipelineConfig_basic(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 +} +`, rName) +} + +func testAccPipelineConfig_tags1(rName string, key1, value1 string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + tags = { + %[2]q = %[3]q + } +} +`, rName, key1, value1) +} + +func testAccPipelineConfig_tags2(rName string, key1, value1, key2, value2 string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + tags = { + %[2]q = %[3]q + %[4]q = %[5]q + } +} +`, rName, key1, value1, key2, value2) +} + +func testAccPipelineConfig_bufferOptions(rName string, bufferEnabled bool) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 2 + min_units = 2 + + buffer_options { + persistent_buffer_enabled = %[2]t + } +} +`, rName, bufferEnabled) +} + +func testAccPipelineConfig_encryption(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + encryption_at_rest_options { + kms_key_arn = aws_kms_key.test.arn + } +} + +resource "aws_kms_key" "test" { + description = %[1]q + deletion_window_in_days = 7 +} +`, rName) +} + +func testAccPipelineConfig_logPublishing(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_cloudwatch_log_group" "test" { + name = "/aws/vendedlogs/OpenSearchIngestion/example-pipeline/test-logs" +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + log_publishing_options { + is_logging_enabled = true + cloudwatch_log_destination { + log_group = aws_cloudwatch_log_group.test.name + } + + } +} +`, rName) +} + +func testAccPipelineConfig_vpc(rName string) string { + return fmt.Sprintf(` +data "aws_region" "current" {} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_vpc" "test" { + cidr_block = "10.0.0.0/16" +} + +resource "aws_subnet" "test" { + cidr_block = "10.0.1.0/24" + vpc_id = aws_vpc.test.id +} + +resource "aws_security_group" "test" { + name = %[1]q + vpc_id = aws_vpc.test.id +} + +resource "aws_osis_pipeline" "test" { + pipeline_name = %[1]q + pipeline_configuration_body = <<-EOT + version: "2" + test-pipeline: + source: + http: + path: "/test" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.test.arn}" + region: "${data.aws_region.current.name}" + bucket: "test" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 + + vpc_options { + security_group_ids = [aws_security_group.test.id] + subnet_ids = [aws_subnet.test.id] + } +} +`, rName) +} diff --git a/internal/service/osis/service_package_gen.go b/internal/service/osis/service_package_gen.go index 75a8c07868f..d894ebf0cb2 100644 --- a/internal/service/osis/service_package_gen.go +++ b/internal/service/osis/service_package_gen.go @@ -19,7 +19,15 @@ func (p *servicePackage) FrameworkDataSources(ctx context.Context) []*types.Serv } func (p *servicePackage) FrameworkResources(ctx context.Context) []*types.ServicePackageFrameworkResource { - return []*types.ServicePackageFrameworkResource{} + return []*types.ServicePackageFrameworkResource{ + { + Factory: newPipelineResource, + Name: "Pipeline", + Tags: &types.ServicePackageResourceTags{ + IdentifierAttribute: "pipeline_arn", + }, + }, + } } func (p *servicePackage) SDKDataSources(ctx context.Context) []*types.ServicePackageSDKDataSource { diff --git a/internal/service/osis/tags_gen.go b/internal/service/osis/tags_gen.go new file mode 100644 index 00000000000..15e22819585 --- /dev/null +++ b/internal/service/osis/tags_gen.go @@ -0,0 +1,146 @@ +// Code generated by internal/generate/tags/main.go; DO NOT EDIT. +package osis + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/osis" + awstypes "github.com/aws/aws-sdk-go-v2/service/osis/types" + "github.com/hashicorp/terraform-plugin-log/tflog" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + "github.com/hashicorp/terraform-provider-aws/internal/logging" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/types/option" + "github.com/hashicorp/terraform-provider-aws/names" +) + +// listTags lists osis service tags. +// The identifier is typically the Amazon Resource Name (ARN), although +// it may also be a different identifier depending on the service. +func listTags(ctx context.Context, conn *osis.Client, identifier string, optFns ...func(*osis.Options)) (tftags.KeyValueTags, error) { + input := &osis.ListTagsForResourceInput{ + Arn: aws.String(identifier), + } + + output, err := conn.ListTagsForResource(ctx, input, optFns...) + + if err != nil { + return tftags.New(ctx, nil), err + } + + return KeyValueTags(ctx, output.Tags), nil +} + +// ListTags lists osis service tags and set them in Context. +// It is called from outside this package. +func (p *servicePackage) ListTags(ctx context.Context, meta any, identifier string) error { + tags, err := listTags(ctx, meta.(*conns.AWSClient).OpenSearchIngestionClient(ctx), identifier) + + if err != nil { + return err + } + + if inContext, ok := tftags.FromContext(ctx); ok { + inContext.TagsOut = option.Some(tags) + } + + return nil +} + +// []*SERVICE.Tag handling + +// Tags returns osis service tags. +func Tags(tags tftags.KeyValueTags) []awstypes.Tag { + result := make([]awstypes.Tag, 0, len(tags)) + + for k, v := range tags.Map() { + tag := awstypes.Tag{ + Key: aws.String(k), + Value: aws.String(v), + } + + result = append(result, tag) + } + + return result +} + +// KeyValueTags creates tftags.KeyValueTags from osis service tags. +func KeyValueTags(ctx context.Context, tags []awstypes.Tag) tftags.KeyValueTags { + m := make(map[string]*string, len(tags)) + + for _, tag := range tags { + m[aws.ToString(tag.Key)] = tag.Value + } + + return tftags.New(ctx, m) +} + +// getTagsIn returns osis service tags from Context. +// nil is returned if there are no input tags. +func getTagsIn(ctx context.Context) []awstypes.Tag { + if inContext, ok := tftags.FromContext(ctx); ok { + if tags := Tags(inContext.TagsIn.UnwrapOrDefault()); len(tags) > 0 { + return tags + } + } + + return nil +} + +// setTagsOut sets osis service tags in Context. +func setTagsOut(ctx context.Context, tags []awstypes.Tag) { + if inContext, ok := tftags.FromContext(ctx); ok { + inContext.TagsOut = option.Some(KeyValueTags(ctx, tags)) + } +} + +// updateTags updates osis service tags. +// The identifier is typically the Amazon Resource Name (ARN), although +// it may also be a different identifier depending on the service. +func updateTags(ctx context.Context, conn *osis.Client, identifier string, oldTagsMap, newTagsMap any, optFns ...func(*osis.Options)) error { + oldTags := tftags.New(ctx, oldTagsMap) + newTags := tftags.New(ctx, newTagsMap) + + ctx = tflog.SetField(ctx, logging.KeyResourceId, identifier) + + removedTags := oldTags.Removed(newTags) + removedTags = removedTags.IgnoreSystem(names.OpenSearchIngestion) + if len(removedTags) > 0 { + input := &osis.UntagResourceInput{ + Arn: aws.String(identifier), + TagKeys: removedTags.Keys(), + } + + _, err := conn.UntagResource(ctx, input, optFns...) + + if err != nil { + return fmt.Errorf("untagging resource (%s): %w", identifier, err) + } + } + + updatedTags := oldTags.Updated(newTags) + updatedTags = updatedTags.IgnoreSystem(names.OpenSearchIngestion) + if len(updatedTags) > 0 { + input := &osis.TagResourceInput{ + Arn: aws.String(identifier), + Tags: Tags(updatedTags), + } + + _, err := conn.TagResource(ctx, input, optFns...) + + if err != nil { + return fmt.Errorf("tagging resource (%s): %w", identifier, err) + } + } + + return nil +} + +// UpdateTags updates osis service tags. +// It is called from outside this package. +func (p *servicePackage) UpdateTags(ctx context.Context, meta any, identifier string, oldTags, newTags any) error { + return updateTags(ctx, meta.(*conns.AWSClient).OpenSearchIngestionClient(ctx), identifier, oldTags, newTags) +} diff --git a/names/names.go b/names/names.go index 967d5df7677..18bc4b09309 100644 --- a/names/names.go +++ b/names/names.go @@ -79,6 +79,7 @@ const ( MQEndpointID = "mq" ObservabilityAccessManagerEndpointID = "oam" OpenSearchServerlessEndpointID = "aoss" + OpenSearchIngestionEndpointID = "osis" PipesEndpointID = "pipes" PollyEndpointID = "polly" PricingEndpointID = "pricing" diff --git a/website/docs/r/osis_pipeline.html.markdown b/website/docs/r/osis_pipeline.html.markdown new file mode 100644 index 00000000000..00cb03435e6 --- /dev/null +++ b/website/docs/r/osis_pipeline.html.markdown @@ -0,0 +1,141 @@ +--- +subcategory: "OpenSearch Ingestion" +layout: "aws" +page_title: "AWS: aws_osis_pipeline" +description: |- + Terraform resource for managing an AWS OpenSearch Ingestion Pipeline. +--- + +# Resource: aws_osis_pipeline + +Terraform resource for managing an AWS OpenSearch Ingestion Pipeline. + +## Example Usage + +### Basic Usage + +```terraform +data "aws_region" "current" {} + +resource "aws_iam_role" "example" { + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "osis-pipelines.amazonaws.com" + } + }, + ] + }) +} + +resource "aws_osis_pipeline" "example" { + pipeline_name = "example" + pipeline_configuration_body = <<-EOT + version: "2" + example-pipeline: + source: + http: + path: "/example" + sink: + - s3: + aws: + sts_role_arn: "${aws_iam_role.example.arn}" + region: "${data.aws_region.current.name}" + bucket: "example" + threshold: + event_collect_timeout: "60s" + codec: + ndjson: + EOT + max_units = 1 + min_units = 1 +} +``` + +### Using file function + +```terraform +resource "aws_osis_pipeline" "example" { + pipeline_name = "example" + pipeline_configuration_body = file("example.yaml") + max_units = 1 + min_units = 1 +} +``` + +## Argument Reference + +The following arguments are required: + +* `max_units` - (Required) The maximum pipeline capacity, in Ingestion Compute Units (ICUs). +* `min_units` - (Required) The minimum pipeline capacity, in Ingestion Compute Units (ICUs). +* `pipeline_configuration_body` - (Required) The pipeline configuration in YAML format. This argument accepts the pipeline configuration as a string or within a .yaml file. If you provide the configuration as a string, each new line must be escaped with \n. +* `pipeline_name` - (Required) The name of the OpenSearch Ingestion pipeline to create. Pipeline names are unique across the pipelines owned by an account within an AWS Region. + +The following arguments are optional: + +* `buffer_options` - (Optional) Key-value pairs to configure persistent buffering for the pipeline. See [`buffer_options`](#buffer_options) below. +* `encryption_at_rest_options` - (Optional) Key-value pairs to configure encryption for data that is written to a persistent buffer. See [`encryption_at_rest_options`](#encryption_at_rest_options) below. +* `log_publishing_options` - (Optional) Key-value pairs to configure log publishing. See [`log_publishing_options`](#log_publishing_options) below. +* `tags` - (Optional) A map of tags to assign to the pipeline. If configured with a provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level. +* `vpc_options` - (Optional) Container for the values required to configure VPC access for the pipeline. If you don't specify these values, OpenSearch Ingestion creates the pipeline with a public endpoint. See [`vpc_options`](#vpc_options) below. + +### buffer_options + +* `persistent_buffer_enabled` - (Required) Whether persistent buffering should be enabled. + +### encryption_at_rest_options + +* `kms_key_arn` - (Required) The ARN of the KMS key used to encrypt data-at-rest in OpenSearch Ingestion. By default, data is encrypted using an AWS owned key. + +### log_publishing_options + +* `cloudwatch_log_destination` - (Optional) The destination for OpenSearch Ingestion logs sent to Amazon CloudWatch Logs. This parameter is required if IsLoggingEnabled is set to true. See [`cloudwatch_log_destination`](#cloudwatch_log_destination) below. +* `is_logging_enabled` - (Optional) Whether logs should be published. + +### cloudwatch_log_destination + +* `log_group` - (Required) The name of the CloudWatch Logs group to send pipeline logs to. You can specify an existing log group or create a new one. For example, /aws/OpenSearchService/IngestionService/my-pipeline. + +### vpc_options + +* `subnet_ids` - (Required) A list of subnet IDs associated with the VPC endpoint. +* `security_group_ids` - (Optional) A list of security groups associated with the VPC endpoint. + +## Attribute Reference + +This resource exports the following attributes in addition to the arguments above: + +* `id` - Unique identifier for the pipeline. +* `ingest_endpoint_urls` - The list of ingestion endpoints for the pipeline, which you can send data to. +* `pipeline_arn` - Amazon Resource Name (ARN) of the pipeline. + +## Timeouts + +[Configuration options](https://developer.hashicorp.com/terraform/language/resources/syntax#operation-timeouts): + +* `create` - (Default `45m`) +* `update` - (Default `45m`) +* `delete` - (Default `45m`) + +## Import + +In Terraform v1.5.0 and later, use an [`import` block](https://developer.hashicorp.com/terraform/language/import) to import OpenSearch Ingestion Pipeline using the `id`. For example: + +```terraform +import { + to = aws_osis_pipeline.example + id = "example" +} +``` + +Using `terraform import`, import OpenSearch Ingestion Pipeline using the `id`. For example: + +```console +% terraform import aws_osis_pipeline.example example +```