diff --git a/docs/resources/confluent_flink_statement.md b/docs/resources/confluent_flink_statement.md
index 2f4223fb..ee301427 100644
--- a/docs/resources/confluent_flink_statement.md
+++ b/docs/resources/confluent_flink_statement.md
@@ -80,6 +80,23 @@ resource "confluent_flink_statement" "example" {
 }
 ```
 
+Example of `confluent_flink_statement` that creates a model:
+```
+resource "confluent_flink_statement" "example" {
+  statement  = "CREATE MODEL `vector_encoding` INPUT (input STRING) OUTPUT (vector ARRAY<FLOAT>) WITH ('TASK' = 'classification', 'PROVIDER' = 'OPENAI', 'OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings', 'OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}');"
+  properties = {
+    "sql.current-catalog"  = var.confluent_environment_display_name
+    "sql.current-database" = var.confluent_kafka_cluster_display_name
+  }
+  properties_sensitive = {
+    "sql.secrets.openaikey" = "***REDACTED***"
+  }
+  lifecycle {
+    prevent_destroy = true
+  }
+}
+```
+
 ## Argument Reference
 
@@ -107,6 +124,9 @@ The following arguments are supported:
 - `properties` - (Optional Map) The custom topic settings to set:
   - `name` - (Required String) The setting name, for example, `sql.local-time-zone`.
   - `value` - (Required String) The setting value, for example, `GMT-08:00`.
+- `properties_sensitive` - (Optional Map) The custom sensitive statement properties (for example, `sql.secrets.*` settings) to set:
+  - `name` - (Required String) The setting name, for example, `sql.secrets.openaikey`.
+  - `value` - (Required String) The setting value, for example, `s1234`.
 - `stopped` - (Optional Boolean) The boolean flag is used to indicate the statement's running status and to control whether the Flink Statement should be stopped or resumed. Defaults to `false`. Update it to `true` to stop the statement. Subsequently update it to `false` to resume the statement.
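Not part of the patch: a minimal, standalone sketch of the merge semantics that `properties` and `properties_sensitive` acquire in the provider change below. It uses the same `github.com/samber/lo` helper the patch imports; all map contents are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Hypothetical, contrived values for illustration only.
	nonsensitive := map[string]string{
		"sql.current-catalog":   "my-environment",
		"sql.secrets.openaikey": "placeholder",
	}
	sensitive := map[string]string{
		"sql.secrets.openaikey": "s1234",
	}

	// lo.Assign merges left to right, so later maps win on key collisions;
	// this mirrors lo.Assign(nonsensitiveProperties, sensitiveProperties) in the patch.
	merged := lo.Assign(nonsensitive, sensitive)

	fmt.Println(merged["sql.secrets.openaikey"]) // s1234
	fmt.Println(len(merged))                     // 2
}
```

Because later arguments to `lo.Assign` take precedence, a key defined in both maps resolves to the value from `properties_sensitive`.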
diff --git a/internal/provider/resource_flink_statement.go b/internal/provider/resource_flink_statement.go
index 7fd48bd2..dcac6b83 100644
--- a/internal/provider/resource_flink_statement.go
+++ b/internal/provider/resource_flink_statement.go
@@ -18,15 +18,17 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/samber/lo"
 	"net/http"
 	"regexp"
+	"strings"
 	"time"
 
 	"github.com/hashicorp/terraform-plugin-log/tflog"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
-	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
 
 	fgb "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
 )
@@ -36,6 +38,7 @@ const (
 	paramStatement              = "statement"
 	paramComputePool            = "compute_pool"
 	paramProperties             = "properties"
+	paramPropertiesSensitive    = "properties_sensitive"
 	paramStopped                = "stopped"
 	paramLatestOffsets          = "latest_offsets"
 	paramLatestOffsetsTimestamp = "latest_offsets_timestamp"
@@ -86,6 +89,16 @@ func flinkStatementResource() *schema.Resource {
 			Optional: true,
 			Computed: true,
 		},
+		paramPropertiesSensitive: {
+			Type: schema.TypeMap,
+			Elem: &schema.Schema{
+				Type: schema.TypeString,
+			},
+			Sensitive: true,
+			Optional:  true,
+			Computed:  true,
+			ForceNew:  false,
+		},
 		paramStopped: {
 			Type:     schema.TypeBool,
 			Optional: true,
@@ -157,11 +170,14 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
 	}
 
 	statement := d.Get(paramStatement).(string)
-	properties := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))
+
+	mergedProperties, sensitiveProperties, _ := extractFlinkProperties(d)
+
+	tflog.Debug(ctx, fmt.Sprintf("Merging %d sensitive Flink Statement properties into the statement spec", len(sensitiveProperties)))
 
 	spec := fgb.NewSqlV1StatementSpec()
 	spec.SetStatement(statement)
-	spec.SetProperties(properties)
+	spec.SetProperties(mergedProperties)
 	spec.SetComputePoolId(computePoolId)
 	spec.SetPrincipal(principalId)
 
@@ -173,6 +189,11 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
 	if err != nil {
 		return diag.Errorf("error creating Flink Statement: error marshaling %#v to json: %s", createFlinkStatementRequest, createDescriptiveError(err))
 	}
+
+	if err := d.Set(paramPropertiesSensitive, sensitiveProperties); err != nil {
+		return diag.FromErr(createDescriptiveError(err))
+	}
+
 	tflog.Debug(ctx, fmt.Sprintf("Creating new Flink Statement: %s", createFlinkStatementRequestJson))
 
 	createdFlinkStatement, _, err := executeFlinkStatementCreate(flinkRestClient.apiContext(ctx), flinkRestClient, createFlinkStatementRequest)
@@ -425,7 +446,7 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
 	if err := d.Set(paramStatement, statement.Spec.GetStatement()); err != nil {
 		return nil, err
 	}
-	if err := d.Set(paramProperties, statement.Spec.GetProperties()); err != nil {
+	if err := d.Set(paramProperties, extractNonsensitiveProperties(statement.Spec.GetProperties())); err != nil {
 		return nil, err
 	}
 	if err := d.Set(paramStopped, statement.Spec.GetStopped()); err != nil {
@@ -463,6 +484,21 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
 	return d, nil
 }
 
+func extractNonsensitiveProperties(properties map[string]string) map[string]string {
+	nonsensitiveProperties := make(map[string]string)
+
+	for propertiesSettingName, propertiesSettingValue := range properties {
+		// Skip all sensitive config settings since we don't want to store them in TF state
+		isSensitiveSetting := strings.HasPrefix(propertiesSettingName, "sql.secrets")
+		if isSensitiveSetting {
+			continue
+		}
+		nonsensitiveProperties[propertiesSettingName] = propertiesSettingValue
+	}
+
+	return nonsensitiveProperties
+}
+
 func flinkStatementDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
 	tflog.Debug(ctx, fmt.Sprintf("Deleting Flink Statement %q", d.Id()), map[string]interface{}{flinkStatementLoggingKey: d.Id()})
 
@@ -702,6 +738,19 @@ func createFlinkStatementId(environmentId, computePoolId, statementName string)
 	return fmt.Sprintf("%s/%s/%s", environmentId, computePoolId, statementName)
 }
 
+func extractFlinkProperties(d *schema.ResourceData) (map[string]string, map[string]string, map[string]string) {
+	sensitiveProperties := convertToStringStringMap(d.Get(paramPropertiesSensitive).(map[string]interface{}))
+	nonsensitiveProperties := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))
+
+	// Merge both configs; values from the sensitive map win on key collisions
+	properties := lo.Assign(
+		nonsensitiveProperties,
+		sensitiveProperties,
+	)
+
+	return properties, sensitiveProperties, nonsensitiveProperties
+}
+
 func resourceFlinkStatementDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
 	// Allow new resource creation without restriction
 	if diff.Id() == "" {
diff --git a/internal/provider/resource_flink_statement_provider_block_test.go b/internal/provider/resource_flink_statement_provider_block_test.go
index d85eac6f..2fc4b573 100644
--- a/internal/provider/resource_flink_statement_provider_block_test.go
+++ b/internal/provider/resource_flink_statement_provider_block_test.go
@@ -221,6 +221,7 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
+					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "properties.sql.secrets.openaikey"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
 					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
 					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
@@ -248,6 +249,7 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
+					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "properties.sql.secrets.openaikey"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
 					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
 					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
diff --git a/internal/provider/resource_flink_statement_test.go b/internal/provider/resource_flink_statement_test.go
index 64c7a919..ce7c0b4d 100644
--- a/internal/provider/resource_flink_statement_test.go
+++ b/internal/provider/resource_flink_statement_test.go
@@ -217,6 +217,7 @@ func TestAccFlinkStatement(t *testing.T) {
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
+					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "properties.sql.secrets.openaikey"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
@@ -280,6 +281,7 @@ func TestAccFlinkStatement(t *testing.T) {
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
+					resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "properties.sql.secrets.openaikey"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
 					resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
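Also not part of the patch: a sketch of a unit test that could accompany the new helper, assuming it sits next to `extractNonsensitiveProperties` in `package provider` as added above; the property values are invented.

```go
package provider

import "testing"

// Sketch only: exercises the sql.secrets prefix filtering that
// extractNonsensitiveProperties performs before properties are written to state.
func TestExtractNonsensitiveProperties(t *testing.T) {
	// Hypothetical input mixing a regular property with a sensitive one.
	properties := map[string]string{
		"sql.current-catalog":   "my-environment",
		"sql.secrets.openaikey": "s1234",
	}

	nonsensitive := extractNonsensitiveProperties(properties)

	if _, ok := nonsensitive["sql.secrets.openaikey"]; ok {
		t.Error("expected sql.secrets.* settings to be filtered out")
	}
	if got := nonsensitive["sql.current-catalog"]; got != "my-environment" {
		t.Errorf("expected non-sensitive setting to be kept, got %q", got)
	}
}
```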