Skip to content

Commit

Permalink
Revert "[APIT-2476] Terraform provider does not work well when deploying a Flink Model/Statement that uses sql.secrets.* (#503)" (#529)
Browse files Browse the repository at this point in the history

This reverts commit 100a3a4.

Revert
  • Loading branch information
tmalikconfluent authored Jan 15, 2025
1 parent f7037e6 commit fdc71bf
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 75 deletions.
20 changes: 0 additions & 20 deletions docs/resources/confluent_flink_statement.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,6 @@ resource "confluent_flink_statement" "example" {
}
```

Example of `confluent_flink_statement` that creates a model:
```
resource "confluent_flink_statement" "example" {
statement = "CREATE MODEL `vector_encoding` INPUT (input STRING) OUTPUT (vector ARRAY<FLOAT>) WITH( 'TASK' = 'classification','PROVIDER' = 'OPENAI','OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings','OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}');"
properties = {
"sql.current-catalog" = var.confluent_environment_display_name
"sql.current-database" = var.confluent_kafka_cluster_display_name
}
properties_sensitive = {
"sql.secrets.openaikey" : "***REDACTED***"
}
lifecycle {
prevent_destroy = true
}
}
```

<!-- schema generated by tfplugindocs -->
## Argument Reference

Expand Down Expand Up @@ -124,9 +107,6 @@ The following arguments are supported:
- `properties` - (Optional Map) The custom topic settings to set:
- `name` - (Required String) The setting name, for example, `sql.local-time-zone`.
- `value` - (Required String) The setting value, for example, `GMT-08:00`.
- `properties_sensitive` - (Optional Map) Block for sensitive statement properties:
- `name` - (Required String) The setting name, for example, `sql.secrets.openaikey`.
- `value` - (Required String) The setting value, for example, `s1234`.

- `stopped` - (Optional Boolean) The boolean flag is used to indicate the statement's running status and to control whether the Flink Statement should be stopped or resumed. Defaults to `false`. Update it to `true` to stop the statement. Subsequently update it to `false` to resume the statement.

Expand Down
55 changes: 4 additions & 51 deletions internal/provider/resource_flink_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/samber/lo"
"net/http"
"regexp"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"

fgb "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
)
Expand All @@ -38,7 +36,6 @@ const (
paramStatement = "statement"
paramComputePool = "compute_pool"
paramProperties = "properties"
paramPropertiesSensitive = "properties_sensitive"
paramStopped = "stopped"
paramLatestOffsets = "latest_offsets"
paramLatestOffsetsTimestamp = "latest_offsets_timestamp"
Expand Down Expand Up @@ -89,16 +86,6 @@ func flinkStatementResource() *schema.Resource {
Optional: true,
Computed: true,
},
paramPropertiesSensitive: {
Type: schema.TypeMap,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Sensitive: true,
Optional: true,
Computed: true,
ForceNew: false,
},
paramStopped: {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -170,12 +157,11 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
}

statement := d.Get(paramStatement).(string)

mergedProperties, sensitiveProperties, _ := extractFlinkProperties(d)
properties := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))

spec := fgb.NewSqlV1StatementSpec()
spec.SetStatement(statement)
spec.SetProperties(mergedProperties)
spec.SetProperties(properties)
spec.SetComputePoolId(computePoolId)
spec.SetPrincipal(principalId)

Expand All @@ -187,11 +173,6 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
if err != nil {
return diag.Errorf("error creating Flink Statement: error marshaling %#v to json: %s", createFlinkStatementRequest, createDescriptiveError(err))
}

if err := d.Set(paramPropertiesSensitive, sensitiveProperties); err != nil {
return diag.FromErr(createDescriptiveError(err))
}

tflog.Debug(ctx, fmt.Sprintf("Creating new Flink Statement: %s", createFlinkStatementRequestJson))

createdFlinkStatement, _, err := executeFlinkStatementCreate(flinkRestClient.apiContext(ctx), flinkRestClient, createFlinkStatementRequest)
Expand Down Expand Up @@ -444,7 +425,7 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
if err := d.Set(paramStatement, statement.Spec.GetStatement()); err != nil {
return nil, err
}
if err := d.Set(paramProperties, extractNonsensitiveProperties(statement.Spec.GetProperties())); err != nil {
if err := d.Set(paramProperties, statement.Spec.GetProperties()); err != nil {
return nil, err
}
if err := d.Set(paramStopped, statement.Spec.GetStopped()); err != nil {
Expand Down Expand Up @@ -482,21 +463,6 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
return d, nil
}

// extractNonsensitiveProperties returns a copy of properties with every
// sensitive entry removed, so secret values are never persisted to the
// Terraform state. An entry counts as sensitive when its key starts with
// the "sql.secrets" prefix.
func extractNonsensitiveProperties(properties map[string]string) map[string]string {
	filtered := make(map[string]string, len(properties))
	for key, value := range properties {
		// Drop sensitive settings; they must not be stored in TF state.
		if strings.HasPrefix(key, "sql.secrets") {
			continue
		}
		filtered[key] = value
	}
	return filtered
}

func flinkStatementDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
tflog.Debug(ctx, fmt.Sprintf("Deleting Flink Statement %q", d.Id()), map[string]interface{}{flinkStatementLoggingKey: d.Id()})

Expand Down Expand Up @@ -736,19 +702,6 @@ func createFlinkStatementId(environmentId, computePoolId, statementName string)
return fmt.Sprintf("%s/%s/%s", environmentId, computePoolId, statementName)
}

// extractFlinkProperties reads both property maps from the resource data and
// returns them as (merged, sensitive, nonsensitive). On a key collision the
// sensitive value wins, matching lo.Assign's last-argument-wins semantics.
func extractFlinkProperties(d *schema.ResourceData) (map[string]string, map[string]string, map[string]string) {
	sensitive := convertToStringStringMap(d.Get(paramPropertiesSensitive).(map[string]interface{}))
	nonsensitive := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))

	// Merge both configs; sensitive entries override nonsensitive ones.
	merged := make(map[string]string, len(nonsensitive)+len(sensitive))
	for k, v := range nonsensitive {
		merged[k] = v
	}
	for k, v := range sensitive {
		merged[k] = v
	}

	return merged, sensitive, nonsensitive
}

func resourceFlinkStatementDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
// Allow new resource creation without restriction
if diff.Id() == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down Expand Up @@ -249,7 +248,6 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down
2 changes: 0 additions & 2 deletions internal/provider/resource_flink_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down Expand Up @@ -281,7 +280,6 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down

0 comments on commit fdc71bf

Please sign in to comment.