Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ jobs:
#
# Only list the secrets you need for the component.
CRON_COMPONENTS=$(yq -I0 --tojson eval - << EOF
- component: state.cosmosdb
- component: state.azure.cosmosdb
required-secrets: AzureCosmosDBMasterKey,AzureCosmosDBUrl,AzureCosmosDB,AzureCosmosDBCollection
- component: state.azure.tablestorage
required-secrets: AzureBlobStorageAccessKey,AzureBlobStorageAccount
- component: pubsub.azure.servicebus
required-secrets: AzureServiceBusConnectionString
- component: bindings.azure.blobstorage
Expand Down
63 changes: 46 additions & 17 deletions state/azure/tablestorage/tablestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (r *StateStore) Delete(req *state.DeleteRequest) error {
if err != nil {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
} else if isNotFoundError(err) {
// deleting an item that doesn't exist without specifying an ETAG is a noop
return nil
}
}

Expand Down Expand Up @@ -138,11 +141,6 @@ func (r *StateStore) Set(req *state.SetRequest) error {
r.logger.Debugf("saving %s", req.Key)

err := r.writeRow(req)
if err != nil {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
}

return err
}
Expand Down Expand Up @@ -195,20 +193,41 @@ func (r *StateStore) writeRow(req *state.SetRequest) error {
}
entity.OdataEtag = etag

// InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first
// as it's more frequent, and then Insert
// InsertOrReplace does not support ETag concurrency, therefore we will use Insert to check for key existence
// and then use Update to update the key if it exists with the specified ETag

err := entity.Update(false, nil)
err := entity.Insert(storage.FullMetadata, nil)
if err != nil {
if isNotFoundError(err) {
// When entity is not found (set state first time) create it
entity.OdataEtag = ""

return entity.Insert(storage.FullMetadata, nil)
// If Insert failed because item already exists, try to Update instead per Upsert semantics
if isEntityAlreadyExistsError(err) {
// Always Update using the etag when provided even if Concurrency != FirstWrite.
// Today the presence of etag takes precedence over Concurrency.
// In the future #2739 will impose a breaking change which must disallow the use of etag when not using FirstWrite.
if etag != "" {
uerr := entity.Update(false, nil)
if uerr != nil {
if isNotFoundError(uerr) {
return state.NewETagError(state.ETagMismatch, uerr)
}
return uerr
}
} else if req.Options.Concurrency == state.FirstWrite {
// Otherwise, if FirstWrite was set, but no etag was provided for an Update operation
// explicitly flag it as an error.
// entity.Update itself does not flag the test case as a mismatch as it does not distinguish
// between nil and "" etags, the initial etag will always be "", which would match on update.
return state.NewETagError(state.ETagMismatch, errors.New("update with Concurrency.FirstWrite without ETag"))
} else {
// Finally, last write semantics without ETag should always perform a force update.
return entity.Update(true, nil)
}
} else {
// Any other unexpected error on Insert is propagated to the caller
return err
}
}

return err
return nil
}

func isNotFoundError(err error) bool {
Expand All @@ -217,6 +236,12 @@ func isNotFoundError(err error) bool {
return ok && azureError.Code == "ResourceNotFound"
}

func isEntityAlreadyExistsError(err error) bool {
azureError, ok := err.(storage.AzureStorageServiceError)

return ok && azureError.Code == "EntityAlreadyExists"
}

func isTableAlreadyExistsError(err error) bool {
azureError, ok := err.(storage.AzureStorageServiceError)

Expand All @@ -227,12 +252,16 @@ func (r *StateStore) deleteRow(req *state.DeleteRequest) error {
pk, rk := getPartitionAndRowKey(req.Key)
entity := r.table.GetEntityReference(pk, rk)

var etag string
if req.ETag != nil {
etag = *req.ETag
entity.OdataEtag = *req.ETag

// force=false sets the "If-Match: <ETag>" header to ensure that the delete is only performed if the
// entity's ETag matches the specified ETag
return entity.Delete(false, nil)
}
entity.OdataEtag = etag

// force=true sets the "If-Match: *" header to ensure that we delete a matching entity
// regardless of the entity's ETag value
return entity.Delete(true, nil)
}

Expand Down
14 changes: 14 additions & 0 deletions tests/config/state/azure/tablestorage/statestore.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.azure.tablestorage
version: v1
metadata:
- name: accountName
value: ${{AzureBlobStorageAccount}}
- name: accountKey
value: ${{AzureBlobStorageAccessKey}}
- name: tableName
value: TestTable
5 changes: 3 additions & 2 deletions tests/config/state/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ components:
allOperations: true
- component: mongodb
allOperations: true
- component: cosmosdb
- component: azure.cosmosdb
allOperations: true
- component: sqlserver
allOperations: true
- component: mysql
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag" ]
- component: azure.tablestorage
operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"]
5 changes: 4 additions & 1 deletion tests/conformance/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
ss_local_env "github.com/dapr/components-contrib/secretstores/local/env"
ss_local_file "github.com/dapr/components-contrib/secretstores/local/file"
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage"
s_mongodb "github.com/dapr/components-contrib/state/mongodb"
s_mysql "github.com/dapr/components-contrib/state/mysql"
s_redis "github.com/dapr/components-contrib/state/redis"
Expand Down Expand Up @@ -369,14 +370,16 @@ func loadStateStore(tc TestComponent) state.Store {
switch tc.Component {
case redis:
store = s_redis.NewRedisStateStore(testLogger)
case "cosmosdb":
case "azure.cosmosdb":
store = s_cosmosdb.NewCosmosDBStateStore(testLogger)
case "mongodb":
store = s_mongodb.NewMongoDB(testLogger)
case "sqlserver":
store = s_sqlserver.NewSQLServerStateStore(testLogger)
case "mysql":
store = s_mysql.NewMySQLStateStore(testLogger)
case "azure.tablestorage":
store = s_azuretablestorage.NewAzureTablesStateStore(testLogger)
default:
return nil
}
Expand Down