Skip to content

Commit

Permalink
feat(spanner): add support for Optimistic Concurrency Control (#7332)
Browse files Browse the repository at this point in the history
  • Loading branch information
ko3a4ok authored Feb 1, 2023
1 parent cf1332d commit 48ba16f
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 4 deletions.
88 changes: 88 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,88 @@ func TestClient_ReadWriteTransactionWithOptions(t *testing.T) {
}
}

func TestClient_ReadWriteTransactionWithOptimisticLockMode_ExecuteSqlRequest(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable"), status.Error(codes.Aborted, "Transaction aborted")},
})
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
_, err := iter.Next()
return err
}, TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if requests[1].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Transaction is not set to optimistic")
}
if requests[2].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Transaction is not set to optimistic")
}
if requests[3].(*sppb.BeginTransactionRequest).GetOptions().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Begin Transaction is not set to optimistic")
}
if _, ok := requests[4].(*sppb.ExecuteSqlRequest).Transaction.GetSelector().(*sppb.TransactionSelector_Id); !ok {
t.Fatal("expected streaming query to use transactionID from explicit begin transaction")
}
}

func TestClient_ReadWriteTransactionWithOptimisticLockMode_ReadRequest(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable"), status.Error(codes.Aborted, "Transaction aborted")},
})
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
defer iter.Stop()
_, err := iter.Next()
return err
}, TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ReadRequest{},
&sppb.ReadRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ReadRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if requests[1].(*sppb.ReadRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Transaction is not set to optimistic")
}
if requests[2].(*sppb.ReadRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Transaction is not set to optimistic")
}
if requests[3].(*sppb.BeginTransactionRequest).GetOptions().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC {
t.Fatal("Begin Transaction is not set to optimistic")
}
if _, ok := requests[4].(*sppb.ReadRequest).Transaction.GetSelector().(*sppb.TransactionSelector_Id); !ok {
t.Fatal("expected streaming read to use transactionID from explicit begin transaction")
}
}

func TestClient_ReadWriteStmtBasedTransaction_TransactionOptions(t *testing.T) {
for _, tt := range transactionOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -2999,6 +3081,12 @@ func transactionOptionsTestCases() []TransactionOptionsTestCase {
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Read lock mode is optimistic",
client: &TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC},
write: &TransactionOptions{},
want: &TransactionOptions{},
},
}
}

Expand Down
19 changes: 15 additions & 4 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type TransactionOptions struct {
// CommitPriority is the priority to use for the Commit RPC for the
// transaction.
CommitPriority sppb.RequestOptions_Priority

// the transaction lock mode is used to specify a concurrency mode for the
// read/query operations. It works for a read/write transaction only.
ReadLockMode sppb.TransactionOptions_ReadWrite_ReadLockMode
}

// merge combines two TransactionOptions that the input parameter will have higher
Expand All @@ -119,6 +123,9 @@ func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions {
if opts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.CommitPriority = opts.CommitPriority
}
if opts.ReadLockMode != sppb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED {
merged.ReadLockMode = opts.ReadLockMode
}
return merged
}

Expand Down Expand Up @@ -1244,7 +1251,9 @@ func (t *ReadWriteTransaction) getTransactionSelector() *sppb.TransactionSelecto
Selector: &sppb.TransactionSelector_Begin{
Begin: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
ReadWrite: &sppb.TransactionOptions_ReadWrite{
ReadLockMode: t.txOpts.ReadLockMode,
},
},
},
},
Expand Down Expand Up @@ -1278,12 +1287,14 @@ func (t *ReadWriteTransaction) release(err error) {
}
}

func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) {
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sid,
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
ReadWrite: &sppb.TransactionOptions_ReadWrite{
ReadLockMode: opts.ReadLockMode,
},
},
},
})
Expand Down Expand Up @@ -1344,7 +1355,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
return err
}
}
tx, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.getID(), sh.getClient())
tx, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.getID(), sh.getClient(), t.txOpts)
if isSessionNotFoundError(err) {
sh.destroy()
continue
Expand Down

0 comments on commit 48ba16f

Please sign in to comment.