diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 9a90f6e83b..8d81af131b 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -91,8 +91,10 @@ jobs: # # Only list the secrets you need for the component. CRON_COMPONENTS=$(yq -I0 --tojson eval - << EOF - - component: state.cosmosdb + - component: state.azure.cosmosdb required-secrets: AzureCosmosDBMasterKey,AzureCosmosDBUrl,AzureCosmosDB,AzureCosmosDBCollection + - component: state.azure.tablestorage + required-secrets: AzureBlobStorageAccessKey,AzureBlobStorageAccount - component: pubsub.azure.servicebus required-secrets: AzureServiceBusConnectionString - component: bindings.azure.blobstorage diff --git a/state/azure/tablestorage/tablestorage.go b/state/azure/tablestorage/tablestorage.go index becc3d8e06..61d1bc133c 100644 --- a/state/azure/tablestorage/tablestorage.go +++ b/state/azure/tablestorage/tablestorage.go @@ -107,6 +107,9 @@ func (r *StateStore) Delete(req *state.DeleteRequest) error { if err != nil { if req.ETag != nil { return state.NewETagError(state.ETagMismatch, err) + } else if isNotFoundError(err) { + // deleting an item that doesn't exist without specifying an ETAG is a noop + return nil } } @@ -138,11 +141,6 @@ func (r *StateStore) Set(req *state.SetRequest) error { r.logger.Debugf("saving %s", req.Key) err := r.writeRow(req) - if err != nil { - if req.ETag != nil { - return state.NewETagError(state.ETagMismatch, err) - } - } return err } @@ -195,20 +193,41 @@ func (r *StateStore) writeRow(req *state.SetRequest) error { } entity.OdataEtag = etag - // InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first - // as it's more frequent, and then Insert + // InsertOrReplace does not support ETag concurrency, therefore we will use Insert to check for key existence + // and then use Update to update the key if it exists with the specified ETag - err := entity.Update(false, nil) + err := entity.Insert(storage.FullMetadata, nil) if err != nil { - if isNotFoundError(err) { - // When entity is not found (set state first time) create it - entity.OdataEtag = "" - - return entity.Insert(storage.FullMetadata, nil) + // If Insert failed because item already exists, try to Update instead per Upsert semantics + if isEntityAlreadyExistsError(err) { + // Always Update using the etag when provided even if Concurrency != FirstWrite. + // Today the presence of etag takes precedence over Concurrency. + // In the future #2739 will impose a breaking change which must disallow the use of etag when not using FirstWrite. + if etag != "" { + uerr := entity.Update(false, nil) + if uerr != nil { + if isNotFoundError(uerr) { + return state.NewETagError(state.ETagMismatch, uerr) + } + return uerr + } + } else if req.Options.Concurrency == state.FirstWrite { + // Otherwise, if FirstWrite was set, but no etag was provided for an Update operation + // explicitly flag it as an error. + // entity.Update itself does not flag the test case as a mismatch as it does not distinguish + // between nil and "" etags, the initial etag will always be "", which would match on update. + return state.NewETagError(state.ETagMismatch, errors.New("update with Concurrency.FirstWrite without ETag")) + } else { + // Finally, last write semantics without ETag should always perform a force update. + return entity.Update(true, nil) + } + } else { + // Any other unexpected error on Insert is propagated to the caller + return err } } - return err + return nil } func isNotFoundError(err error) bool { @@ -217,6 +236,12 @@ func isNotFoundError(err error) bool { return ok && azureError.Code == "ResourceNotFound" } +func isEntityAlreadyExistsError(err error) bool { + azureError, ok := err.(storage.AzureStorageServiceError) + + return ok && azureError.Code == "EntityAlreadyExists" +} + func isTableAlreadyExistsError(err error) bool { azureError, ok := err.(storage.AzureStorageServiceError) @@ -227,12 +252,16 @@ func (r *StateStore) deleteRow(req *state.DeleteRequest) error { pk, rk := getPartitionAndRowKey(req.Key) entity := r.table.GetEntityReference(pk, rk) - var etag string if req.ETag != nil { - etag = *req.ETag + entity.OdataEtag = *req.ETag + + // force=false sets the "If-Match: " header to ensure that the delete is only performed if the + // entity's ETag matches the specified ETag + return entity.Delete(false, nil) } - entity.OdataEtag = etag + // force=true sets the "If-Match: *" header to ensure that we delete a matching entity + // regardless of the entity's ETag value return entity.Delete(true, nil) } diff --git a/tests/config/state/cosmosdb/statestore.yaml b/tests/config/state/azure/cosmosdb/statestore.yaml similarity index 100% rename from tests/config/state/cosmosdb/statestore.yaml rename to tests/config/state/azure/cosmosdb/statestore.yaml diff --git a/tests/config/state/azure/tablestorage/statestore.yml b/tests/config/state/azure/tablestorage/statestore.yml new file mode 100644 index 0000000000..8bd3d7a701 --- /dev/null +++ b/tests/config/state/azure/tablestorage/statestore.yml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.azure.tablestorage + version: v1 + metadata: + - name: accountName + value: ${{AzureBlobStorageAccount}} + - name: accountKey + value: ${{AzureBlobStorageAccessKey}} + - name: tableName + value: TestTable diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 711ed0555f..a4e50a638e 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -5,10 +5,11 @@ components: allOperations: true - component: mongodb allOperations: true - - component: cosmosdb + - component: azure.cosmosdb allOperations: true - component: sqlserver allOperations: true - component: mysql - allOperations: false operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag" ] + - component: azure.tablestorage + operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"] diff --git a/tests/conformance/common.go b/tests/conformance/common.go index ef0c91f505..50ca6af90b 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -52,6 +52,7 @@ import ( ss_local_env "github.com/dapr/components-contrib/secretstores/local/env" ss_local_file "github.com/dapr/components-contrib/secretstores/local/file" s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" + s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" s_mongodb "github.com/dapr/components-contrib/state/mongodb" s_mysql "github.com/dapr/components-contrib/state/mysql" s_redis "github.com/dapr/components-contrib/state/redis" @@ -369,7 +370,7 @@ func loadStateStore(tc TestComponent) state.Store { switch tc.Component { case redis: store = s_redis.NewRedisStateStore(testLogger) - case "cosmosdb": + case "azure.cosmosdb": store = s_cosmosdb.NewCosmosDBStateStore(testLogger) case "mongodb": store = s_mongodb.NewMongoDB(testLogger) @@ -377,6 +378,8 @@ func loadStateStore(tc TestComponent) state.Store { store = s_sqlserver.NewSQLServerStateStore(testLogger) case "mysql": store = s_mysql.NewMySQLStateStore(testLogger) + case "azure.tablestorage": + store = s_azuretablestorage.NewAzureTablesStateStore(testLogger) default: return nil }