Skip to content

Commit

Permalink
Add support for querying with LINQ and expressions
Browse files Browse the repository at this point in the history
This commit introduces support for LINQ querying a table repository. Unlike the document-based one, the table repository persists entities in individual columns that can therefore be queried. The document-based repository is now a separte common base interface for both queryable and non-queryable repos.

The main usage is very similar to the built-in API, where you invoke `CreateQuery` and then perform LINQ queries over it. There are obvious restrictions on what works and doesn't, and it's documented at https://docs.microsoft.com/en-us/rest/api/storageservices/query-operators-supported-for-the-table-service.

The built-in implementation for querying is deeply intertwined with the ITableEntity interface, to the point that it's effectively unsable for our purposes. No amount of trial/hacking could get over the fact that TableQuery<T> requires the T to implement ITableEntity. Short of generating dynamic proxies (a seriously complex approach), we simply cannot use that.

Luckily, it's all OData in the end, so if you build the right URIs (including headers and signatures), you don't actually need it. That includes properly parsing the continuation token headers.

So we take a dependency on OData for the URI building capabilities, but otherwise execute the queries ourselves using plain HttpClient. The same deserialization support configured for the other operations on the repo is used, so the behavior is indistinguishable from EnumerateAsync.

An extension method `EnumerateAsync(predicate)` is also provided now, which might provide a simpler API surface for simple Where-style filtering.

Finally, the "native" way of consuming these queries is `IAsyncEnumerable<T>`, but the LINQ querying works over `IQueryable<T>` so there is a need to bridge the two.

There is some discussion in the dotnet org about this, since EF Core provides similar capabilities, but nothing exists in the platform yet. So we provide a simple `GetAsyncEnumerator` extension method for `IQueryable<T>` to bridge that gap, for use with built-in `await foreach` over the query.

For tests, we just `cat` the async enumerable extensions for convenience, from https://github.com/devlooped/catbag/blob/main/System/Collections/Generic/IAsyncEnumerableExtensions.cs.

Fixes #33.
  • Loading branch information
