readOptions =
- this.readOptionProtoPreparer.prepare(
- ImmutableList.of(ReadOption.transactionId(transactionId)));
+ this.readOptionProtoPreparer.prepare(ImmutableList.of(transactionId(transactionId)));
return datastore.run(readOptions, query);
}
+ @Override
+ public AggregationResults runAggregation(AggregationQuery query) {
+ return datastore.runAggregation(query, transactionId(transactionId));
+ }
+
@Override
public Transaction.Response commit() {
validateActive();
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/Aggregation.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/Aggregation.java
new file mode 100644
index 000000000..8a8e8cc18
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/Aggregation.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.datastore.aggregation;
+
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.datastore.v1.AggregationQuery;
+
+/**
+ * Represents a Google Cloud Datastore Aggregation which is used with an {@link AggregationQuery}.
+ */
+@BetaApi
+public abstract class Aggregation {
+
+ private final String alias;
+
+ public Aggregation(String alias) {
+ this.alias = alias;
+ }
+
+ /** Returns the alias for this aggregation. */
+ public String getAlias() {
+ return alias;
+ }
+
+ @InternalApi
+ public abstract AggregationQuery.Aggregation toPb();
+
+ /** Returns a {@link CountAggregation} builder. */
+ public static CountAggregation.Builder count() {
+ return new CountAggregation.Builder();
+ }
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/AggregationBuilder.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/AggregationBuilder.java
new file mode 100644
index 000000000..5e90b86aa
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/AggregationBuilder.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.datastore.aggregation;
+
+import com.google.api.core.BetaApi;
+
+/**
+ * An interface to represent the builders which build and customize {@link Aggregation} for {@link
+ * com.google.cloud.datastore.AggregationQuery}.
+ *
+ * Used by {@link
+ * com.google.cloud.datastore.AggregationQuery.Builder#addAggregation(AggregationBuilder)}.
+ */
+@BetaApi
+public interface AggregationBuilder {
+ A build();
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/CountAggregation.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/CountAggregation.java
new file mode 100644
index 000000000..a5295addf
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/aggregation/CountAggregation.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.datastore.aggregation;
+
+import com.google.api.core.BetaApi;
+import com.google.datastore.v1.AggregationQuery;
+import com.google.datastore.v1.AggregationQuery.Aggregation.Count;
+import java.util.Objects;
+
+/** Represents an {@link Aggregation} which returns count. */
+@BetaApi
+public class CountAggregation extends Aggregation {
+
+ /** @param alias Alias to used when running this aggregation. */
+ public CountAggregation(String alias) {
+ super(alias);
+ }
+
+ @Override
+ public AggregationQuery.Aggregation toPb() {
+ Count.Builder countBuilder = Count.newBuilder();
+
+ AggregationQuery.Aggregation.Builder aggregationBuilder =
+ AggregationQuery.Aggregation.newBuilder().setCount(countBuilder);
+ if (this.getAlias() != null) {
+ aggregationBuilder.setAlias(this.getAlias());
+ }
+ return aggregationBuilder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CountAggregation that = (CountAggregation) o;
+ boolean bothAliasAreNull = getAlias() == null && that.getAlias() == null;
+ if (bothAliasAreNull) {
+ return true;
+ } else {
+ boolean bothArePresent = getAlias() != null && that.getAlias() != null;
+ return bothArePresent && getAlias().equals(that.getAlias());
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getAlias());
+ }
+
+ /** A builder class to create and customize a {@link CountAggregation}. */
+ public static class Builder implements AggregationBuilder {
+
+ private String alias;
+
+ public Builder as(String alias) {
+ this.alias = alias;
+ return this;
+ }
+
+ @Override
+ public CountAggregation build() {
+ return new CountAggregation(alias);
+ }
+ }
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/AggregationQueryExecutor.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/AggregationQueryExecutor.java
new file mode 100644
index 000000000..14e425845
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/AggregationQueryExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.datastore.AggregationQuery;
+import com.google.cloud.datastore.AggregationResults;
+import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.ReadOption;
+import com.google.cloud.datastore.ReadOption.QueryAndReadOptions;
+import com.google.cloud.datastore.execution.request.AggregationQueryRequestProtoPreparer;
+import com.google.cloud.datastore.execution.response.AggregationQueryResponseTransformer;
+import com.google.cloud.datastore.spi.v1.DatastoreRpc;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link QueryExecutor} which executes {@link AggregationQuery} and returns
+ * {@link AggregationResults}.
+ */
+@InternalApi
+public class AggregationQueryExecutor
+ implements QueryExecutor {
+
+ private final DatastoreRpc datastoreRpc;
+ private final AggregationQueryRequestProtoPreparer protoPreparer;
+ private final AggregationQueryResponseTransformer responseTransformer;
+
+ public AggregationQueryExecutor(DatastoreRpc datastoreRpc, DatastoreOptions datastoreOptions) {
+ this.datastoreRpc = datastoreRpc;
+ this.protoPreparer = new AggregationQueryRequestProtoPreparer(datastoreOptions);
+ this.responseTransformer = new AggregationQueryResponseTransformer();
+ }
+
+ @Override
+ public AggregationResults execute(AggregationQuery query, ReadOption... readOptions) {
+ RunAggregationQueryRequest runAggregationQueryRequest =
+ getRunAggregationQueryRequest(query, readOptions);
+ RunAggregationQueryResponse runAggregationQueryResponse =
+ this.datastoreRpc.runAggregationQuery(runAggregationQueryRequest);
+ return this.responseTransformer.transform(runAggregationQueryResponse);
+ }
+
+ private RunAggregationQueryRequest getRunAggregationQueryRequest(
+ AggregationQuery query, ReadOption... readOptions) {
+ QueryAndReadOptions queryAndReadOptions =
+ readOptions == null
+ ? QueryAndReadOptions.create(query)
+ : QueryAndReadOptions.create(query, Arrays.asList(readOptions));
+ return this.protoPreparer.prepare(queryAndReadOptions);
+ }
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/QueryExecutor.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/QueryExecutor.java
new file mode 100644
index 000000000..856c64a02
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/QueryExecutor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.datastore.Query;
+import com.google.cloud.datastore.ReadOption;
+
+/**
+ * An internal functional interface whose implementation has the responsibility to execute a {@link
+ * Query} and returns the result. This class will have the responsibility to orchestrate between
+ * {@link com.google.cloud.datastore.execution.request.ProtoPreparer}, {@link
+ * com.google.cloud.datastore.spi.v1.DatastoreRpc} and {@link
+ * com.google.cloud.datastore.execution.response.ResponseTransformer} layers.
+ *
+ * @param A {@link Query} to execute.
+ * @param the type of result produced by Query.
+ */
+@InternalApi
+public interface QueryExecutor , OUTPUT> {
+
+ /**
+ * @param query A {@link Query} to execute.
+ * @param readOptions Optional {@link ReadOption}s to be used when executing {@link Query}.
+ */
+ OUTPUT execute(INPUT query, ReadOption... readOptions);
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparer.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparer.java
new file mode 100644
index 000000000..b5da8d9fe
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparer.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution.request;
+
+import static com.google.cloud.datastore.AggregationQuery.Mode.GQL;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.datastore.AggregationQuery;
+import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.GqlQueryProtoPreparer;
+import com.google.cloud.datastore.ReadOption;
+import com.google.cloud.datastore.ReadOption.QueryAndReadOptions;
+import com.google.cloud.datastore.ReadOptionProtoPreparer;
+import com.google.cloud.datastore.StructuredQueryProtoPreparer;
+import com.google.cloud.datastore.aggregation.Aggregation;
+import com.google.datastore.v1.GqlQuery;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.ReadOptions;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import java.util.List;
+import java.util.Optional;
+
+@InternalApi
+public class AggregationQueryRequestProtoPreparer
+ implements ProtoPreparer, RunAggregationQueryRequest> {
+
+ private final DatastoreOptions datastoreOptions;
+ private final StructuredQueryProtoPreparer structuredQueryProtoPreparer;
+ private final GqlQueryProtoPreparer gqlQueryProtoPreparer;
+ private final ReadOptionProtoPreparer readOptionProtoPreparer;
+
+ public AggregationQueryRequestProtoPreparer(DatastoreOptions datastoreOptions) {
+ this.datastoreOptions = datastoreOptions;
+ this.structuredQueryProtoPreparer = new StructuredQueryProtoPreparer();
+ this.gqlQueryProtoPreparer = new GqlQueryProtoPreparer();
+ this.readOptionProtoPreparer = new ReadOptionProtoPreparer();
+ }
+
+ @Override
+ public RunAggregationQueryRequest prepare(
+ QueryAndReadOptions aggregationQueryAndReadOptions) {
+ AggregationQuery aggregationQuery = aggregationQueryAndReadOptions.getQuery();
+ List readOptions = aggregationQueryAndReadOptions.getReadOptions();
+ PartitionId partitionId = getPartitionId(aggregationQuery);
+ RunAggregationQueryRequest.Builder aggregationQueryRequestBuilder =
+ RunAggregationQueryRequest.newBuilder()
+ .setPartitionId(partitionId)
+ .setProjectId(datastoreOptions.getProjectId());
+
+ if (aggregationQuery.getMode() == GQL) {
+ aggregationQueryRequestBuilder.setGqlQuery(buildGqlQuery(aggregationQuery));
+ } else {
+ aggregationQueryRequestBuilder.setAggregationQuery(getAggregationQuery(aggregationQuery));
+ }
+
+ Optional readOptionsPb = readOptionProtoPreparer.prepare(readOptions);
+ readOptionsPb.ifPresent(aggregationQueryRequestBuilder::setReadOptions);
+ return aggregationQueryRequestBuilder.build();
+ }
+
+ private GqlQuery buildGqlQuery(AggregationQuery aggregationQuery) {
+ return gqlQueryProtoPreparer.prepare(aggregationQuery.getNestedGqlQuery());
+ }
+
+ private com.google.datastore.v1.AggregationQuery getAggregationQuery(
+ AggregationQuery aggregationQuery) {
+ Query nestedQueryProto =
+ structuredQueryProtoPreparer.prepare(aggregationQuery.getNestedStructuredQuery());
+
+ com.google.datastore.v1.AggregationQuery.Builder aggregationQueryProtoBuilder =
+ com.google.datastore.v1.AggregationQuery.newBuilder().setNestedQuery(nestedQueryProto);
+ for (Aggregation aggregation : aggregationQuery.getAggregations()) {
+ aggregationQueryProtoBuilder.addAggregations(aggregation.toPb());
+ }
+ return aggregationQueryProtoBuilder.build();
+ }
+
+ private PartitionId getPartitionId(AggregationQuery aggregationQuery) {
+ PartitionId.Builder builder =
+ PartitionId.newBuilder().setProjectId(datastoreOptions.getProjectId());
+ if (aggregationQuery.getNamespace() != null) {
+ builder.setNamespaceId(aggregationQuery.getNamespace());
+ }
+ return builder.build();
+ }
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformer.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformer.java
new file mode 100644
index 000000000..1515a1147
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformer.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution.response;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.Timestamp;
+import com.google.cloud.datastore.AggregationResult;
+import com.google.cloud.datastore.AggregationResults;
+import com.google.cloud.datastore.LongValue;
+import com.google.datastore.v1.RunAggregationQueryResponse;
+import com.google.datastore.v1.Value;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@InternalApi
+public class AggregationQueryResponseTransformer
+ implements ResponseTransformer {
+
+ @Override
+ public AggregationResults transform(RunAggregationQueryResponse response) {
+ Timestamp readTime = Timestamp.fromProto(response.getBatch().getReadTime());
+ List aggregationResults =
+ response.getBatch().getAggregationResultsList().stream()
+ .map(
+ aggregationResult -> new AggregationResult(resultWithLongValues(aggregationResult)))
+ .collect(Collectors.toCollection(LinkedList::new));
+ return new AggregationResults(aggregationResults, readTime);
+ }
+
+ private Map resultWithLongValues(
+ com.google.datastore.v1.AggregationResult aggregationResult) {
+ return aggregationResult.getAggregatePropertiesMap().entrySet().stream()
+ .map(
+ (Function, Entry>)
+ entry ->
+ new SimpleEntry<>(
+ entry.getKey(), (LongValue) LongValue.fromPb(entry.getValue())))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ }
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/ResponseTransformer.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/ResponseTransformer.java
new file mode 100644
index 000000000..b17da3f79
--- /dev/null
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/execution/response/ResponseTransformer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution.response;
+
+import com.google.api.core.InternalApi;
+
+/**
+ * An internal functional interface whose implementation has the responsibility to populate a Domain
+ * object from a proto response.
+ *
+ * @param the type of proto response object.
+ * @param the type of domain object.
+ */
+@InternalApi
+public interface ResponseTransformer {
+ OUTPUT transform(INPUT response);
+}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/DatastoreRpc.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/DatastoreRpc.java
index 5e64c9255..33b8e11ea 100644
--- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/DatastoreRpc.java
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/DatastoreRpc.java
@@ -30,6 +30,8 @@
import com.google.datastore.v1.ReserveIdsResponse;
import com.google.datastore.v1.RollbackRequest;
import com.google.datastore.v1.RollbackResponse;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
@@ -85,4 +87,13 @@ BeginTransactionResponse beginTransaction(BeginTransactionRequest request)
* @throws DatastoreException upon failure
*/
RunQueryResponse runQuery(RunQueryRequest request);
+
+ /**
+ * Sends a request to run an aggregation query.
+ *
+ * @throws DatastoreException upon failure
+ */
+ default RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
}
diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java
index 4f13b4600..fd3cdc658 100644
--- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java
+++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java
@@ -36,6 +36,8 @@
import com.google.datastore.v1.ReserveIdsResponse;
import com.google.datastore.v1.RollbackRequest;
import com.google.datastore.v1.RollbackResponse;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import java.io.IOException;
@@ -200,4 +202,13 @@ public RunQueryResponse runQuery(RunQueryRequest request) {
throw translate(ex);
}
}
+
+ @Override
+ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) {
+ try {
+ return client.runAggregationQuery(request);
+ } catch (com.google.datastore.v1.client.DatastoreException ex) {
+ throw translate(ex);
+ }
+ }
}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationQueryTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationQueryTest.java
new file mode 100644
index 000000000..840d23bca
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationQueryTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore;
+
+import static com.google.cloud.datastore.AggregationQuery.Mode.GQL;
+import static com.google.cloud.datastore.AggregationQuery.Mode.STRUCTURED;
+import static com.google.cloud.datastore.StructuredQuery.PropertyFilter.eq;
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.cloud.datastore.aggregation.CountAggregation;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class AggregationQueryTest {
+
+ private static final String KIND = "Task";
+ private static final String NAMESPACE = "ns";
+ private static final EntityQuery COMPLETED_TASK_QUERY =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND)
+ .setFilter(eq("done", true))
+ .setLimit(100)
+ .build();
+
+ @Rule public ExpectedException exceptionRule = ExpectedException.none();
+
+ @Test
+ public void testAggregations() {
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(new CountAggregation("total"))
+ .over(COMPLETED_TASK_QUERY)
+ .build();
+
+ assertThat(aggregationQuery.getNamespace()).isEqualTo(NAMESPACE);
+ assertThat(aggregationQuery.getAggregations())
+ .isEqualTo(ImmutableSet.of(count().as("total").build()));
+ assertThat(aggregationQuery.getNestedStructuredQuery()).isEqualTo(COMPLETED_TASK_QUERY);
+ assertThat(aggregationQuery.getMode()).isEqualTo(STRUCTURED);
+ }
+
+ @Test
+ public void testAggregationBuilderWithMoreThanOneAggregations() {
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .addAggregation(count().as("new_total"))
+ .over(COMPLETED_TASK_QUERY)
+ .build();
+
+ assertThat(aggregationQuery.getNamespace()).isEqualTo(NAMESPACE);
+ assertThat(aggregationQuery.getAggregations())
+ .isEqualTo(ImmutableSet.of(count().as("total").build(), count().as("new_total").build()));
+ assertThat(aggregationQuery.getNestedStructuredQuery()).isEqualTo(COMPLETED_TASK_QUERY);
+ assertThat(aggregationQuery.getMode()).isEqualTo(STRUCTURED);
+ }
+
+ @Test
+ public void testAggregationBuilderWithDuplicateAggregations() {
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .addAggregation(count().as("total"))
+ .over(COMPLETED_TASK_QUERY)
+ .build();
+
+ assertThat(aggregationQuery.getNamespace()).isEqualTo(NAMESPACE);
+ assertThat(aggregationQuery.getAggregations())
+ .isEqualTo(ImmutableSet.of(count().as("total").build()));
+ assertThat(aggregationQuery.getNestedStructuredQuery()).isEqualTo(COMPLETED_TASK_QUERY);
+ assertThat(aggregationQuery.getMode()).isEqualTo(STRUCTURED);
+ }
+
+ @Test
+ public void testAggregationQueryBuilderWithoutNamespace() {
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .addAggregation(count().as("total"))
+ .over(COMPLETED_TASK_QUERY)
+ .build();
+
+ assertNull(aggregationQuery.getNamespace());
+ assertThat(aggregationQuery.getAggregations())
+ .isEqualTo(ImmutableSet.of(count().as("total").build()));
+ assertThat(aggregationQuery.getNestedStructuredQuery()).isEqualTo(COMPLETED_TASK_QUERY);
+ assertThat(aggregationQuery.getMode()).isEqualTo(STRUCTURED);
+ }
+
+ @Test
+ public void testAggregationQueryBuilderWithoutNestedQuery() {
+ assertThrows(
+ "Nested query is required for an aggregation query to run",
+ IllegalArgumentException.class,
+ () ->
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .build());
+ }
+
+ @Test
+ public void testAggregationQueryBuilderWithoutAggregation() {
+ assertThrows(
+ "At least one aggregation is required for an aggregation query to run",
+ IllegalArgumentException.class,
+ () ->
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .over(COMPLETED_TASK_QUERY)
+ .build());
+ }
+
+ @Test
+ public void testAggregationQueryBuilderWithGqlQuery() {
+ GqlQuery> gqlQuery = Query.newGqlQueryBuilder("SELECT * FROM Task WHERE done = true").build();
+
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder().setNamespace(NAMESPACE).over(gqlQuery).build();
+
+ assertThat(aggregationQuery.getNestedGqlQuery()).isEqualTo(gqlQuery);
+ assertThat(aggregationQuery.getMode()).isEqualTo(GQL);
+ }
+
+ @Test
+ public void testAggregationQueryBuilderWithoutProvidingAnyNestedQuery() {
+ assertThrows(
+ "Nested query is required for an aggregation query to run",
+ IllegalArgumentException.class,
+ () -> Query.newAggregationQueryBuilder().setNamespace(NAMESPACE).build());
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationResultTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationResultTest.java
new file mode 100644
index 000000000..06a5cb5f7
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/AggregationResultTest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public class AggregationResultTest {
+
+ @Test
+ public void shouldGetAggregationResultValueByAlias() {
+ AggregationResult aggregationResult =
+ new AggregationResult(
+ ImmutableMap.of(
+ "count", LongValue.of(45),
+ "property_2", LongValue.of(30)));
+
+ assertThat(aggregationResult.get("count")).isEqualTo(45L);
+ assertThat(aggregationResult.get("property_2")).isEqualTo(30L);
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java
index 72067fd20..7dc625bad 100644
--- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java
@@ -16,6 +16,11 @@
package com.google.cloud.datastore;
+import static com.google.cloud.datastore.ProtoTestData.intValue;
+import static com.google.cloud.datastore.TestUtils.matches;
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static com.google.common.truth.Truth.assertThat;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -37,8 +42,10 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.testing.LocalDatastoreHelper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.datastore.v1.AggregationResultBatch;
import com.google.datastore.v1.BeginTransactionRequest;
import com.google.datastore.v1.BeginTransactionResponse;
import com.google.datastore.v1.CommitRequest;
@@ -54,6 +61,8 @@
import com.google.datastore.v1.ReserveIdsResponse;
import com.google.datastore.v1.RollbackRequest;
import com.google.datastore.v1.RollbackResponse;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.TransactionOptions;
@@ -62,11 +71,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -526,6 +538,27 @@ public void testGqlQueryPagination() throws DatastoreException {
EasyMock.verify(rpcFactoryMock, rpcMock);
}
+ @Test
+ public void testRunAggregationQuery() {
+ RunAggregationQueryResponse aggregationQueryResponse = placeholderAggregationQueryResponse();
+ EasyMock.expect(rpcMock.runAggregationQuery(matches(aggregationQueryWithAlias("total_count"))))
+ .andReturn(aggregationQueryResponse);
+ EasyMock.replay(rpcFactoryMock, rpcMock);
+
+ Datastore mockDatastore = rpcMockOptions.getService();
+
+ EntityQuery selectAllQuery = Query.newEntityQueryBuilder().build();
+ AggregationQuery getCountQuery =
+ Query.newAggregationQueryBuilder()
+ .addAggregation(count().as("total_count"))
+ .over(selectAllQuery)
+ .build();
+ AggregationResult result = getOnlyElement(mockDatastore.runAggregation(getCountQuery));
+
+ assertThat(result.get("total_count")).isEqualTo(209L);
+ EasyMock.verify(rpcFactoryMock, rpcMock);
+ }
+
@Test
public void testRunStructuredQuery() {
Query query =
@@ -1311,4 +1344,28 @@ public void testQueryWithStartCursor() {
assertEquals(cursor2, cursor1);
datastore.delete(entity1.getKey(), entity2.getKey(), entity3.getKey());
}
+
+ private RunAggregationQueryResponse placeholderAggregationQueryResponse() {
+ Map result1 =
+ new HashMap<>(ImmutableMap.of("total_count", intValue(209)));
+
+ AggregationResultBatch resultBatch =
+ AggregationResultBatch.newBuilder()
+ .addAggregationResults(
+ com.google.datastore.v1.AggregationResult.newBuilder()
+ .putAllAggregateProperties(result1)
+ .build())
+ .build();
+ return RunAggregationQueryResponse.newBuilder().setBatch(resultBatch).build();
+ }
+
+ private Predicate aggregationQueryWithAlias(String alias) {
+ return runAggregationQueryRequest ->
+ alias.equals(
+ runAggregationQueryRequest
+ .getAggregationQuery()
+ .getAggregationsList()
+ .get(0)
+ .getAlias());
+ }
}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ProtoTestData.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ProtoTestData.java
index a923b618b..25b902fd4 100644
--- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ProtoTestData.java
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ProtoTestData.java
@@ -17,6 +17,8 @@
import static com.google.datastore.v1.PropertyOrder.Direction.ASCENDING;
+import com.google.datastore.v1.AggregationQuery.Aggregation;
+import com.google.datastore.v1.AggregationQuery.Aggregation.Count;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.GqlQueryParameter;
import com.google.datastore.v1.KindExpression;
@@ -63,6 +65,10 @@ public static PropertyReference propertyReference(String value) {
return PropertyReference.newBuilder().setName(value).build();
}
+ public static Aggregation countAggregation(String alias) {
+ return Aggregation.newBuilder().setAlias(alias).setCount(Count.newBuilder().build()).build();
+ }
+
public static PropertyOrder propertyOrder(String value) {
return PropertyOrder.newBuilder()
.setProperty(propertyReference(value))
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ReadOptionProtoPreparerTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ReadOptionProtoPreparerTest.java
index bda5de3b5..b16fdf100 100644
--- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ReadOptionProtoPreparerTest.java
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/ReadOptionProtoPreparerTest.java
@@ -78,11 +78,11 @@ public void shouldPrepareReadOptionsWithReadTime() {
@Test
public void shouldPrepareReadOptionsWithTransactionId() {
- String dummyTransactionId = "transaction-id";
+ String transactionId = "transaction-id";
Optional readOptions =
- protoPreparer.prepare(singletonList(transactionId(dummyTransactionId)));
+ protoPreparer.prepare(singletonList(transactionId(transactionId)));
- assertThat(readOptions.get().getTransaction().toStringUtf8()).isEqualTo(dummyTransactionId);
+ assertThat(readOptions.get().getTransaction().toStringUtf8()).isEqualTo(transactionId);
}
@Test
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java
new file mode 100644
index 000000000..b86355afa
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore;
+
+import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS;
+import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.rpc.Code.UNAVAILABLE;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.datastore.spi.v1.DatastoreRpc;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.Tracer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RetryAndTraceDatastoreRpcDecoratorTest {
+
+ public static final int MAX_ATTEMPTS = 3;
+ private DatastoreRpc mockDatastoreRpc;
+ private TraceUtil mockTraceUtil;
+ private DatastoreOptions datastoreOptions =
+ DatastoreOptions.newBuilder().setProjectId("project-id").build();
+ private RetrySettings retrySettings =
+ RetrySettings.newBuilder().setMaxAttempts(MAX_ATTEMPTS).build();
+
+ private RetryAndTraceDatastoreRpcDecorator datastoreRpcDecorator;
+
+ @Before
+ public void setUp() throws Exception {
+ mockDatastoreRpc = createStrictMock(DatastoreRpc.class);
+ mockTraceUtil = createStrictMock(TraceUtil.class);
+ datastoreRpcDecorator =
+ new RetryAndTraceDatastoreRpcDecorator(
+ mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions);
+ }
+
+ @Test
+ public void testRunAggregationQuery() {
+ Span mockSpan = createStrictMock(Span.class);
+ RunAggregationQueryRequest aggregationQueryRequest =
+ RunAggregationQueryRequest.getDefaultInstance();
+ RunAggregationQueryResponse aggregationQueryResponse =
+ RunAggregationQueryResponse.getDefaultInstance();
+
+ expect(mockDatastoreRpc.runAggregationQuery(aggregationQueryRequest))
+ .andThrow(
+ new DatastoreException(
+ UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name()))
+ .times(2)
+ .andReturn(aggregationQueryResponse);
+ expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan);
+ expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class));
+ mockSpan.end(END_SPAN_OPTIONS);
+
+ replay(mockDatastoreRpc, mockTraceUtil, mockSpan);
+
+ RunAggregationQueryResponse actualAggregationQueryResponse =
+ datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest);
+
+ assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse);
+ verify(mockDatastoreRpc, mockTraceUtil, mockSpan);
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/TestUtils.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/TestUtils.java
new file mode 100644
index 000000000..3a3fcfaea
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/TestUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore;
+
+import java.util.function.Predicate;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+
+public class TestUtils {
+
+ public static T matches(Predicate predicate) {
+ EasyMock.reportMatcher(
+ new IArgumentMatcher() {
+ @Override
+ public boolean matches(Object argument) {
+ return predicate.test(((T) argument));
+ }
+
+ @Override
+ public void appendTo(StringBuffer buffer) {
+ buffer.append("matches(\"").append(predicate).append("\")");
+ }
+ });
+ return null;
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/aggregation/CountAggregationTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/aggregation/CountAggregationTest.java
new file mode 100644
index 000000000..a8b3bc945
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/aggregation/CountAggregationTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.datastore.aggregation;
+
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.datastore.v1.AggregationQuery;
+import org.junit.Test;
+
+public class CountAggregationTest {
+
+ @Test
+ public void testCountAggregationWithDefaultValues() {
+ AggregationQuery.Aggregation countAggregationPb = count().build().toPb();
+
+ assertThat(countAggregationPb.getCount().getUpTo().getValue()).isEqualTo(0L);
+ assertThat(countAggregationPb.getAlias()).isEqualTo("");
+ }
+
+ @Test
+ public void testCountAggregationWithAlias() {
+ AggregationQuery.Aggregation countAggregationPb = count().as("column_1").build().toPb();
+
+ assertThat(countAggregationPb.getCount().getUpTo().getValue()).isEqualTo(0L);
+ assertThat(countAggregationPb.getAlias()).isEqualTo("column_1");
+ }
+
+ @Test
+ public void testEquals() {
+ CountAggregation.Builder aggregationWithAlias1 = count().as("total");
+ CountAggregation.Builder aggregationWithAlias2 = count().as("total");
+ CountAggregation.Builder aggregationWithoutAlias1 = count();
+ CountAggregation.Builder aggregationWithoutAlias2 = count();
+
+ // same aliases
+ assertThat(aggregationWithAlias1.build()).isEqualTo(aggregationWithAlias2.build());
+ assertThat(aggregationWithAlias2.build()).isEqualTo(aggregationWithAlias1.build());
+
+ // with and without aliases
+ assertThat(aggregationWithAlias1.build()).isNotEqualTo(aggregationWithoutAlias1.build());
+ assertThat(aggregationWithoutAlias1.build()).isNotEqualTo(aggregationWithAlias1.build());
+
+ // no aliases
+ assertThat(aggregationWithoutAlias1.build()).isEqualTo(aggregationWithoutAlias2.build());
+ assertThat(aggregationWithoutAlias2.build()).isEqualTo(aggregationWithoutAlias1.build());
+
+ // different aliases
+ assertThat(aggregationWithAlias1.as("new-alias").build())
+ .isNotEqualTo(aggregationWithAlias2.build());
+ assertThat(aggregationWithAlias2.build())
+ .isNotEqualTo(aggregationWithAlias1.as("new-alias").build());
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/AggregationQueryExecutorTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/AggregationQueryExecutorTest.java
new file mode 100644
index 000000000..f9f23261d
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/AggregationQueryExecutorTest.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution;
+
+import static com.google.cloud.datastore.ProtoTestData.intValue;
+import static com.google.cloud.datastore.ReadOption.eventualConsistency;
+import static com.google.cloud.datastore.StructuredQuery.PropertyFilter.eq;
+import static com.google.cloud.datastore.TestUtils.matches;
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.datastore.v1.ReadOptions.ReadConsistency.EVENTUAL;
+import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.datastore.AggregationQuery;
+import com.google.cloud.datastore.AggregationResult;
+import com.google.cloud.datastore.AggregationResults;
+import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.EntityQuery;
+import com.google.cloud.datastore.LongValue;
+import com.google.cloud.datastore.Query;
+import com.google.cloud.datastore.spi.v1.DatastoreRpc;
+import com.google.common.collect.ImmutableMap;
+import com.google.datastore.v1.AggregationResultBatch;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import com.google.datastore.v1.RunAggregationQueryResponse;
+import com.google.datastore.v1.Value;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AggregationQueryExecutorTest {
+
+ private static final String KIND = "Task";
+ private static final String NAMESPACE = "ns";
+
+ private DatastoreRpc mockRpc;
+ private AggregationQueryExecutor queryExecutor;
+ private DatastoreOptions datastoreOptions;
+
+ @Before
+ public void setUp() throws Exception {
+ mockRpc = EasyMock.createStrictMock(DatastoreRpc.class);
+ datastoreOptions =
+ DatastoreOptions.newBuilder().setProjectId("project-id").setNamespace(NAMESPACE).build();
+ queryExecutor = new AggregationQueryExecutor(mockRpc, datastoreOptions);
+ }
+
+ @Test
+ public void shouldExecuteAggregationQuery() {
+ EntityQuery nestedQuery =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND)
+ .setFilter(eq("done", true))
+ .build();
+
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .over(nestedQuery)
+ .build();
+
+ RunAggregationQueryResponse runAggregationQueryResponse = placeholderAggregationQueryResponse();
+ expect(mockRpc.runAggregationQuery(anyObject(RunAggregationQueryRequest.class)))
+ .andReturn(runAggregationQueryResponse);
+
+ replay(mockRpc);
+
+ AggregationResults aggregationResults = queryExecutor.execute(aggregationQuery);
+
+ verify(mockRpc);
+ assertThat(aggregationResults)
+ .isEqualTo(
+ new AggregationResults(
+ asList(
+ new AggregationResult(
+ ImmutableMap.of(
+ "count", LongValue.of(209), "property_2", LongValue.of(100))),
+ new AggregationResult(
+ ImmutableMap.of(
+ "count", LongValue.of(509), "property_2", LongValue.of(100)))),
+ Timestamp.fromProto(runAggregationQueryResponse.getBatch().getReadTime())));
+ }
+
+ @Test
+ public void shouldExecuteAggregationQueryWithReadOptions() {
+ EntityQuery nestedQuery =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND)
+ .setFilter(eq("done", true))
+ .build();
+
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .over(nestedQuery)
+ .build();
+
+ RunAggregationQueryResponse runAggregationQueryResponse = placeholderAggregationQueryResponse();
+ expect(mockRpc.runAggregationQuery(matches(runAggregationRequestWithEventualConsistency())))
+ .andReturn(runAggregationQueryResponse);
+
+ replay(mockRpc);
+
+ AggregationResults aggregationResults =
+ queryExecutor.execute(aggregationQuery, eventualConsistency());
+
+ verify(mockRpc);
+ assertThat(aggregationResults)
+ .isEqualTo(
+ new AggregationResults(
+ asList(
+ new AggregationResult(
+ ImmutableMap.of(
+ "count", LongValue.of(209), "property_2", LongValue.of(100))),
+ new AggregationResult(
+ ImmutableMap.of(
+ "count", LongValue.of(509), "property_2", LongValue.of(100)))),
+ Timestamp.fromProto(runAggregationQueryResponse.getBatch().getReadTime())));
+ }
+
+ private RunAggregationQueryResponse placeholderAggregationQueryResponse() {
+ Map result1 =
+ new HashMap<>(
+ ImmutableMap.of(
+ "count", intValue(209),
+ "property_2", intValue(100)));
+
+ Map result2 =
+ new HashMap<>(
+ ImmutableMap.of(
+ "count", intValue(509),
+ "property_2", intValue(100)));
+
+ AggregationResultBatch resultBatch =
+ AggregationResultBatch.newBuilder()
+ .addAggregationResults(
+ com.google.datastore.v1.AggregationResult.newBuilder()
+ .putAllAggregateProperties(result1)
+ .build())
+ .addAggregationResults(
+ com.google.datastore.v1.AggregationResult.newBuilder()
+ .putAllAggregateProperties(result2)
+ .build())
+ .build();
+ return RunAggregationQueryResponse.newBuilder().setBatch(resultBatch).build();
+ }
+
+ private Predicate runAggregationRequestWithEventualConsistency() {
+ return runAggregationQueryRequest ->
+ runAggregationQueryRequest.getReadOptions().getReadConsistency() == EVENTUAL;
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparerTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparerTest.java
new file mode 100644
index 000000000..6301ebeff
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/request/AggregationQueryRequestProtoPreparerTest.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution.request;
+
+import static com.google.cloud.datastore.ProtoTestData.booleanValue;
+import static com.google.cloud.datastore.ProtoTestData.countAggregation;
+import static com.google.cloud.datastore.ProtoTestData.gqlQueryParameter;
+import static com.google.cloud.datastore.ProtoTestData.intValue;
+import static com.google.cloud.datastore.ProtoTestData.kind;
+import static com.google.cloud.datastore.ProtoTestData.propertyFilter;
+import static com.google.cloud.datastore.ProtoTestData.stringValue;
+import static com.google.cloud.datastore.ReadOption.eventualConsistency;
+import static com.google.cloud.datastore.StructuredQuery.PropertyFilter.eq;
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1.ReadOptions.ReadConsistency.EVENTUAL;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.datastore.AggregationQuery;
+import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.EntityQuery;
+import com.google.cloud.datastore.GqlQuery;
+import com.google.cloud.datastore.Query;
+import com.google.cloud.datastore.ReadOption;
+import com.google.cloud.datastore.ReadOption.QueryAndReadOptions;
+import com.google.common.collect.ImmutableMap;
+import com.google.datastore.v1.RunAggregationQueryRequest;
+import java.util.HashMap;
+import org.junit.Test;
+
+public class AggregationQueryRequestProtoPreparerTest {
+
+ private static final String KIND = "Task";
+ private static final String NAMESPACE = "ns";
+ private static final String PROJECT_ID = "project-id";
+ private static final DatastoreOptions DATASTORE_OPTIONS =
+ DatastoreOptions.newBuilder().setProjectId(PROJECT_ID).setNamespace(NAMESPACE).build();
+ private static final EntityQuery COMPLETED_TASK_STRUCTURED_QUERY =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND)
+ .setFilter(eq("done", true))
+ .build();
+
+ private static final GqlQuery> COMPLETED_TASK_GQL_QUERY =
+ Query.newGqlQueryBuilder(
+ "AGGREGATE COUNT AS total_characters OVER ("
+ + "SELECT * FROM Character WHERE name = @name and age > @1"
+ + ")")
+ .setBinding("name", "John Doe")
+ .addBinding(27)
+ .build();
+
+ private final AggregationQuery AGGREGATION_OVER_STRUCTURED_QUERY =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .addAggregation(count().as("total"))
+ .over(COMPLETED_TASK_STRUCTURED_QUERY)
+ .build();
+
+ private final AggregationQuery AGGREGATION_OVER_GQL_QUERY =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .over(COMPLETED_TASK_GQL_QUERY)
+ .build();
+
+ private final AggregationQueryRequestProtoPreparer protoPreparer =
+ new AggregationQueryRequestProtoPreparer(DATASTORE_OPTIONS);
+
+ @Test
+ public void shouldPrepareAggregationQueryRequestWithGivenStructuredQuery() {
+ RunAggregationQueryRequest runAggregationQueryRequest =
+ protoPreparer.prepare(QueryAndReadOptions.create(AGGREGATION_OVER_STRUCTURED_QUERY));
+
+ assertThat(runAggregationQueryRequest.getProjectId()).isEqualTo(PROJECT_ID);
+
+ assertThat(runAggregationQueryRequest.getPartitionId().getProjectId()).isEqualTo(PROJECT_ID);
+ assertThat(runAggregationQueryRequest.getPartitionId().getNamespaceId()).isEqualTo(NAMESPACE);
+
+ com.google.datastore.v1.AggregationQuery aggregationQueryProto =
+ runAggregationQueryRequest.getAggregationQuery();
+ assertThat(aggregationQueryProto.getNestedQuery())
+ .isEqualTo(
+ com.google.datastore.v1.Query.newBuilder()
+ .addKind(kind(KIND))
+ .setFilter(propertyFilter("done", EQUAL, booleanValue(true)))
+ .build());
+ assertThat(aggregationQueryProto.getAggregationsList())
+ .isEqualTo(singletonList(countAggregation("total")));
+ }
+
+ @Test
+ public void shouldPrepareAggregationQueryRequestWithGivenGqlQuery() {
+ RunAggregationQueryRequest runAggregationQueryRequest =
+ protoPreparer.prepare(QueryAndReadOptions.create(AGGREGATION_OVER_GQL_QUERY));
+
+ assertThat(runAggregationQueryRequest.getProjectId()).isEqualTo(PROJECT_ID);
+
+ assertThat(runAggregationQueryRequest.getPartitionId().getProjectId()).isEqualTo(PROJECT_ID);
+ assertThat(runAggregationQueryRequest.getPartitionId().getNamespaceId()).isEqualTo(NAMESPACE);
+
+ com.google.datastore.v1.GqlQuery gqlQueryProto = runAggregationQueryRequest.getGqlQuery();
+
+ assertThat(gqlQueryProto.getQueryString()).isEqualTo(COMPLETED_TASK_GQL_QUERY.getQueryString());
+ assertThat(gqlQueryProto.getNamedBindingsMap())
+ .isEqualTo(
+ new HashMap<>(ImmutableMap.of("name", gqlQueryParameter(stringValue("John Doe")))));
+ assertThat(gqlQueryProto.getPositionalBindingsList())
+ .isEqualTo(asList(gqlQueryParameter(intValue(27))));
+ }
+
+ @Test
+ public void shouldPrepareReadOptionsWithGivenStructuredQuery() {
+ RunAggregationQueryRequest eventualConsistencyAggregationRequest =
+ prepareQuery(AGGREGATION_OVER_STRUCTURED_QUERY, eventualConsistency());
+ assertThat(eventualConsistencyAggregationRequest.getReadOptions().getReadConsistency())
+ .isEqualTo(EVENTUAL);
+
+ Timestamp now = Timestamp.now();
+ RunAggregationQueryRequest readTimeAggregationRequest =
+ prepareQuery(AGGREGATION_OVER_STRUCTURED_QUERY, ReadOption.readTime(now));
+ assertThat(Timestamp.fromProto(readTimeAggregationRequest.getReadOptions().getReadTime()))
+ .isEqualTo(now);
+ }
+
+ @Test
+ public void shouldPrepareReadOptionsWithGivenGqlQuery() {
+ RunAggregationQueryRequest eventualConsistencyAggregationRequest =
+ prepareQuery(AGGREGATION_OVER_GQL_QUERY, eventualConsistency());
+ assertThat(eventualConsistencyAggregationRequest.getReadOptions().getReadConsistency())
+ .isEqualTo(EVENTUAL);
+
+ Timestamp now = Timestamp.now();
+ RunAggregationQueryRequest readTimeAggregationRequest =
+ prepareQuery(AGGREGATION_OVER_GQL_QUERY, ReadOption.readTime(now));
+ assertThat(Timestamp.fromProto(readTimeAggregationRequest.getReadOptions().getReadTime()))
+ .isEqualTo(now);
+ }
+
+ @Test
+ public void shouldPrepareAggregationQueryWithoutNamespace() {
+ AggregationQuery structuredQueryWithoutNamespace =
+ Query.newAggregationQueryBuilder()
+ .addAggregation(count().as("total"))
+ .over(COMPLETED_TASK_STRUCTURED_QUERY)
+ .build();
+ AggregationQuery gqlQueryWithoutNamespace =
+ Query.newAggregationQueryBuilder().over(COMPLETED_TASK_GQL_QUERY).build();
+
+ RunAggregationQueryRequest runAggregationQueryFromStructuredQuery =
+ protoPreparer.prepare(QueryAndReadOptions.create(structuredQueryWithoutNamespace));
+ RunAggregationQueryRequest runAggregationQueryFromGqlQuery =
+ protoPreparer.prepare(QueryAndReadOptions.create(gqlQueryWithoutNamespace));
+
+ assertThat(runAggregationQueryFromStructuredQuery.getPartitionId().getNamespaceId())
+ .isEqualTo("");
+ assertThat(runAggregationQueryFromGqlQuery.getPartitionId().getNamespaceId()).isEqualTo("");
+ }
+
+ private RunAggregationQueryRequest prepareQuery(AggregationQuery query, ReadOption readOption) {
+ return protoPreparer.prepare(QueryAndReadOptions.create(query, singletonList(readOption)));
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformerTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformerTest.java
new file mode 100644
index 000000000..8776d4221
--- /dev/null
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/execution/response/AggregationQueryResponseTransformerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.execution.response;
+
+import static com.google.cloud.datastore.ProtoTestData.intValue;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.datastore.AggregationResult;
+import com.google.cloud.datastore.AggregationResults;
+import com.google.cloud.datastore.LongValue;
+import com.google.common.collect.ImmutableMap;
+import com.google.datastore.v1.AggregationResultBatch;
+import com.google.datastore.v1.RunAggregationQueryResponse;
+import com.google.datastore.v1.Value;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.junit.Test;
+
+public class AggregationQueryResponseTransformerTest {
+
+ private final AggregationQueryResponseTransformer responseTransformer =
+ new AggregationQueryResponseTransformer();
+
+ @Test
+ public void shouldTransformAggregationQueryResponse() {
+ Map result1 =
+ new HashMap<>(
+ ImmutableMap.of(
+ "count", intValue(209),
+ "property_2", intValue(100)));
+
+ Map result2 =
+ new HashMap<>(
+ ImmutableMap.of(
+ "count", intValue(509),
+ "property_2", intValue(100)));
+ Timestamp readTime = Timestamp.now();
+
+ AggregationResultBatch resultBatch =
+ AggregationResultBatch.newBuilder()
+ .addAggregationResults(
+ com.google.datastore.v1.AggregationResult.newBuilder()
+ .putAllAggregateProperties(result1)
+ .build())
+ .addAggregationResults(
+ com.google.datastore.v1.AggregationResult.newBuilder()
+ .putAllAggregateProperties(result2)
+ .build())
+ .setReadTime(readTime.toProto())
+ .build();
+ RunAggregationQueryResponse runAggregationQueryResponse =
+ RunAggregationQueryResponse.newBuilder().setBatch(resultBatch).build();
+
+ AggregationResults aggregationResults =
+ responseTransformer.transform(runAggregationQueryResponse);
+
+ assertThat(aggregationResults.size()).isEqualTo(2);
+ assertThat(aggregationResults.get(0)).isEqualTo(new AggregationResult(toDomainValues(result1)));
+ assertThat(aggregationResults.get(1)).isEqualTo(new AggregationResult(toDomainValues(result2)));
+ assertThat(aggregationResults.getReadTime()).isEqualTo(readTime);
+ }
+
+ private Map toDomainValues(Map map) {
+
+ return map.entrySet().stream()
+ .map(
+ (Function, Entry>)
+ entry ->
+ new SimpleEntry<>(
+ entry.getKey(), (LongValue) LongValue.fromPb(entry.getValue())))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ }
+}
diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java
index 869984932..b8c3bb4b6 100644
--- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java
+++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java
@@ -16,6 +16,9 @@
package com.google.cloud.datastore.it;
+import static com.google.cloud.datastore.aggregation.Aggregation.count;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -26,14 +29,17 @@
import static org.junit.Assert.fail;
import com.google.cloud.Timestamp;
+import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.Batch;
import com.google.cloud.datastore.BooleanValue;
import com.google.cloud.datastore.Cursor;
import com.google.cloud.datastore.Datastore;
+import com.google.cloud.datastore.Datastore.TransactionCallable;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.DatastoreReaderWriter;
import com.google.cloud.datastore.Entity;
+import com.google.cloud.datastore.EntityQuery;
import com.google.cloud.datastore.EntityValue;
import com.google.cloud.datastore.FullEntity;
import com.google.cloud.datastore.GqlQuery;
@@ -61,13 +67,20 @@
import com.google.cloud.datastore.ValueType;
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.datastore.v1.TransactionOptions;
+import com.google.datastore.v1.TransactionOptions.ReadOnly;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -170,7 +183,11 @@ public void setUp() {
@After
public void tearDown() {
- DATASTORE.delete(KEY1, KEY2, KEY3);
+ EntityQuery allEntitiesQuery = Query.newEntityQueryBuilder().build();
+ QueryResults allEntities = DATASTORE.run(allEntitiesQuery);
+ Key[] keysToDelete =
+ ImmutableList.copyOf(allEntities).stream().map(Entity::getKey).toArray(Key[]::new);
+ DATASTORE.delete(keysToDelete);
}
private Iterator getStronglyConsistentResults(Query scQuery, Query query)
@@ -506,6 +523,279 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
assertFalse(results3.hasNext());
}
+ @Test
+ public void testRunAggregationQuery() {
+ // verifying aggregation with an entity query
+ testCountAggregationWith(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newEntityQueryBuilder().setNamespace(NAMESPACE).setKind(KIND1).build()));
+
+ // verifying aggregation with a projection query
+ testCountAggregationWith(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newProjectionEntityQueryBuilder()
+ .setProjection("str")
+ .setNamespace(NAMESPACE)
+ .setKind(KIND1)
+ .build()));
+
+ // verifying aggregation with a key query
+ testCountAggregationWith(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(Query.newKeyQueryBuilder().setNamespace(NAMESPACE).setKind(KIND1).build()));
+
+ // verifying aggregation with a GQL query
+ testCountAggregationWith(
+ builder ->
+ builder.over(
+ Query.newGqlQueryBuilder(
+ "AGGREGATE COUNT(*) AS total_count OVER (SELECT * FROM kind1)")
+ .setNamespace(NAMESPACE)
+ .build()));
+ }
+
+ @Test
+ public void testRunAggregationQueryWithLimit() {
+ // verifying aggregation with an entity query
+ testCountAggregationWithLimit(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(Query.newEntityQueryBuilder().setNamespace(NAMESPACE).setKind(KIND1).build()),
+ ((builder, limit) ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND1)
+ .setLimit(limit.intValue())
+ .build())));
+
+ // verifying aggregation with a projection query
+ testCountAggregationWithLimit(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newProjectionEntityQueryBuilder()
+ .setProjection("str")
+ .setNamespace(NAMESPACE)
+ .setKind(KIND1)
+ .build()),
+ ((builder, limit) ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newProjectionEntityQueryBuilder()
+ .setProjection("str")
+ .setNamespace(NAMESPACE)
+ .setKind(KIND1)
+ .setLimit(limit.intValue())
+ .build())));
+
+ // verifying aggregation with a key query
+ testCountAggregationWithLimit(
+ builder ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(Query.newKeyQueryBuilder().setNamespace(NAMESPACE).setKind(KIND1).build()),
+ (builder, limit) ->
+ builder
+ .addAggregation(count().as("total_count"))
+ .over(
+ Query.newKeyQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setKind(KIND1)
+ .setLimit(limit.intValue())
+ .build()));
+
+ // verifying aggregation with a GQL query
+ testCountAggregationWithLimit(
+ builder ->
+ builder.over(
+ Query.newGqlQueryBuilder(
+ "AGGREGATE COUNT(*) AS total_count OVER (SELECT * FROM kind1)")
+ .setNamespace(NAMESPACE)
+ .build()),
+ (builder, limit) ->
+ builder.over(
+ Query.newGqlQueryBuilder(
+ "AGGREGATE COUNT(*) AS total_count OVER (SELECT * FROM kind1 LIMIT @limit)")
+ .setNamespace(NAMESPACE)
+ .setBinding("limit", limit)
+ .build()));
+ }
+
+ /**
+ * if an entity is modified or deleted within a transaction, a query or lookup returns the
+ * original version of the entity as of the beginning of the transaction, or nothing if the entity
+ * did not exist then.
+ *
+ * @see
+ * Source
+ */
+ @Test
+ public void testRunAggregationQueryInTransactionShouldReturnAConsistentSnapshot() {
+ Key newEntityKey = Key.newBuilder(KEY1, "newKind", "name-01").build();
+ EntityQuery entityQuery =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setFilter(PropertyFilter.hasAncestor(KEY1))
+ .build();
+
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .over(entityQuery)
+ .addAggregation(count().as("count"))
+ .build();
+
+ // original entity count is 2
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(2L);
+
+ // FIRST TRANSACTION
+ DATASTORE.runInTransaction(
+ (TransactionCallable)
+ inFirstTransaction -> {
+ // creating a new entity
+ Entity aNewEntity =
+ Entity.newBuilder(ENTITY2).setKey(newEntityKey).set("v_int", 10).build();
+ inFirstTransaction.put(aNewEntity);
+
+ // count remains 2
+ assertThat(
+ getOnlyElement(inFirstTransaction.runAggregation(aggregationQuery))
+ .get("count"))
+ .isEqualTo(2L);
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(2L);
+ return null;
+ });
+ // after first transaction is committed, count is updated to 3 now.
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(3L);
+
+ // SECOND TRANSACTION
+ DATASTORE.runInTransaction(
+ (TransactionCallable)
+ inSecondTransaction -> {
+ // deleting ENTITY2
+ inSecondTransaction.delete(ENTITY2.getKey());
+
+ // count remains 3
+ assertThat(
+ getOnlyElement(inSecondTransaction.runAggregation(aggregationQuery))
+ .get("count"))
+ .isEqualTo(3L);
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(3L);
+ return null;
+ });
+ // after second transaction is committed, count is updated to 2 now.
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(2L);
+ DATASTORE.delete(newEntityKey);
+ }
+
+ @Test
+ public void testRunAggregationQueryInAReadOnlyTransactionShouldNotLockTheCountedDocuments()
+ throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ EntityQuery entityQuery =
+ Query.newEntityQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .setFilter(PropertyFilter.hasAncestor(KEY1))
+ .build();
+ AggregationQuery aggregationQuery =
+ Query.newAggregationQueryBuilder()
+ .setNamespace(NAMESPACE)
+ .over(entityQuery)
+ .addAggregation(count().as("count"))
+ .build();
+
+ TransactionOptions transactionOptions =
+ TransactionOptions.newBuilder().setReadOnly(ReadOnly.newBuilder().build()).build();
+ Transaction readOnlyTransaction = DATASTORE.newTransaction(transactionOptions);
+
+ // Executing query in transaction
+ assertThat(getOnlyElement(readOnlyTransaction.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(2L);
+
+ // Concurrent write task.
+ Future addNewEntityTaskOutsideTransaction =
+ executor.submit(
+ () -> {
+ Entity aNewEntity =
+ Entity.newBuilder(ENTITY2)
+ .setKey(Key.newBuilder(KEY1, "newKind", "name-01").build())
+ .set("v_int", 10)
+ .build();
+ DATASTORE.put(aNewEntity);
+ return null;
+ });
+
+ // should not throw exception and complete successfully as the ongoing transaction is read-only.
+ addNewEntityTaskOutsideTransaction.get();
+
+ // cleanup
+ readOnlyTransaction.commit();
+ executor.shutdownNow();
+
+ assertThat(getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get("count"))
+ .isEqualTo(3L);
+ }
+
+ @Test
+ public void testRunAggregationQueryWithReadTime() throws InterruptedException {
+ String alias = "total_count";
+
+ // verifying aggregation readTime with an entity query
+ testCountAggregationReadTimeWith(
+ builder ->
+ builder
+ .over(Query.newEntityQueryBuilder().setKind("new_kind").build())
+ .addAggregation(count().as(alias)));
+
+ // verifying aggregation readTime with a projection query
+ testCountAggregationReadTimeWith(
+ builder ->
+ builder
+ .over(
+ Query.newProjectionEntityQueryBuilder()
+ .setProjection("name")
+ .setKind("new_kind")
+ .build())
+ .addAggregation(count().as(alias)));
+
+ // verifying aggregation readTime with a key query
+ testCountAggregationReadTimeWith(
+ builder ->
+ builder
+ .over(Query.newKeyQueryBuilder().setKind("new_kind").build())
+ .addAggregation(count().as(alias)));
+
+ // verifying aggregation readTime with a GQL query
+ testCountAggregationReadTimeWith(
+ builder ->
+ builder
+ .over(
+ Query.newGqlQueryBuilder(
+ "AGGREGATE COUNT(*) AS total_count OVER (SELECT * FROM new_kind)")
+ .build())
+ .addAggregation(count().as(alias)));
+ }
+
@Test
public void testRunStructuredQuery() throws InterruptedException {
Query query =
@@ -1067,4 +1357,92 @@ public void testQueryWithReadTime() throws InterruptedException {
DATASTORE.delete(entity1.getKey(), entity2.getKey(), entity3.getKey());
}
}
+
+ private void testCountAggregationWith(Consumer configurer) {
+ AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE);
+ configurer.accept(builder);
+ AggregationQuery aggregationQuery = builder.build();
+ String alias = "total_count";
+
+ Long countBeforeAdd = getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get(alias);
+ long expectedCount = countBeforeAdd + 1;
+
+ Entity newEntity =
+ Entity.newBuilder(ENTITY1)
+ .setKey(Key.newBuilder(KEY3, KIND1, 1).build())
+ .set("null", NULL_VALUE)
+ .set("partial1", PARTIAL_ENTITY2)
+ .set("partial2", ENTITY2)
+ .build();
+ DATASTORE.put(newEntity);
+
+ Long countAfterAdd = getOnlyElement(DATASTORE.runAggregation(aggregationQuery)).get(alias);
+ assertThat(countAfterAdd).isEqualTo(expectedCount);
+
+ DATASTORE.delete(newEntity.getKey());
+ }
+
+ private void testCountAggregationWithLimit(
+ Consumer withoutLimitConfigurer,
+ BiConsumer withLimitConfigurer) {
+ String alias = "total_count";
+
+ AggregationQuery.Builder withoutLimitBuilder =
+ Query.newAggregationQueryBuilder().setNamespace(NAMESPACE);
+ withoutLimitConfigurer.accept(withoutLimitBuilder);
+
+ Long currentCount =
+ getOnlyElement(DATASTORE.runAggregation(withoutLimitBuilder.build())).get(alias);
+ long limit = currentCount - 1;
+
+ AggregationQuery.Builder withLimitBuilder =
+ Query.newAggregationQueryBuilder().setNamespace(NAMESPACE);
+ withLimitConfigurer.accept(withLimitBuilder, limit);
+
+ Long countWithLimit =
+ getOnlyElement(DATASTORE.runAggregation(withLimitBuilder.build())).get(alias);
+ assertThat(countWithLimit).isEqualTo(limit);
+ }
+
+ private void testCountAggregationReadTimeWith(Consumer configurer)
+ throws InterruptedException {
+ Entity entity1 =
+ Entity.newBuilder(
+ Key.newBuilder(PROJECT_ID, "new_kind", "name-01").setNamespace(NAMESPACE).build())
+ .set("name", "Tyrion Lannister")
+ .build();
+ Entity entity2 =
+ Entity.newBuilder(
+ Key.newBuilder(PROJECT_ID, "new_kind", "name-02").setNamespace(NAMESPACE).build())
+ .set("name", "Jaime Lannister")
+ .build();
+ Entity entity3 =
+ Entity.newBuilder(
+ Key.newBuilder(PROJECT_ID, "new_kind", "name-03").setNamespace(NAMESPACE).build())
+ .set("name", "Cersei Lannister")
+ .build();
+
+ DATASTORE.put(entity1, entity2);
+ Thread.sleep(1000);
+ Timestamp now = Timestamp.now();
+ Thread.sleep(1000);
+ DATASTORE.put(entity3);
+
+ try {
+ AggregationQuery.Builder builder = Query.newAggregationQueryBuilder().setNamespace(NAMESPACE);
+ configurer.accept(builder);
+ AggregationQuery countAggregationQuery = builder.build();
+
+ Long latestCount =
+ getOnlyElement(DATASTORE.runAggregation(countAggregationQuery)).get("total_count");
+ assertThat(latestCount).isEqualTo(3L);
+
+ Long oldCount =
+ getOnlyElement(DATASTORE.runAggregation(countAggregationQuery, ReadOption.readTime(now)))
+ .get("total_count");
+ assertThat(oldCount).isEqualTo(2L);
+ } finally {
+ DATASTORE.delete(entity1.getKey(), entity2.getKey(), entity3.getKey());
+ }
+ }
}