Skip to content

Commit dafa457

Browse files
authored
Merge pull request #1159 from berndverst/tablestorage
Conformance Tests for Azure Table Storage
2 parents 988fed0 + 905cbc0 commit dafa457

File tree

6 files changed

+70
-21
lines changed

6 files changed

+70
-21
lines changed

.github/workflows/conformance.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ jobs:
9191
#
9292
# Only list the secrets you need for the component.
9393
CRON_COMPONENTS=$(yq -I0 --tojson eval - << EOF
94-
- component: state.cosmosdb
94+
- component: state.azure.cosmosdb
9595
required-secrets: AzureCosmosDBMasterKey,AzureCosmosDBUrl,AzureCosmosDB,AzureCosmosDBCollection
96+
- component: state.azure.tablestorage
97+
required-secrets: AzureBlobStorageAccessKey,AzureBlobStorageAccount
9698
- component: pubsub.azure.servicebus
9799
required-secrets: AzureServiceBusConnectionString
98100
- component: bindings.azure.blobstorage

state/azure/tablestorage/tablestorage.go

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ func (r *StateStore) Delete(req *state.DeleteRequest) error {
107107
if err != nil {
108108
if req.ETag != nil {
109109
return state.NewETagError(state.ETagMismatch, err)
110+
} else if isNotFoundError(err) {
111+
// deleting an item that doesn't exist without specifying an ETAG is a noop
112+
return nil
110113
}
111114
}
112115

@@ -138,11 +141,6 @@ func (r *StateStore) Set(req *state.SetRequest) error {
138141
r.logger.Debugf("saving %s", req.Key)
139142

140143
err := r.writeRow(req)
141-
if err != nil {
142-
if req.ETag != nil {
143-
return state.NewETagError(state.ETagMismatch, err)
144-
}
145-
}
146144

147145
return err
148146
}
@@ -195,20 +193,41 @@ func (r *StateStore) writeRow(req *state.SetRequest) error {
195193
}
196194
entity.OdataEtag = etag
197195

198-
// InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first
199-
// as it's more frequent, and then Insert
196+
// InsertOrReplace does not support ETag concurrency, therefore we will use Insert to check for key existence
197+
// and then use Update to update the key if it exists with the specified ETag
200198

201-
err := entity.Update(false, nil)
199+
err := entity.Insert(storage.FullMetadata, nil)
202200
if err != nil {
203-
if isNotFoundError(err) {
204-
// When entity is not found (set state first time) create it
205-
entity.OdataEtag = ""
206-
207-
return entity.Insert(storage.FullMetadata, nil)
201+
// If Insert failed because item already exists, try to Update instead per Upsert semantics
202+
if isEntityAlreadyExistsError(err) {
203+
// Always Update using the etag when provided even if Concurrency != FirstWrite.
204+
// Today the presence of etag takes precedence over Concurrency.
205+
// In the future #2739 will impose a breaking change which must disallow the use of etag when not using FirstWrite.
206+
if etag != "" {
207+
uerr := entity.Update(false, nil)
208+
if uerr != nil {
209+
if isNotFoundError(uerr) {
210+
return state.NewETagError(state.ETagMismatch, uerr)
211+
}
212+
return uerr
213+
}
214+
} else if req.Options.Concurrency == state.FirstWrite {
215+
// Otherwise, if FirstWrite was set, but no etag was provided for an Update operation
216+
// explicitly flag it as an error.
217+
// entity.Update itself does not flag the test case as a mismatch as it does not distinguish
218+
// between nil and "" etags, the initial etag will always be "", which would match on update.
219+
return state.NewETagError(state.ETagMismatch, errors.New("update with Concurrency.FirstWrite without ETag"))
220+
} else {
221+
// Finally, last write semantics without ETag should always perform a force update.
222+
return entity.Update(true, nil)
223+
}
224+
} else {
225+
// Any other unexpected error on Insert is propagated to the caller
226+
return err
208227
}
209228
}
210229

211-
return err
230+
return nil
212231
}
213232

214233
func isNotFoundError(err error) bool {
@@ -217,6 +236,12 @@ func isNotFoundError(err error) bool {
217236
return ok && azureError.Code == "ResourceNotFound"
218237
}
219238

239+
func isEntityAlreadyExistsError(err error) bool {
240+
azureError, ok := err.(storage.AzureStorageServiceError)
241+
242+
return ok && azureError.Code == "EntityAlreadyExists"
243+
}
244+
220245
func isTableAlreadyExistsError(err error) bool {
221246
azureError, ok := err.(storage.AzureStorageServiceError)
222247

@@ -227,12 +252,16 @@ func (r *StateStore) deleteRow(req *state.DeleteRequest) error {
227252
pk, rk := getPartitionAndRowKey(req.Key)
228253
entity := r.table.GetEntityReference(pk, rk)
229254

230-
var etag string
231255
if req.ETag != nil {
232-
etag = *req.ETag
256+
entity.OdataEtag = *req.ETag
257+
258+
// force=false sets the "If-Match: <ETag>" header to ensure that the delete is only performed if the
259+
// entity's ETag matches the specified ETag
260+
return entity.Delete(false, nil)
233261
}
234-
entity.OdataEtag = etag
235262

263+
// force=true sets the "If-Match: *" header to ensure that we delete a matching entity
264+
// regardless of the entity's ETag value
236265
return entity.Delete(true, nil)
237266
}
238267

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: statestore
5+
spec:
6+
type: state.azure.tablestorage
7+
version: v1
8+
metadata:
9+
- name: accountName
10+
value: ${{AzureBlobStorageAccount}}
11+
- name: accountKey
12+
value: ${{AzureBlobStorageAccessKey}}
13+
- name: tableName
14+
value: TestTable

tests/config/state/tests.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ components:
55
allOperations: true
66
- component: mongodb
77
allOperations: true
8-
- component: cosmosdb
8+
- component: azure.cosmosdb
99
allOperations: true
1010
- component: sqlserver
1111
allOperations: true
1212
- component: mysql
13-
allOperations: false
1413
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag" ]
14+
- component: azure.tablestorage
15+
operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"]

tests/conformance/common.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
ss_local_env "github.com/dapr/components-contrib/secretstores/local/env"
5353
ss_local_file "github.com/dapr/components-contrib/secretstores/local/file"
5454
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
55+
s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage"
5556
s_mongodb "github.com/dapr/components-contrib/state/mongodb"
5657
s_mysql "github.com/dapr/components-contrib/state/mysql"
5758
s_redis "github.com/dapr/components-contrib/state/redis"
@@ -369,14 +370,16 @@ func loadStateStore(tc TestComponent) state.Store {
369370
switch tc.Component {
370371
case redis:
371372
store = s_redis.NewRedisStateStore(testLogger)
372-
case "cosmosdb":
373+
case "azure.cosmosdb":
373374
store = s_cosmosdb.NewCosmosDBStateStore(testLogger)
374375
case "mongodb":
375376
store = s_mongodb.NewMongoDB(testLogger)
376377
case "sqlserver":
377378
store = s_sqlserver.NewSQLServerStateStore(testLogger)
378379
case "mysql":
379380
store = s_mysql.NewMySQLStateStore(testLogger)
381+
case "azure.tablestorage":
382+
store = s_azuretablestorage.NewAzureTablesStateStore(testLogger)
380383
default:
381384
return nil
382385
}

0 commit comments

Comments
 (0)