diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountDocumentQueryExecutionContext.java new file mode 100644 index 0000000000000..09c113b557c28 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountDocumentQueryExecutionContext.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.QueryMetrics; +import com.azure.cosmos.implementation.Resource; +import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; + +/** + * Execution component that is able to aggregate COUNT(DISTINCT) from multiple continuations and partitions. + * + * @param Resource generic type + */ +public class DCountDocumentQueryExecutionContext implements IDocumentQueryExecutionComponent { + private final IDocumentQueryExecutionComponent component; + private final QueryInfo info; + private long count; + private ConcurrentMap queryMetricsMap = new ConcurrentHashMap<>(); + + private DCountDocumentQueryExecutionContext( + IDocumentQueryExecutionComponent component, + QueryInfo info, + long count) { + + if (component == null) { + throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null"); + } + + this.component = component; + this.count = count; + this.info = info; + } + + public static Flux> createAsync( + BiFunction, Flux>> createSourceComponentFunction, + QueryInfo info, + String continuationToken, + PipelinedDocumentQueryParams documentQueryParams) { + + return createSourceComponentFunction + .apply(continuationToken, documentQueryParams) + .map(component -> new DCountDocumentQueryExecutionContext(component, info, 0 /*default count*/)); + } + + IDocumentQueryExecutionComponent getComponent() { + return this.component; + } + + @SuppressWarnings("unchecked") + @Override + public Flux> drainAsync(int maxPageSize) { + return this.component.drainAsync(maxPageSize) + .collectList() + .map(superList -> { + double requestCharge = 0; + Map headers = new HashMap<>(); + List diagnosticsList = new ArrayList<>(); + + for (FeedResponse page : superList) { + diagnosticsList.addAll(BridgeInternal + .getClientSideRequestStatisticsList(page + .getCosmosDiagnostics())); + count += page.getResults().size(); + requestCharge += page.getRequestCharge(); + QueryMetrics.mergeQueryMetricsMap(queryMetricsMap, + BridgeInternal.queryMetricsFromFeedResponse(page)); + } + + Document result = new Document(); + if (Strings.isNullOrEmpty(info.getDCountAlias())) { + if (info.hasSelectValue()) { + result.set(Constants.Properties.VALUE, count); + } else { + // Setting $1 as the key to be consistent with service results + result.set("$1", count); + } + } else { + result.set(info.getDCountAlias(), count); + } + headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge)); + FeedResponse frp = + BridgeInternal.createFeedResponseWithQueryMetrics(Collections.singletonList(result), headers, + queryMetricsMap, null, false, + false, null); + + BridgeInternal.addClientSideDiagnosticsToFeed(frp.getCosmosDiagnostics(), diagnosticsList); + return (FeedResponse) BridgeInternal + .createFeedResponseWithQueryMetrics(Collections + .singletonList(result), + headers, + BridgeInternal + .queryMetricsFromFeedResponse(frp), + ModelBridgeInternal + .getQueryPlanDiagnosticsContext(frp), + false, + false, + frp.getCosmosDiagnostics()); + }) + .flux(); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountInfo.java new file mode 100644 index 0000000000000..29d22963cf968 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DCountInfo.java @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DCountInfo { + @JsonProperty("dCountAlias") + private String dCountAlias; + + public String getDCountAlias() { + return dCountAlias; + } + + public void setDCountAlias(String dCountAlias) { + this.dCountAlias = dCountAlias; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java index 09d96a8f41427..06620310a1df7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java @@ -159,6 +159,18 @@ public static Flux, Flux>> createDCountComponentFunction; + if (queryInfo.hasDCount()) { + createDCountComponentFunction = (continuationToken, documentQueryParams) -> { + return DCountDocumentQueryExecutionContext.createAsync(createTakeComponentFunction, + queryInfo, + continuationToken, + documentQueryParams); + }; + } else { + createDCountComponentFunction = createTakeComponentFunction; + } + int actualPageSize = Utils.getValueOrDefault(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions), ParallelQueryConfig.ClientInternalPageSize); @@ -167,7 +179,7 @@ public static Flux new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId, queryInfo)); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java index d5ebd45222acd..ce2b4865767cd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java @@ -13,5 +13,6 @@ public enum QueryFeature { OffsetAndLimit, OrderBy, Top, - NonValueAggregate + NonValueAggregate, + DCount } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java index 6dc0b54e4aaf5..ad8b72bf11080 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.node.ObjectNode; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.Strings; import java.time.Instant; import java.util.Collection; @@ -32,6 +33,7 @@ public final class QueryInfo extends JsonSerializable { private Integer limit; private DistinctQueryType distinctQueryType; private QueryPlanDiagnosticsContext queryPlanDiagnosticsContext; + private DCountInfo dCountInfo; public QueryInfo() { } @@ -166,6 +168,23 @@ public List getGroupByAliases() { return super.getList("groupByAliases", String.class); } + public boolean hasDCount() { + return this.getDCountInfo() != null; + } + + public DCountInfo getDCountInfo() { + return this.dCountInfo != null ? + this.dCountInfo : (this.dCountInfo = super.getObject("dCountInfo", DCountInfo.class)); + } + + public String getDCountAlias() { + return this.dCountInfo.getDCountAlias(); + } + + public boolean isValueAggregate() { + return Strings.isNullOrEmpty(this.getDCountAlias()); + } + public QueryPlanDiagnosticsContext getQueryPlanDiagnosticsContext() { return queryPlanDiagnosticsContext; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java index 92c38ed1c6102..5200d0d1f4590 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -33,6 +33,7 @@ class QueryPlanRetriever { QueryFeature.Distinct.name() + ", " + QueryFeature.GroupBy.name() + ", " + QueryFeature.Top.name() + ", " + + QueryFeature.DCount.name() + ", " + QueryFeature.NonValueAggregate.name(); static Mono getQueryPlanThroughGatewayAsync(DiagnosticsClientContext diagnosticsClientContext, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java index bb324a7cecf47..5f63fc3092fc8 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DistinctQueryTests.java @@ -5,10 +5,11 @@ import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; -import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.FeedResponseListValidator; import com.azure.cosmos.implementation.FeedResponseValidator; +import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.guava25.collect.ImmutableMap; import com.azure.cosmos.implementation.query.UnorderedDistinctMap; import com.azure.cosmos.implementation.routing.UInt128; import com.azure.cosmos.models.CosmosQueryRequestOptions; @@ -26,9 +27,9 @@ import org.testng.annotations.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; @@ -116,102 +117,100 @@ public void queryDocuments(Boolean qmEnabled) { @Test(groups = {"simple"}, timeOut = TIMEOUT_LONG) public void queryDistinctDocuments() { - List queries = Arrays.asList( - // basic distinct queries - "SELECT %s VALUE null", - "SELECT %s VALUE false", - "SELECT %s VALUE true", - "SELECT %s VALUE 1", - "SELECT %s VALUE 'a'", - "SELECT %s VALUE [null, true, false, 1, 'a']", - "SELECT %s false AS p", - "SELECT %s 1 AS p", - "SELECT %s 'a' AS p", - - "SELECT %s VALUE null FROM c", - "SELECT %s VALUE false FROM c", - "SELECT %s VALUE 1 FROM c", - "SELECT %s VALUE 'a' FROM c", - "SELECT %s null AS p FROM c", - "SELECT %s false AS p FROM c", - "SELECT %s 1 AS p FROM c", - "SELECT %s 'a' AS p FROM c", + Map queries = ImmutableMap.builder() + // basic distinct queries + .put("SELECT %s VALUE null", true) + .put("SELECT %s VALUE false", false) + .put("SELECT %s VALUE true", false) + .put("SELECT %s VALUE 1", false) + .put("SELECT %s VALUE 'a'", true) + .put("SELECT %s VALUE [null, true, false, 1, 'a']", false) + .put("SELECT %s false AS p", true) + .put("SELECT %s 1 AS p", false) + .put("SELECT %s 'a' AS p", false) + .put("SELECT %s VALUE null FROM c", false) + .put("SELECT %s VALUE false FROM c", false) + .put("SELECT %s VALUE 1 FROM c", false) + .put("SELECT %s VALUE 'a' FROM c", false) + .put("SELECT %s null AS p FROM c", false) + .put("SELECT %s false AS p FROM c", false) + .put("SELECT %s 1 AS p FROM c", false) + .put("SELECT %s 'a' AS p FROM c", false) // number value distinct queries - "SELECT %s VALUE c.income from c", - "SELECT %s VALUE c.age from c", - "SELECT %s c.income, c.income AS income2 from c", - "SELECT %s c.income, c.age from c", + .put("SELECT %s VALUE c.income from c", true) + .put("SELECT %s VALUE c.age from c", false) + .put("SELECT %s c.income, c.income AS income2 from c", false) + .put("SELECT %s c.income, c.age from c", false) // string value distinct queries - "SELECT %s c.name from c", - "SELECT %s VALUE c.city from c", - "SELECT %s c.name, c.name AS name2 from c", - "SELECT %s c.name, c.city from c", + .put("SELECT %s c.name from c", true) + .put("SELECT %s VALUE c.city from c", false) + .put("SELECT %s c.name, c.name AS name2 from c", false) + .put("SELECT %s c.name, c.city from c", false) // array distinct queries - "SELECT %s c.children from c", - "SELECT %s c.children, c.children AS children2 from c", + .put("SELECT %s c.children from c", true) + .put("SELECT %s c.children, c.children AS children2 from c", false) // object value distinct queries - "SELECT %s VALUE c.pet from c", - "SELECT %s c.pet, c.pet AS pet2 from c", + .put("SELECT %s VALUE c.pet from c", true) + .put("SELECT %s c.pet, c.pet AS pet2 from c", false) // scalar expressions distinct query - "SELECT %s VALUE ABS(c.age) FROM c", - "SELECT %s VALUE LEFT(c.name, 1) FROM c", - "SELECT %s VALUE c.name || ', ' || (c.city ?? '') FROM c", - "SELECT %s VALUE ARRAY_LENGTH(c.children) FROM c", - "SELECT %s VALUE IS_DEFINED(c.city) FROM c", - "SELECT %s VALUE (c.children[0].age ?? 0) + (c.children[1].age ?? 0) FROM c", + .put("SELECT %s VALUE ABS(c.age) FROM c", true) + .put("SELECT %s VALUE LEFT(c.name, 1) FROM c", false) + .put("SELECT %s VALUE c.name || ', ' || (c.city ?? '') FROM c", false) + .put("SELECT %s VALUE ARRAY_LENGTH(c.children) FROM c", false) + .put("SELECT %s VALUE IS_DEFINED(c.city) FROM c", false) + .put("SELECT %s VALUE (c.children[0].age ?? 0) + (c.children[1].age ?? 0) FROM c", false) // distinct queries with order by - "SELECT %s c.name FROM c ORDER BY c.name ASC", - "SELECT %s c.age FROM c ORDER BY c.age", - "SELECT %s c.city FROM c ORDER BY c.city", - "SELECT %s c.city FROM c ORDER BY c.age", - "SELECT %s LEFT(c.name, 1) FROM c ORDER BY c.name", + .put("SELECT %s c.name FROM c ORDER BY c.name ASC", false) + .put("SELECT %s c.age FROM c ORDER BY c.age", false) + .put("SELECT %s c.city FROM c ORDER BY c.city", false) + .put("SELECT %s c.city FROM c ORDER BY c.age", false) + .put("SELECT %s LEFT(c.name, 1) FROM c ORDER BY c.name", false) // distinct queries with top and no matching order by - "SELECT %s TOP 2147483647 VALUE c.age FROM c", + .put("SELECT %s TOP 2147483647 VALUE c.age FROM c", false) // distinct queries with top and matching order by - "SELECT %s TOP 2147483647 c.age FROM c ORDER BY c.age", + .put("SELECT %s TOP 2147483647 c.age FROM c ORDER BY c.age", false) // distinct queries with aggregates - "SELECT %s VALUE MAX(c.age) FROM c", + .put("SELECT %s VALUE MAX(c.age) FROM c", false) // distinct queries with joins - "SELECT %s VALUE c.age FROM p JOIN c IN p.children", - "SELECT %s p.age AS ParentAge, c.age ChildAge FROM p JOIN c IN p.children", - "SELECT %s VALUE c.name FROM p JOIN c IN p.children", - "SELECT %s p.name AS ParentName, c.name ChildName FROM p JOIN c IN p.children", + .put("SELECT %s VALUE c.age FROM p JOIN c IN p.children", true) + .put("SELECT %s p.age AS ParentAge, c.age ChildAge FROM p JOIN c IN p.children", false) + .put("SELECT %s VALUE c.name FROM p JOIN c IN p.children", false) + .put("SELECT %s p.name AS ParentName, c.name ChildName FROM p JOIN c IN p.children", false) // distinct queries in subqueries - "SELECT %s r.age, s FROM r JOIN (SELECT DISTINCT VALUE c FROM (SELECT 1 a) c) s WHERE r.age > 25", - "SELECT %s p.name, p.age FROM (SELECT DISTINCT * FROM r) p WHERE p.age > 25", + .put("SELECT %s r.age, s FROM r JOIN (SELECT DISTINCT VALUE c FROM (SELECT 1 a) c) s WHERE r.age > 25", false) + .put("SELECT %s p.name, p.age FROM (SELECT DISTINCT * FROM r) p WHERE p.age > 25", false) // distinct queries in scalar subqeries - "SELECT %s p.name, (SELECT DISTINCT VALUE p.age) AS Age FROM p", - "SELECT %s p.name, p.age FROM p WHERE (SELECT DISTINCT VALUE LEFT(p.name, 1)) > 'A' AND (SELECT " + - "DISTINCT VALUE p.age) > 21", - "SELECT %s p.name, (SELECT DISTINCT VALUE p.age) AS Age FROM p WHERE (SELECT DISTINCT VALUE p.name) >" + - " 'A' OR (SELECT DISTINCT VALUE p.age) > 21", + .put("SELECT %s p.name, (SELECT DISTINCT VALUE p.age) AS Age FROM p", true) + .put("SELECT %s p.name, p.age FROM p WHERE (SELECT DISTINCT VALUE LEFT(p.name, 1)) > 'A' AND (SELECT " + + "DISTINCT VALUE p.age) > 21", false) + .put("SELECT %s p.name, (SELECT DISTINCT VALUE p.age) AS Age FROM p WHERE (SELECT DISTINCT VALUE p.name) >" + + " 'A' OR (SELECT DISTINCT VALUE p.age) > 21", false) // select * - "SELECT %s * FROM c" - ); - - for (String query : queries) { - logger.info("Current distinct query: " + query); + .put("SELECT %s * FROM c", true) + .build(); + for (Map.Entry entry : queries.entrySet()) { + logger.info("Current distinct query: " + entry.getKey()); CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); options.setMaxDegreeOfParallelism(2); List documentsFromWithDistinct = new ArrayList<>(); List documentsFromWithoutDistinct = new ArrayList<>(); - final String queryWithDistinct = String.format(query, "DISTINCT"); - final String queryWithoutDistinct = String.format(query, ""); + final String queryWithDistinct = String.format(entry.getKey(), "DISTINCT"); + final String queryWithoutDistinct = String.format(entry.getKey(), ""); CosmosPagedFlux queryObservable = createdCollection.queryItems(queryWithoutDistinct, options, @@ -239,6 +238,27 @@ public void queryDistinctDocuments() { FeedResponse next = iterator.next(); documentsFromWithDistinct.addAll(next.getResults()); } + + // We want to do Dcount for some queries + if (entry.getValue()) { + // Do a dcount query and validate results + String queryWithDcount = "Select value count(1) from (" + + String.format(entry.getKey(), "DISTINCT") + + ")"; + List docsWithDCount = new ArrayList<>(); + + CosmosPagedFlux dcountQueryObs = createdCollection.queryItems(queryWithDcount, + options, + Integer.class); + + for (FeedResponse next : dcountQueryObs.byPage().toIterable()) { + docsWithDCount.addAll(next.getResults()); + } + assertThat(docsWithDCount.size()).isEqualTo(1); + int dCount = docsWithDCount.get(0); + assertThat(dCount).isEqualTo(documentsFromWithDistinct.size()); + } + assertThat(documentsFromWithDistinct.size()).isGreaterThanOrEqualTo(1); assertThat(documentsFromWithDistinct.size()).isEqualTo(documentsFromWithoutDistinct.size()); }