Skip to content

Commit

Permalink
fix(spanner): reset buffer after abort on first SQL statement (#8440)
Browse files Browse the repository at this point in the history
* fix: reset buffer after abort on first SQL statement

Reset the mutations buffer of a transaction if a transaction is retried
after the first SQL statement in the transaction failed with aborted.
This can happen on the emulator if the client tries to start multiple
parallel read/write transactions on the emulator, which the emulator
does not support.

* chore: run formatter
  • Loading branch information
olavloite authored Aug 18, 2023
1 parent db6dc5a commit d980b42
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 0 deletions.
1 change: 1 addition & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
t.wb = []*Mutation{}
t.txOpts = c.txo.merge(options)
t.ct = c.ct

Expand Down
302 changes: 302 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,308 @@ func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeAbortedFirstSqlStatement(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return Aborted from the backend.
// This will force a retry of the transaction with an explicit BeginTransaction RPC.
c, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := c, int64(UpdateBarSetFooRowCount); g != w {
return fmt.Errorf("update count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeAbortedFirstSqlStatementTwice(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.Aborted, "Transaction aborted"),
status.Error(codes.Aborted, "Transaction aborted"),
}})

var attempts int
expectedAttempts := 3
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return Aborted from the backend.
// This will force a retry of the transaction with an explicit BeginTransaction RPC.
c, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := c, int64(UpdateBarSetFooRowCount); g != w {
return fmt.Errorf("update count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeSqlStatementWithError(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.InvalidArgument, "Invalid"),
status.Error(codes.InvalidArgument, "Invalid"),
}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// We ignore the error and proceed to commit the transaction.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err == nil {
return fmt.Errorf("missing expected InvalidArgument error")
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BufferedWriteBeforeSqlStatementWithErrorThatGoesAway(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.AlreadyExists, "Row already exists"),
}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Buffer mutations before executing a SQL statement.
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// The error does not occur during the retry and the transaction is allowed to continue.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_OnlyBufferWritesDuringInitialAttempt(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{Errors: []error{
status.Error(codes.AlreadyExists, "Row already exists"),
}})

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
if attempts == 1 {
// Only do a blind write if it is not a retry of the transaction.
if err := tx.BufferWrite([]*Mutation{
Delete("foo", AllKeys()),
}); err != nil {
return err
}
}
// Then execute a SQL statement that will return InvalidArgument from the backend.
// This will initially force a retry of the transaction with an explicit BeginTransaction RPC.
// The error does not occur during the retry and the transaction is allowed to continue.
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
g, w := len(commit.Mutations), 0
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_BlindWriteWithAbortedCommit(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}})

var attempts int
expectedAttempts := 2
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
// Do a blind write and then commit. The CommitRequest will be aborted and cause the transaction to retry.
if err := tx.BufferWrite([]*Mutation{Insert("foo", []string{"col1"}, []interface{}{"key1"})}); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
commit := requests[len(requests)-1].(*sppb.CommitRequest)
// TODO: Update to 1 when the bug is fixed
g, w := len(commit.Mutations), 1
if g != w {
t.Fatalf("mutations count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
Expand Down

0 comments on commit d980b42

Please sign in to comment.