Skip to content

Commit

Permalink
[APIT-2476] Terraform provider does not work well when deploying a Fl…
Browse files Browse the repository at this point in the history
…ink Model/Statement that uses sql.secrets.*
  • Loading branch information
tmalikconfluent committed Dec 4, 2024
1 parent eb1cb1c commit 083e04b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 4 deletions.
20 changes: 20 additions & 0 deletions docs/resources/confluent_flink_statement.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ resource "confluent_flink_statement" "example" {
}
```

Example of `confluent_flink_statement` that creates a model:
```
resource "confluent_flink_statement" "example" {
statement = "CREATE MODEL `vector_encoding` INPUT (input STRING) OUTPUT (vector ARRAY<FLOAT>) WITH( 'TASK' = 'classification','PROVIDER' = 'OPENAI','OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings','OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}');"
properties = {
"sql.current-catalog" = var.confluent_environment_display_name
"sql.current-database" = var.confluent_kafka_cluster_display_name
}
properties_sensitive = {
"sql.secrets.openaikey" : "***REDACTED***"
}
lifecycle {
prevent_destroy = true
}
}
```

<!-- schema generated by tfplugindocs -->
## Argument Reference

Expand Down Expand Up @@ -107,6 +124,9 @@ The following arguments are supported:
- `properties` - (Optional Map) The custom statement properties to set:
- `name` - (Required String) The setting name, for example, `sql.local-time-zone`.
- `value` - (Required String) The setting value, for example, `GMT-08:00`.
- `properties_sensitive` - (Optional Map) A map of sensitive statement properties (for example, secrets under `sql.secrets.*`). These are merged with `properties` when the statement is created, but are excluded from the non-sensitive `properties` attribute stored in Terraform state:
  - `name` - (Required String) The setting name, for example, `sql.secrets.openaikey`.
  - `value` - (Required String) The setting value, for example, `s1234`.

- `stopped` - (Optional Boolean) The boolean flag is used to indicate the statement's running status and to control whether the Flink Statement should be stopped or resumed. Defaults to `false`. Update it to `true` to stop the statement. Subsequently update it to `false` to resume the statement.

Expand Down
57 changes: 53 additions & 4 deletions internal/provider/resource_flink_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/samber/lo"
"net/http"
"regexp"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"

fgb "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
)
Expand All @@ -36,6 +38,7 @@ const (
paramStatement = "statement"
paramComputePool = "compute_pool"
paramProperties = "properties"
paramPropertiesSensitive = "properties_sensitive"
paramStopped = "stopped"
paramLatestOffsets = "latest_offsets"
paramLatestOffsetsTimestamp = "latest_offsets_timestamp"
Expand Down Expand Up @@ -86,6 +89,16 @@ func flinkStatementResource() *schema.Resource {
Optional: true,
Computed: true,
},
paramPropertiesSensitive: {
Type: schema.TypeMap,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Sensitive: true,
Optional: true,
Computed: true,
ForceNew: false,
},
paramStopped: {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -157,11 +170,14 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
}

statement := d.Get(paramStatement).(string)
properties := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))

mergedProperties, sensitiveProperties, _ := extractFlinkProperties(d)

tflog.Debug(ctx, fmt.Sprintf("SENSITIVE VALUES: %s", sensitiveProperties))

spec := fgb.NewSqlV1StatementSpec()
spec.SetStatement(statement)
spec.SetProperties(properties)
spec.SetProperties(mergedProperties)
spec.SetComputePoolId(computePoolId)
spec.SetPrincipal(principalId)

Expand All @@ -173,6 +189,11 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
if err != nil {
return diag.Errorf("error creating Flink Statement: error marshaling %#v to json: %s", createFlinkStatementRequest, createDescriptiveError(err))
}

if err := d.Set(paramPropertiesSensitive, sensitiveProperties); err != nil {
return diag.FromErr(createDescriptiveError(err))
}

tflog.Debug(ctx, fmt.Sprintf("Creating new Flink Statement: %s", createFlinkStatementRequestJson))

createdFlinkStatement, _, err := executeFlinkStatementCreate(flinkRestClient.apiContext(ctx), flinkRestClient, createFlinkStatementRequest)
Expand Down Expand Up @@ -425,7 +446,7 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
if err := d.Set(paramStatement, statement.Spec.GetStatement()); err != nil {
return nil, err
}
if err := d.Set(paramProperties, statement.Spec.GetProperties()); err != nil {
if err := d.Set(paramProperties, extractNonsensitiveProperties(statement.Spec.GetProperties())); err != nil {
return nil, err
}
if err := d.Set(paramStopped, statement.Spec.GetStopped()); err != nil {
Expand Down Expand Up @@ -463,6 +484,21 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
return d, nil
}

// extractNonsensitiveProperties returns a copy of properties with every
// sensitive entry removed, so that secrets are never persisted in the
// Terraform state. A setting is treated as sensitive when its name starts
// with the "sql.secrets" prefix.
func extractNonsensitiveProperties(properties map[string]string) map[string]string {
	filtered := make(map[string]string, len(properties))

	for name, value := range properties {
		// Keep only non-sensitive config settings; secrets are dropped
		// here and surfaced through properties_sensitive instead.
		if !strings.HasPrefix(name, "sql.secrets") {
			filtered[name] = value
		}
	}

	return filtered
}

func flinkStatementDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
tflog.Debug(ctx, fmt.Sprintf("Deleting Flink Statement %q", d.Id()), map[string]interface{}{flinkStatementLoggingKey: d.Id()})

Expand Down Expand Up @@ -702,6 +738,19 @@ func createFlinkStatementId(environmentId, computePoolId, statementName string)
return fmt.Sprintf("%s/%s/%s", environmentId, computePoolId, statementName)
}

// extractFlinkProperties reads both property maps from the resource data and
// returns (merged, sensitive, nonsensitive). The merged map combines the
// non-sensitive and sensitive properties; when the same key appears in both,
// the sensitive value wins (lo.Assign gives precedence to later maps).
func extractFlinkProperties(d *schema.ResourceData) (map[string]string, map[string]string, map[string]string) {
	nonsensitive := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))
	sensitive := convertToStringStringMap(d.Get(paramPropertiesSensitive).(map[string]interface{}))

	// Merge into a single map for the API request; sensitive entries override.
	merged := lo.Assign(nonsensitive, sensitive)

	return merged, sensitive, nonsensitive
}

func resourceFlinkStatementDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
// Allow new resource creation without restriction
if diff.Id() == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down Expand Up @@ -248,6 +249,7 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down
2 changes: 2 additions & 0 deletions internal/provider/resource_flink_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down Expand Up @@ -280,6 +281,7 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down

0 comments on commit 083e04b

Please sign in to comment.