Skip to content

Commit

Permalink
Merge branch 'main' into auth1
Browse files Browse the repository at this point in the history
  • Loading branch information
codyoss authored Aug 23, 2023
2 parents 1f00148 + 3dda7b2 commit 740c2c6
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .release-please-manifest-individual.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"auth": "0.0.0",
"bigquery": "1.54.0",
"bigtable": "1.19.0",
"datastore": "1.13.0",
"datastore": "1.14.0",
"errorreporting": "0.3.0",
"firestore": "1.12.0",
"logging": "1.8.1",
Expand Down
19 changes: 19 additions & 0 deletions datastore/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# Changes

## [1.14.0](https://github.com/googleapis/google-cloud-go/compare/datastore/v1.13.0...datastore/v1.14.0) (2023-08-22)


### Features

* **datastore:** SUM and AVG aggregations ([#8307](https://github.com/googleapis/google-cloud-go/issues/8307)) ([a9fff18](https://github.com/googleapis/google-cloud-go/commit/a9fff181e4ea8281ad907e7b2e0d90e70013a4de))
* **datastore:** Support aggregation query in transaction ([#8439](https://github.com/googleapis/google-cloud-go/issues/8439)) ([37681ff](https://github.com/googleapis/google-cloud-go/commit/37681ff291c0ccf4c908be55b97639c04b9dec48))


### Bug Fixes

* **datastore:** Correcting string representation of Key ([#8363](https://github.com/googleapis/google-cloud-go/issues/8363)) ([4cb1211](https://github.com/googleapis/google-cloud-go/commit/4cb12110ba229dfbe21568eb06c243bdffd1fee7))
* **datastore:** Fix NoIndex for array property ([#7674](https://github.com/googleapis/google-cloud-go/issues/7674)) ([01951e6](https://github.com/googleapis/google-cloud-go/commit/01951e64f3955dc337172a30d78e2f92f65becb2))


### Documentation

* **datastore/admin:** Specify limit for `properties` in `Index` message in Datastore Admin API ([b890425](https://github.com/googleapis/google-cloud-go/commit/b8904253a0f8424ea4548469e5feef321bd7396a))

## [1.13.0](https://github.com/googleapis/google-cloud-go/compare/datastore/v1.12.1...datastore/v1.13.0) (2023-07-26)


Expand Down
57 changes: 51 additions & 6 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,15 @@ func TestIntegration_AggregationQueries(t *testing.T) {
for i := range keys {
keys[i] = IncompleteKey("SQChild", parent)
}
keys, err := client.PutMulti(ctx, keys, children)

// Create transaction with read before creating entities
readTime := time.Now()
txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
if err != nil {
t.Fatalf("client.NewTransaction: %v", err)
}

keys, err = client.PutMulti(ctx, keys, children)
if err != nil {
t.Fatalf("client.PutMulti: %v", err)
}
Expand All @@ -733,13 +741,22 @@ func TestIntegration_AggregationQueries(t *testing.T) {
}
}()

// Create transaction with read after creating entities
readTime = time.Now()
txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...)
if err != nil {
t.Fatalf("client.NewTransaction: %v", err)
}

testCases := []struct {
desc string
aggQuery *AggregationQuery
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
desc string
aggQuery *AggregationQuery
transactionOpts []TransactionOption
wantFailure bool
wantErrMsg string
wantAggResult AggregationResult
}{

{
desc: "Count Failure - Missing index",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T>=", now).
Expand All @@ -757,6 +774,34 @@ func TestIntegration_AggregationQueries(t *testing.T) {
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}},
},
},
{
desc: "Aggregations in transaction before creating entities",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Transaction(txBeforeCreate).
NewAggregationQuery().
WithCount("count").
WithSum("I", "sum").
WithAvg("I", "avg"),
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}},
"avg": &pb.Value{ValueType: &pb.Value_NullValue{NullValue: structpb.NullValue_NULL_VALUE}},
},
},
{
desc: "Aggregations in transaction after creating entities",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Transaction(txAfterCreate).
NewAggregationQuery().
WithCount("count").
WithSum("I", "sum").
WithAvg("I", "avg"),
wantAggResult: map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}},
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}},
"avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 3.5}},
},
},
{
desc: "Multiple aggregations",
aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).
Expand Down
2 changes: 1 addition & 1 deletion datastore/internal/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
package internal

// Version is the current tagged release of the library.
const Version = "1.13.0"
const Version = "1.14.0"
1 change: 0 additions & 1 deletion datastore/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ func DecodeCursor(s string) (Cursor, error) {
// NewAggregationQuery returns an AggregationQuery with this query as its
// base query.
func (q *Query) NewAggregationQuery() *AggregationQuery {
q.eventual = true
return &AggregationQuery{
query: q,
aggregationQueries: make([]*pb.AggregationQuery_Aggregation, 0),
Expand Down
5 changes: 0 additions & 5 deletions datastore/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ func fakeRunAggregationQuery(req *pb.RunAggregationQueryRequest) (*pb.RunAggrega
},
},
},
ReadOptions: &pb.ReadOptions{
ConsistencyType: &pb.ReadOptions_ReadConsistency_{
ReadConsistency: pb.ReadOptions_EVENTUAL,
},
},
}
if !proto.Equal(req, expectedIn) {
return nil, fmt.Errorf("unsupported argument: got %v want %v", req, expectedIn)
Expand Down
6 changes: 5 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
}
}
if t.shouldExplicitBegin(attempt) {
// Make sure we set the current session handle before calling BeginTransaction.
// Note that the t.begin(ctx) call could change the session that is being used by the transaction, as the
// BeginTransaction RPC invocation will be retried on a new session if it returns SessionNotFound.
t.txReadOnly.sh = sh
if err = t.begin(ctx); err != nil {
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
return ToSpannerError(err)
Expand All @@ -571,9 +575,9 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t = &ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
}
t.txReadOnly.sh = sh
}
attempt++
t.txReadOnly.sh = sh
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
Expand Down
196 changes: 196 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,202 @@ func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOne
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundForFirstStatement(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
iter := tx.Query(ctx, NewStatement(SelectFooFromBar))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundForFirstStatement_AndThenSessionNotFoundForBeginTransaction(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
server.TestSpanner.PutExecutionTime(
MethodBeginTransaction,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
iter := tx.Query(ctx, NewStatement(SelectFooFromBar))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_AbortedForFirstStatement_AndThenSessionNotFoundForBeginTransaction(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
)
server.TestSpanner.PutExecutionTime(
MethodBeginTransaction,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
iter := tx.Query(ctx, NewStatement(SelectFooFromBar))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BeginTransactionRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundForFirstStatement_DoesNotLeakSession(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)

expectedAttempts := 2
var attempts int
_, err := client.ReadWriteTransaction(
ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
iter := tx.Query(ctx, NewStatement(SelectFooFromBar))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
if expectedAttempts != attempts {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.BatchCreateSessionsRequest{}, // We need to create more sessions, as the one used first was destroyed.
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{},
}, requests); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadOnlyTransaction_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit 740c2c6

Please sign in to comment.