Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/infrastructure/docker-compose-sqlserver.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: '2'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
ports:
- "1433:1433"
environment:
ACCEPT_EULA: Y
SA_PASSWORD: "Pass@Word1"
5 changes: 5 additions & 0 deletions .github/workflows/conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ jobs:
- secretstores.localfile
- state.mongodb
- state.redis
- state.sqlserver
EOF
)
echo "::set-output name=pr-components::$PR_COMPONENTS"
Expand Down Expand Up @@ -191,6 +192,10 @@ jobs:
mongodb-replica-set: test-rs
if: contains(matrix.component, 'mongodb')

- name: Start sqlserver
run: docker-compose -f ./.github/infrastructure/docker-compose-sqlserver.yml -p sqlserver up -d
if: contains(matrix.component, 'sqlserver')

- name: Start kafka
run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
if: contains(matrix.component, 'kafka')
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/deepmap/oapi-codegen v1.8.1 // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f
github.com/dghubble/oauth1 v0.6.0
github.com/didip/tollbooth v4.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU=
github.com/deepmap/oapi-codegen v1.8.1 h1:gSKgzu1DvWfRctnr0UVwieWkg1LEecP0C2htZyBwDTA=
github.com/deepmap/oapi-codegen v1.8.1/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73 h1:OGNva6WhsKst5OZf7eZOklDztV3hwtTHovdrLHV+MsA=
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba h1:HuzamveGKQH9cN1TrsZgEoG0sHvTa5j3LKquWaHR3sY=
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f h1:M2wB039zeS1/LZtN/3A7tWyfctiOBL4ty5PURBmDdWU=
Expand Down
103 changes: 76 additions & 27 deletions state/sqlserver/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ func (m *migration) executeMigrations() (migrationResult, error) {
}