kzu committed Jun 15, 2021
1 parent 62a1d19 commit 07e6b35
Show file tree
Hide file tree
Showing 30 changed files with 871 additions and 92 deletions.
1 change: 1 addition & 0 deletions src/TableStorage.Source/TableStorage.Source.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ It can be persisted and retrieved with:
<PackageReference Include="NuGetizer" Version="0.7.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="all" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="Microsoft.OData.Client" Version="7.9.0" />
<PackageReference Include="System.Text.Json" Version="5.0.0" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/TableStorage/AttributedDocumentRepository`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Devlooped
{
/// <summary>
/// A <see cref="ITableRepository{T}"/> implementation which relies on the entity type <typeparamref name="T"/>
/// A <see cref="IDocumentRepository{T}"/> implementation which relies on the entity type <typeparamref name="T"/>
/// being annotated with <see cref="PartitionKeyAttribute"/> and <see cref="RowKeyAttribute"/>, and
/// optionally <see cref="TableAttribute"/> (defaults to type name).
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/TableStorage/AttributedTableRepository`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Devlooped
{
/// <summary>
/// A <see cref="ITableRepository{T}"/> implementation which relies on the entity type <typeparamref name="T"/>
/// A <see cref="IDocumentRepository{T}"/> implementation which relies on the entity type <typeparamref name="T"/>
/// being annotated with <see cref="PartitionKeyAttribute"/> and <see cref="RowKeyAttribute"/>, and
/// optionally <see cref="TableAttribute"/> (defaults to type name).
/// </summary>
Expand Down
20 changes: 10 additions & 10 deletions src/TableStorage/DocumentPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Devlooped
{
/// <summary>
/// Factory methods to create <see cref="ITableRepository{T}"/> instances
/// Factory methods to create <see cref="IDocumentRepository{T}"/> instances
/// that store entities as a serialized document.
/// </summary>
static partial class DocumentPartition
Expand All @@ -22,39 +22,39 @@ static partial class DocumentPartition
public const string DefaultTableName = "Documents";

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// Creates an <see cref="IDocumentPartition{T}"/> for the given entity type
/// <typeparamref name="T"/>, using <see cref="DefaultTableName"/> as the table name and the
/// <typeparamref name="T"/> <c>Name</c> as the partition key.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
/// <returns>The new <see cref="IDocumentPartition{T}"/>.</returns>
public static IDocumentPartition<T> Create<T>(
CloudStorageAccount storageAccount,
Func<T, string> rowKey,
IBinaryDocumentSerializer? serializer = default) where T : class
=> Create<T>(storageAccount, DefaultTableName, typeof(T).Name, rowKey);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// Creates an <see cref="IDocumentPartition{T}"/> for the given entity type
/// <typeparamref name="T"/>, using the given table name and the
/// <typeparamref name="T"/> <c>Name</c> as the partition key.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="tableName">Table name to use.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
/// <returns>The new <see cref="IDocumentPartition{T}"/>.</returns>
public static IDocumentPartition<T> Create<T>(
CloudStorageAccount storageAccount,
string tableName,
Func<T, string> rowKey,
IDocumentSerializer? serializer = default) where T : class
=> Create<T>(storageAccount, tableName, default, rowKey);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// Creates an <see cref="IDocumentPartition{T}"/> for the given entity type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
Expand All @@ -65,8 +65,8 @@ public static ITablePartition<T> Create<T>(
/// If not provided, the <typeparamref name="T"/> <c>Name</c> will be used.</param>
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
/// <returns>The new <see cref="IDocumentPartition{T}"/>.</returns>
public static IDocumentPartition<T> Create<T>(
CloudStorageAccount storageAccount,
string? tableName = default,
string? partitionKey = null,
Expand Down
4 changes: 2 additions & 2 deletions src/TableStorage/DocumentPartition`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace Devlooped
{
/// <inheritdoc />
partial class DocumentPartition<T> : ITablePartition<T> where T : class
partial class DocumentPartition<T> : IDocumentPartition<T> where T : class
{
readonly ITableRepository<T> repository;
readonly IDocumentRepository<T> repository;

/// <summary>
/// Initializes the repository with the given storage account and optional table name.
Expand Down
8 changes: 4 additions & 4 deletions src/TableStorage/DocumentRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
namespace Devlooped
{
/// <summary>
/// Factory methods to create <see cref="ITableRepository{T}"/> instances
/// Factory methods to create <see cref="IDocumentRepository{T}"/> instances
/// that store entities as a serialized document.
/// </summary>
static partial class DocumentRepository
{
/// <summary>
/// Creates an <see cref="ITableRepository{T}"/> for the given entity type
/// Creates an <see cref="IDocumentRepository{T}"/> for the given entity type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
Expand All @@ -23,8 +23,8 @@ static partial class DocumentRepository
/// If not provided, the class will need a property annotated with <see cref="PartitionKeyAttribute"/>.</param>
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
/// <returns>The new <see cref="ITableRepository{T}"/>.</returns>
public static ITableRepository<T> Create<T>(
/// <returns>The new <see cref="IDocumentRepository{T}"/>.</returns>
public static IDocumentRepository<T> Create<T>(
CloudStorageAccount storageAccount,
string? tableName = default,
Func<T, string>? partitionKey = default,
Expand Down
20 changes: 11 additions & 9 deletions src/TableStorage/DocumentRepository`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Devlooped
{
/// <inheritdoc />
partial class DocumentRepository<T> : ITableRepository<T> where T : class
partial class DocumentRepository<T> : IDocumentRepository<T> where T : class
{
static readonly string documentVersion = (typeof(T).Assembly.GetName().Version ?? new Version(1, 0)).ToString(2);

Expand All @@ -23,7 +23,7 @@ partial class DocumentRepository<T> : ITableRepository<T> where T : class
readonly Func<T, string> rowKey;
readonly Task<CloudTable> table;

readonly Func<string, CancellationToken, IAsyncEnumerable<T>> enumerate;
readonly Func<string?, CancellationToken, IAsyncEnumerable<T>> enumerate;
readonly Func<string, string, CancellationToken, Task<T?>> get;
readonly Func<T, CancellationToken, Task<T>> put;

Expand Down Expand Up @@ -91,7 +91,7 @@ await table.ExecuteAsync(TableOperation.Delete(
}

/// <inheritdoc />
public IAsyncEnumerable<T> EnumerateAsync(string partitionKey, CancellationToken cancellation = default)
public IAsyncEnumerable<T> EnumerateAsync(string? partitionKey = default, CancellationToken cancellation = default)
=> enumerate(partitionKey, cancellation);

/// <inheritdoc />
Expand All @@ -104,11 +104,12 @@ public Task<T> PutAsync(T entity, CancellationToken cancellation = default)

#region Binary

async IAsyncEnumerable<T> EnumerateBinaryAsync(string partitionKey, [EnumeratorCancellation] CancellationToken cancellation = default)
async IAsyncEnumerable<T> EnumerateBinaryAsync(string? partitionKey = default, [EnumeratorCancellation] CancellationToken cancellation = default)
{
var table = await this.table.ConfigureAwait(false);
var query = new TableQuery<BinaryDocumentEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));
var query = new TableQuery<BinaryDocumentEntity>();
if (partitionKey != null)
query = query.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));

TableContinuationToken? continuation = null;
do
Expand Down Expand Up @@ -173,11 +174,12 @@ async Task<T> PutBinaryAsync(T entity, CancellationToken cancellation = default)

#region String

async IAsyncEnumerable<T> EnumerateStringAsync(string partitionKey, [EnumeratorCancellation] CancellationToken cancellation = default)
async IAsyncEnumerable<T> EnumerateStringAsync(string? partitionKey = default, [EnumeratorCancellation] CancellationToken cancellation = default)
{
var table = await this.table.ConfigureAwait(false);
var query = new TableQuery<DocumentEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));
var query = new TableQuery<DocumentEntity>();
if (partitionKey != null)
query = query.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));

TableContinuationToken? continuation = null;
do
Expand Down
2 changes: 1 addition & 1 deletion src/TableStorage/DocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ partial class DocumentSerializer : IStringDocumentSerializer
/// <summary>
/// Default instance of the serializer.
/// </summary>
public static IDocumentSerializer Default { get; } = new DocumentSerializer();
public static IStringDocumentSerializer Default { get; } = new DocumentSerializer();

readonly JsonSerializerOptions options;

Expand Down
55 changes: 55 additions & 0 deletions src/TableStorage/Http.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//<auto-generated/>
#nullable enable
using System;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Azure.Cosmos.Table;

namespace Devlooped
{
static class Http
{
public static HttpClient Client { get; }

static Http()
{
Client = new HttpClient();
Client.DefaultRequestHeaders.Add("Accept", "application/json; odata=nometadata");
Client.DefaultRequestHeaders.Add("Accept-Charset", "UTF-8");
Client.DefaultRequestHeaders.Add("User-Agent", $"Azure-Cosmos-Table/1.0.8 ({RuntimeInformation.FrameworkDescription}; {Environment.OSVersion.Platform} {Environment.OSVersion.Version})");
Client.DefaultRequestHeaders.Add("DataServiceVersion", "3.0;NetFx");
Client.DefaultRequestHeaders.Add("MaxDataServiceVersion", "3.0;NetFx");
Client.DefaultRequestHeaders.Add("x-ms-version", "2017-07-29");
}

public static HttpRequestMessage AddAuthorizationHeader(this HttpRequestMessage request, CloudStorageAccount account)
{
if (!request.Headers.TryGetValues("x-ms-date", out var values) ||
values.FirstOrDefault() is not string date ||
string.IsNullOrEmpty(date))
{
date = DateTime.UtcNow.ToString("R", CultureInfo.InvariantCulture);
request.Headers.Add("x-ms-date", date);
}

var resource = request.RequestUri.GetComponents(UriComponents.Path, UriFormat.Unescaped);
var toSign = string.Format("{0}\n/{1}/{2}",
request.Headers.GetValues("x-ms-date").First(),
account.Credentials.AccountName,
resource.TrimStart('/'));

var hasher = new HMACSHA256(Convert.FromBase64String(account.Credentials.Key));
var signature = hasher.ComputeHash(Encoding.UTF8.GetBytes(toSign));
var authentication = new AuthenticationHeaderValue("SharedKeyLite", account.Credentials.AccountName + ":" + Convert.ToBase64String(signature));

request.Headers.Authorization = authentication;

return request;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
namespace Devlooped
{
/// <summary>
/// A specific partition within a <see cref="ITableRepository{T}"/>.
/// A specific partition within a <see cref="IDocumentRepository{T}"/>.
/// </summary>
/// <typeparam name="T">The type of entity being persisted.</typeparam>
partial interface ITablePartition<T> where T : class
partial interface IDocumentPartition<T> where T : class
{
/// <summary>
/// Gets the table name being used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
namespace Devlooped
{
/// <summary>
/// A generic repository that stores entities in table storage, using individual
/// columns for entity properties.
/// A generic repository that stores entities in table storage.
/// </summary>
/// <typeparam name="T">The type of entity being persisted.</typeparam>
partial interface ITableRepository<T> where T : class
partial interface IDocumentRepository<T> where T : class
{
/// <summary>
/// Gets the table name being used.
Expand All @@ -34,12 +33,12 @@ partial interface ITableRepository<T> where T : class
Task DeleteAsync(string partitionKey, string rowKey, CancellationToken cancellation = default);

/// <summary>
/// Enumerates asynchronously all entities with the given <paramref name="partitionKey"/>.
/// Enumerates asynchronously all entities, optionally within the given <paramref name="partitionKey"/>.
/// </summary>
/// <param name="partitionKey">The partition key to scan and enumerate all entities from.</param>
/// <param name="partitionKey">The optional partition key to scope the enumeration to.</param>
/// <param name="cancellation">Optional <see cref="CancellationToken"/>.</param>
/// <returns></returns>
IAsyncEnumerable<T> EnumerateAsync(string partitionKey, CancellationToken cancellation = default);
IAsyncEnumerable<T> EnumerateAsync(string? partitionKey = default, CancellationToken cancellation = default);

/// <summary>
/// Retrieves an entity from the repository.
Expand Down
35 changes: 35 additions & 0 deletions src/TableStorage/IQueryableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//<auto-generated/>
#nullable enable
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Devlooped
{
/// <summary>
/// Extension method <see cref="GetAsyncEnumerator{TSource}(IQueryable{TSource}, CancellationToken)"/> to
/// allow native <c>await foreach</c> support for <see cref="IQueryable{T}"/> queries created from
/// the <see cref="ITableRepository{T}.CreateQuery"/> or <see cref="ITablePartition{T}.CreateQuery"/>, which
/// implement <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
static partial class IQueryableExtensions
{
/// <summary>
/// Gets the <see cref="IAsyncEnumerator{T}"/> for an <see cref="IQueryable{T}"/> that
/// implements <see cref="IAsyncEnumerable{T}"/>, for use with <c>await foreach</c>.
/// </summary>
/// <exception cref="ArgumentException">The <paramref name="source"/> does not implement
/// <see cref="IAsyncEnumerable{T}"/>.</exception>
public static IAsyncEnumerator<TSource> GetAsyncEnumerator<TSource>(this IQueryable<TSource> source, CancellationToken cancellation = default)
{
if (source is not IAsyncEnumerable<TSource> enumerable)
throw new ArgumentException("The source it not an async enumerable.", nameof(source));

return enumerable.GetAsyncEnumerator(cancellation);
}
}
}
39 changes: 39 additions & 0 deletions src/TableStorage/ITablePartition`1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//<auto-generated/>
#nullable enable
using System.Linq;

namespace Devlooped
{
/// <summary>
/// A specific partition within an <see cref="ITableRepository{T}"/>, which allows querying
/// by the entity properties, since they are stored in individual columns.
/// </summary>
/// <typeparam name="T">The type of entity being persisted.</typeparam>
partial interface ITablePartition<T> : IDocumentPartition<T> where T : class
{
/// <summary>
/// Creates a query for use with LINQ expressions. See
/// <see ref="https://docs.microsoft.com/en-us/rest/api/storageservices/query-operators-supported-for-the-table-service">supported operators</see>.
/// </summary>
/// <remarks>
/// The query is scoped to the current partition already.
/// </remarks>
/// <example>
/// var books = TablePartition.Create&lt;Book&gt;("Bestsellers");
/// await foreach (var book in books.CreateQuery().Where(x => x.IsPublished))
/// {
/// Console.WriteLine(book.ISBN);
/// }
/// </example>
/// <example>
/// var books = TablePartition.Create&lt;Book&gt;("Bestsellers");
/// await foreach (var published in from book in books.CreateQuery()
/// where book.IsPublished && book.Pages > 1000
/// select book)
/// {
/// Console.WriteLine(published.ISBN);
/// }
/// </example>
IQueryable<T> CreateQuery();
}
}
Loading

0 comments on commit 07e6b35

Please sign in to comment.