Skip to content

Commit

Permalink
Implement Cosmos pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
roji committed Jun 27, 2024
1 parent a025536 commit 53b39fc
Show file tree
Hide file tree
Showing 13 changed files with 1,054 additions and 272 deletions.
4 changes: 2 additions & 2 deletions src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public CosmosQueryEventData(
EventDefinitionBase eventDefinition,
Func<EventDefinitionBase, EventData, string> messageGenerator,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand All @@ -46,7 +46,7 @@ public CosmosQueryEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
4 changes: 2 additions & 2 deletions src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CosmosQueryExecutedEventData(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand Down Expand Up @@ -70,7 +70,7 @@ public CosmosQueryExecutedEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void SyncNotSupported(
public static void ExecutingSqlQuery(
this IDiagnosticsLogger<DbLoggerCategory.Database.Command> diagnostics,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutingSqlQuery(diagnostics);
Expand All @@ -66,7 +66,7 @@ public static void ExecutingSqlQuery(
definition.Log(
diagnostics,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query);
Expand Down Expand Up @@ -158,7 +158,7 @@ public static void ExecutedReadNext(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutedReadNext(diagnostics);
Expand All @@ -177,7 +177,7 @@ public static void ExecutedReadNext(
requestCharge,
activityId,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query));
Expand Down
61 changes: 61 additions & 0 deletions src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Azure;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal;
using Microsoft.EntityFrameworkCore.Query.Internal;
Expand All @@ -17,6 +18,8 @@ namespace Microsoft.EntityFrameworkCore;
/// </remarks>
public static class CosmosQueryableExtensions
{
#region WithPartitionKey

internal static readonly MethodInfo WithPartitionKeyMethodInfo
= typeof(CosmosQueryableExtensions).GetTypeInfo()
.GetDeclaredMethods(nameof(WithPartitionKey))
Expand Down Expand Up @@ -74,6 +77,10 @@ source.Provider is EntityQueryProvider
: source;
}

#endregion WithPartitionKey

#region FromSql

/// <summary>
/// Creates a LINQ query based on an interpolated string representing a SQL query.
/// </summary>
Expand Down Expand Up @@ -177,4 +184,58 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot(
sql,
Expression.Constant(arguments));
}

#endregion FromSql

#region ToPageAsync

internal static readonly MethodInfo ToPageAsyncMethodInfo
= typeof(CosmosQueryableExtensions).GetTypeInfo()
.GetDeclaredMethods(nameof(ToPageAsync))
.Single();

/// <summary>
/// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve
/// successive pages of the result set, and specifying the maximum number of results per page.
/// </summary>
/// <param name="source">The source query.</param>
/// <param name="continuationToken">
/// An optional continuation token returned from a previous execution of this query via <see cref="Page{T}.ContinuationToken" />.
/// If <see langword="null" />, retrieves query results from the start.
/// </param>
/// <param name="maxItemCount">
/// The maximum number of results in the returned <see cref="Page{T}" />. The page may contain fewer results of the database
/// did not contain enough matching results.
/// </param>
/// <param name="responseContinuationTokenLimitInKb">Limits the length of continuation token in the query response.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns>A <see cref="Page{T}" /> containing at most <paramref name="maxItemCount" /> results.</returns>
public static Task<Page<TSource>> ToPageAsync<TSource>(
this IQueryable<TSource> source,
string? continuationToken = null,
int? maxItemCount = null,
int? responseContinuationTokenLimitInKb = null,
CancellationToken cancellationToken = default)
{
if (source.Provider is not IAsyncQueryProvider provider)
{
throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync);
}

return provider.ExecuteAsync<Task<Page<TSource>>>(
Expression.Call(
instance: null,
method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)),
arguments:
[
source.Expression,
Expression.Constant(continuationToken, typeof(string)),
Expression.Constant(maxItemCount, typeof(int?)),
Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)),
Expression.Constant(default(CancellationToken), typeof(CancellationToken))
]),
cancellationToken);
}

#endregion ToPageAsync
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore.Cosmos.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Metadata.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions;
using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal;
using Microsoft.EntityFrameworkCore.Internal;

Expand Down Expand Up @@ -86,6 +87,63 @@ protected CosmosQueryableMethodTranslatingExpressionVisitor(
_subquery = true;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override Expression Translate(Expression expression)
{
// Handle ToPageAsync(), which can only ever be the top-level node in the query tree.
if (expression is MethodCallExpression { Method: var method, Arguments: var arguments }
&& method.DeclaringType == typeof(CosmosQueryableExtensions)
&& method.Name == nameof(CosmosQueryableExtensions.ToPageAsync))
{
var source = base.Translate(arguments[0]);

if (source == QueryCompilationContext.NotTranslatedExpression)
{
return source;
}

if (source is not ShapedQueryExpression shapedQuery)
{
throw new UnreachableException($"Expected a ShapedQueryExpression but found {source.GetType().Name}");
}

// The arguments to ToPageAsync must have been parameterized by the funcletizer, since they're non-lambda arguments to
// a top-level function (like Skip/Take). Translate to get these as SqlParameterExpressions.
if (arguments is not
[
_, // source
ParameterExpression continuationToken,
ParameterExpression maxItemCount,
ParameterExpression responseContinuationTokenLimitInKb,
_ // cancellationToken
]
|| _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken
|| _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount
|| _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression
translatedResponseContinuationTokenLimitInKb)
{
throw new UnreachableException("ToPageAsync without the appropriate parameterized arguments");
}

// Wrap the shaper for the entire query in a PagingExpression which also contains the paging arguments, and update
// the final cardinality to Single (since we'll be returning a single Page).
return shapedQuery
.UpdateShaperExpression(new PagingExpression(
shapedQuery.ShaperExpression,
translatedContinuationToken,
translatedMaxItemCount,
translatedResponseContinuationTokenLimitInKb))
.UpdateResultCardinality(ResultCardinality.Single);
}

return base.Translate(expression);
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
Expand Down
Loading

0 comments on commit 53b39fc

Please sign in to comment.