func runCommand(tsql string, db *sql.DB) error {
_, err := db.Exec(tsql)
if err != nil {
if _, err := db.Exec(tsql); err != nil {
return err
}

Expand Down Expand Up @@ -272,35 +271,85 @@ func (m *migration) createStoredProcedureIfNotExists(db *sql.DB, name string, es
/* #nosec */
func (m *migration) ensureUpsertStoredProcedureExists(db *sql.DB, mr migrationResult) error {
tsql := fmt.Sprintf(`
CREATE PROCEDURE %s (
@Key %s,
@Data NVARCHAR(MAX),
@RowVersion BINARY(8))
AS
IF (@RowVersion IS NOT NULL)
BEGIN
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = @RowVersion

RETURN
END

BEGIN TRY
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
END TRY

BEGIN CATCH
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH`,
CREATE PROCEDURE %s (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I think the logic here is still problematic. I understand the desire not to churn existing code, but it looks like there are still issues relative to the expected behaviors expressed in #2739 1c. Condensed here for reference:

StateOptions.Concurrency Req.Etag Expected Behavior in State Store
CONCURRENCY_FIRST_WRITE not nil Write if etag matches
CONCURRENCY_FIRST_WRITE nil Write if key does not exist
CONCURRENCY_LAST_WRITE or unspecified not nil Return error (Upcoming breaking change. Today, state components write regardless of etag)
CONCURRENCY_LAST_WRITE or unspecified nil Write regardless of etag

"Write" in this case refers to an upsert (i.e. insert key if it does not already exist, otherwise update). The original code ignores the top half of that table, but it also looks like it already did not implement the bottom half correctly. Please correct any readings of the code here:

  • If an ETag is provided and the key does not exist, it attempts to update, which returns 0 rows modified but does not error and does not insert the value. It is silently lost.
  • If an ETag is provided and the key does exist, it will attempt to update using the ETag. If the ETag does not match, it again does not error and does not update the value, which has FIRST_WRITE semantics instead of LAST_WRITE (default) semantics. It should ignore the ETag completely in this case.

Preserving the original logic propagates these issues forwards. The new logic for handling FIRST_WRITE works, although it raises some questions about the specification for behavior:

  • If ETag provided and key does not exist, ETag can never match. Should the method raise an error or silently ignore the write?
    • This implementation returns an error
  • If ETag provided and key exists, but ETag does not match, should the method raise an error or silently ignore the write?
    • This implementation silently ignores the update. (The update on row 290 finds no matches).

That seems potentially inconsistent as an error contract for Dapr.

@artursouza To keep this moving forwards, since you've already approved the PR and the new code is okay, I suggest we file a separate issue to track the logical inconsistencies in the existing code. In continuing the discussion of Dapr ETag/concurrency state handling, I'll also follow up on the issue with my questions on the expected error handling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue for LAST_WRITE and eTag not null is behavior change beyond the scope of this change because it is a broader concern of the expectations for the state API behavior. I am OK with the current logic in this component.

The error handling for silent etag mistmatch in the stored procedure is handled in code by checking number of rows affected.

@CodeMonkeyLeet Let me know if you have any other concerns on this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I think it's definitely a step forward and we should take it, thanks for reminding me about the downstream handling of returning errors from the zero rows updated check.

@Key %s,
@Data NVARCHAR(MAX),
@RowVersion BINARY(8),
@FirstWrite BIT)
AS
IF (@FirstWrite=1)
BEGIN
IF (@RowVersion IS NOT NULL)
BEGIN
BEGIN TRANSACTION;
IF NOT EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY AND RowVersion = @RowVersion)
BEGIN
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
END
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not always the case though: this case can also be because the row was never written before and the key does not exist.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an excellent point. What do you suggest? Changing the error message?

If the record does not exist it's not actually possible to specify an ETAG (Row Version) to be written -- the stored procedure / DB won't allow the use of ETAGs other than to update an existing record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would just change the error message to something more general

BEGIN
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = @RowVersion
END
COMMIT;
END
ELSE
BEGIN
BEGIN TRANSACTION;
IF EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY)
BEGIN
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
END
BEGIN
BEGIN TRY
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
END TRY

BEGIN CATCH
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
COMMIT;
END
END
ELSE
BEGIN
IF (@RowVersion IS NOT NULL)
BEGIN
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = @RowVersion
RETURN
END
ELSE
BEGIN
BEGIN TRY
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
END TRY

BEGIN CATCH
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%s]
SET [Data]=@Data, UpdateDate=GETDATE()
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
END
`,
mr.upsertProcFullName,
mr.pkColumnType,
m.store.tableName,
m.store.tableName,
m.store.tableName)
m.store.tableName,
m.store.tableName,
m.store.tableName,
m.store.tableName,
m.store.tableName,
m.store.tableName,
)

return m.createStoredProcedureIfNotExists(db, mr.upsertProcName, tsql)
}
25 changes: 16 additions & 9 deletions state/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,23 +440,23 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
}

// err represents errors thrown by the stored procedure or the database itself
if err != nil {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

// if the row with matching key (and ETag if specified) is not found, then the stored procedure returns 0 rows affected
rows, err := res.RowsAffected()
if err != nil {
return err
}

if rows != 1 {
return fmt.Errorf("items was not updated")
// When an ETAG is specified, a row must have been deleted or else we return an ETag mismatch error
if rows != 1 && req.ETag != nil && *req.ETag != "" {
return state.NewETagError(state.ETagMismatch, nil)
}

// successful deletion, or noop if no ETAG specified
return nil
}

Expand Down Expand Up @@ -578,15 +578,22 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
return err
}
etag := sql.Named(rowVersionColumnName, nil)
if req.ETag != nil {
if req.ETag != nil && *req.ETag != "" {
var b []byte
b, err = hex.DecodeString(*req.ETag)
if err != nil {
return state.NewETagError(state.ETagInvalid, err)
}
etag.Value = b
etag = sql.Named(rowVersionColumnName, b)
}
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag)

var res sql.Result
if req.Options.Concurrency == state.FirstWrite {
res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 1))
} else {
res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 0))
}

if err != nil {
if req.ETag != nil && *req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
Expand Down
11 changes: 11 additions & 0 deletions tests/config/state/sqlserver/statestore.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.sqlserver
metadata:
- name: connectionString
value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;"
- name: tableName
value: mytable
2 changes: 2 additions & 0 deletions tests/config/state/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ components:
allOperations: true
- component: cosmosdb
allOperations: true
- component: sqlserver
allOperations: true
3 changes: 3 additions & 0 deletions tests/conformance/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
s_mongodb "github.com/dapr/components-contrib/state/mongodb"
s_redis "github.com/dapr/components-contrib/state/redis"
s_sqlserver "github.com/dapr/components-contrib/state/sqlserver"
conf_bindings "github.com/dapr/components-contrib/tests/conformance/bindings"
conf_pubsub "github.com/dapr/components-contrib/tests/conformance/pubsub"
conf_secret "github.com/dapr/components-contrib/tests/conformance/secretstores"
Expand Down Expand Up @@ -363,6 +364,8 @@ func loadStateStore(tc TestComponent) state.Store {
store = s_cosmosdb.NewCosmosDBStateStore(testLogger)
case "mongodb":
store = s_mongodb.NewMongoDB(testLogger)
case "sqlserver":
store = s_sqlserver.NewSQLServerStateStore(testLogger)
default:
return nil
}
Expand Down
1 change: 1 addition & 0 deletions tests/conformance/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
t.Run("delete", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && scenario.toBeDeleted {
// this also deletes two keys that were not inserted in the set operation
t.Logf("Deleting %s", scenario.key)
err := statestore.Delete(&state.DeleteRequest{
Key: scenario.key,
Expand Down