Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow creating partitions from the same table connection #156

Merged
merged 1 commit into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/TableStorage/DocumentPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static partial class DocumentPartition
public static IDocumentPartition<T> Create<T>(
CloudStorageAccount storageAccount,
Func<T, string> rowKey,
IBinaryDocumentSerializer? serializer = default) where T : class
IDocumentSerializer? serializer = default) where T : class
=> Create<T>(storageAccount, DefaultTableName, typeof(T).Name, rowKey);

/// <summary>
Expand All @@ -54,6 +54,21 @@ public static IDocumentPartition<T> Create<T>(
IDocumentSerializer? serializer = default) where T : class
=> Create<T>(storageAccount, tableName, default, rowKey);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// <typeparamref name="T"/>, using <typeparamref name="T"/> <c>Name</c> as the partition key.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
/// <param name="tableConnection">The table to connect to.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static IDocumentPartition<T> Create<T>(
TableConnection tableConnection,
Func<T, string> rowKey,
IDocumentSerializer? serializer = default) where T : class
=> Create<T>(tableConnection, default, rowKey, serializer);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// <typeparamref name="T"/>.
Expand All @@ -74,13 +89,31 @@ public static IDocumentPartition<T> Create<T>(
string? partitionKey = null,
Func<T, string>? rowKey = null,
IDocumentSerializer? serializer = default) where T : class
=> Create<T>(new TableConnection(storageAccount, tableName ?? GetDefaultTableName<T>()), partitionKey, rowKey, serializer);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
/// <param name="tableConnection">The table to connect to.</param>
/// <param name="partitionKey">Optional fixed partition key to scope entity persistence.
/// If not provided, the <typeparamref name="T"/> <c>Name</c> will be used.</param>
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static IDocumentPartition<T> Create<T>(
TableConnection tableConnection,
string? partitionKey = null,
Func<T, string>? rowKey = null,
IDocumentSerializer? serializer = default) where T : class
{
tableName ??= GetDefaultTableName<T>();
partitionKey ??= TablePartition.GetDefaultPartitionKey<T>();
rowKey ??= RowKeyAttribute.CreateCompiledAccessor<T>();
serializer ??= DocumentSerializer.Default;

return new DocumentPartition<T>(storageAccount, tableName, partitionKey, rowKey, serializer);
return new DocumentPartition<T>(tableConnection, partitionKey, rowKey, serializer);
}

/// <summary>
Expand Down
19 changes: 15 additions & 4 deletions src/TableStorage/DocumentPartition`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@ partial class DocumentPartition<T> : IDocumentPartition<T> where T : class
/// <param name="rowKey">A function to determine the row key for an entity of type <typeparamref name="T"/> within the partition.</param>
/// <param name="serializer">The serializer to use.</param>
protected internal DocumentPartition(CloudStorageAccount storageAccount, string tableName, string partitionKey, Func<T, string> rowKey, IDocumentSerializer serializer)
: this(new TableConnection(storageAccount, tableName ?? DocumentPartition.GetDefaultTableName<T>()), partitionKey, rowKey, serializer)
{
}

/// <summary>
/// Initializes the repository with the given storage account and optional table name.
/// </summary>
/// <param name="tableConnection">The table to connect to.</param>
/// <param name="partitionKey">The fixed partition key that backs this table partition.</param>
/// <param name="rowKey">A function to determine the row key for an entity of type <typeparamref name="T"/> within the partition.</param>
/// <param name="serializer">The serializer to use.</param>
protected internal DocumentPartition(TableConnection tableConnection, string partitionKey, Func<T, string> rowKey, IDocumentSerializer serializer)
{
TableName = tableName ?? DocumentPartition.GetDefaultTableName<T>();
PartitionKey = partitionKey ?? TablePartition.GetDefaultPartitionKey<T>();
repository = new DocumentRepository<T>(storageAccount,
TableName,
repository = new DocumentRepository<T>(
tableConnection,
_ => PartitionKey,
rowKey ?? RowKeyAttribute.CreateCompiledAccessor<T>(),
serializer);
}

/// <inheritdoc />
public string TableName { get; }
public string TableName => repository.TableName;

/// <inheritdoc />
public string PartitionKey { get; }
Expand Down
50 changes: 23 additions & 27 deletions src/TableStorage/DocumentRepository`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using Azure;
using Azure.Data.Tables;
using Microsoft.OData.Client;
using static Devlooped.DocumentRepository;

namespace Devlooped
{
Expand All @@ -22,14 +19,13 @@ partial class DocumentRepository<T> : IDocumentRepository<T> where T : class
static readonly int documentMajorVersion;
static readonly int documentMinorVersion;

readonly CloudStorageAccount storageAccount;
readonly TableConnection tableConnection;

readonly IStringDocumentSerializer? stringSerializer;
readonly IBinaryDocumentSerializer? binarySerializer;

readonly Func<T, string> partitionKey;
readonly Func<T, string> rowKey;
readonly Task<TableClient> table;

readonly Func<Func<IQueryable<IDocumentEntity>, IQueryable<IDocumentEntity>>, CancellationToken, IAsyncEnumerable<T>> enumerate;
readonly Func<string, string, CancellationToken, Task<T?>> get;
Expand All @@ -52,10 +48,20 @@ static DocumentRepository()
/// <param name="rowKey">A function to determine the row key for an entity of type <typeparamref name="T"/>.</param>
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
protected internal DocumentRepository(CloudStorageAccount storageAccount, string tableName, Func<T, string> partitionKey, Func<T, string> rowKey, IDocumentSerializer serializer)
: this(new TableConnection(storageAccount, tableName ?? TableRepository.GetDefaultTableName<T>()), partitionKey, rowKey, serializer)
{
this.storageAccount = storageAccount;
TableName = tableName ?? TableRepository.GetDefaultTableName<T>();

}

/// <summary>
/// Initializes the table repository.
/// </summary>
/// <param name="tableConnection">The table to connect to.</param>
/// <param name="partitionKey">A function to determine the partition key for an entity of type <typeparamref name="T"/>.</param>
/// <param name="rowKey">A function to determine the row key for an entity of type <typeparamref name="T"/>.</param>
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
protected internal DocumentRepository(TableConnection tableConnection, Func<T, string> partitionKey, Func<T, string> rowKey, IDocumentSerializer serializer)
{
this.tableConnection = tableConnection;
this.partitionKey = partitionKey ?? PartitionKeyAttribute.CreateCompiledAccessor<T>();
this.rowKey = rowKey ?? RowKeyAttribute.CreateCompiledAccessor<T>();

Expand All @@ -78,17 +84,15 @@ protected internal DocumentRepository(CloudStorageAccount storageAccount, string
get = GetBinaryAsync;
put = PutBinaryAsync;
}

table = GetTableAsync(TableName);
}

/// <inheritdoc />
public string TableName { get; }
public string TableName => tableConnection.TableName;

/// <inheritdoc />
public async Task<bool> DeleteAsync(string partitionKey, string rowKey, CancellationToken cancellation = default)
{
var table = await this.table.ConfigureAwait(false);
var table = await this.tableConnection.GetTableAsync().ConfigureAwait(false);

var result = await table.DeleteEntityAsync(partitionKey, rowKey, cancellationToken: cancellation).ConfigureAwait(false);

Expand Down Expand Up @@ -120,7 +124,7 @@ public Task<T> PutAsync(T entity, CancellationToken cancellation = default)
async IAsyncEnumerable<T> EnumerateBinaryAsync(Func<IQueryable<IDocumentEntity>, IQueryable<IDocumentEntity>> filter, [EnumeratorCancellation] CancellationToken cancellation = default)
{
var query = new TableRepositoryQuery<BinaryDocumentEntity>(
storageAccount,
tableConnection.StorageAccount,
DocumentSerializer.Default,
TableName,
nameof(IDocumentEntity.PartitionKey),
Expand All @@ -141,7 +145,7 @@ async IAsyncEnumerable<T> EnumerateBinaryAsync(Func<IQueryable<IDocumentEntity>,

async Task<T?> GetBinaryAsync(string partitionKey, string rowKey, CancellationToken cancellation = default)
{
var table = await this.table.ConfigureAwait(false);
var table = await this.tableConnection.GetTableAsync().ConfigureAwait(false);

try
{
Expand All @@ -164,7 +168,7 @@ async Task<T> PutBinaryAsync(T entity, CancellationToken cancellation = default)
{
var partitionKey = this.partitionKey.Invoke(entity);
var rowKey = this.rowKey.Invoke(entity);
var table = await this.table.ConfigureAwait(false);
var table = await this.tableConnection.GetTableAsync().ConfigureAwait(false);

// We use Replace because all the existing entity data is in a single
// column, no point in merging since it can't be done at that level anyway.
Expand All @@ -187,7 +191,7 @@ async Task<T> PutBinaryAsync(T entity, CancellationToken cancellation = default)
async IAsyncEnumerable<T> EnumerateStringAsync(Func<IQueryable<IDocumentEntity>, IQueryable<IDocumentEntity>> filter, [EnumeratorCancellation] CancellationToken cancellation = default)
{
IQueryable<IDocumentEntity> query = new TableRepositoryQuery<DocumentEntity>(
storageAccount,
tableConnection.StorageAccount,
DocumentSerializer.Default,
TableName, null, null);

Expand All @@ -207,7 +211,7 @@ async IAsyncEnumerable<T> EnumerateStringAsync(Func<IQueryable<IDocumentEntity>,

async Task<T?> GetStringAsync(string partitionKey, string rowKey, CancellationToken cancellation = default)
{
var table = await this.table.ConfigureAwait(false);
var table = await this.tableConnection.GetTableAsync().ConfigureAwait(false);

try
{
Expand All @@ -229,7 +233,7 @@ async Task<T> PutStringAsync(T entity, CancellationToken cancellation = default)
{
var partitionKey = this.partitionKey.Invoke(entity);
var rowKey = this.rowKey.Invoke(entity);
var table = await this.table.ConfigureAwait(false);
var table = await this.tableConnection.GetTableAsync().ConfigureAwait(false);

// We use Replace because all the existing entity data is in a single
// column, no point in merging since it can't be done at that level anyway.
Expand All @@ -246,13 +250,5 @@ async Task<T> PutStringAsync(T entity, CancellationToken cancellation = default)
}

#endregion

Task<TableClient> GetTableAsync(string tableName) => Task.Run(async () =>
{
var tableClient = storageAccount.CreateTableServiceClient();
var table = tableClient.GetTableClient(tableName);
await table.CreateIfNotExistsAsync();
return table;
});
}
}
}
2 changes: 1 addition & 1 deletion src/TableStorage/DocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Devlooped
{
/// <summary>
/// Default implementation of <see cref="IStringDocumentSerializer"/> which
/// uses Newtonsoft.Json for serialization.
/// uses System.Text.Json for serialization.
/// </summary>
partial class DocumentSerializer : IStringDocumentSerializer
{
Expand Down
49 changes: 49 additions & 0 deletions src/TableStorage/TableConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Threading.Tasks;
using Azure.Data.Tables;

namespace Devlooped
{
/// <summary>
/// Represents a connection to a given <see cref="TableName"/>
/// over a given <see cref="CloudStorageAccount"/>.
/// </summary>
partial class TableConnection
{
readonly CloudStorageAccount storageAccount;
TableClient? table;

/// <summary>
/// Creates the affinitized table connection for the given table name.
/// </summary>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="tableName">The table to connect to.</param>
public TableConnection(CloudStorageAccount storageAccount, string tableName)
{
this.storageAccount = storageAccount;
TableName = tableName;
}

/// <summary>
/// Gets the storage account used to connect to the table.
/// </summary>
public CloudStorageAccount StorageAccount => storageAccount;

/// <summary>
/// Gets the name of the table to use.
/// </summary>
public string TableName { get; }

/// <summary>
/// Gets table client, creating the table if it doesn't exist.
/// </summary>
internal async Task<TableClient> GetTableAsync() => table ??= await CreateTableClientAsync();

async Task<TableClient> CreateTableClientAsync()
{
var tableClient = storageAccount.CreateTableServiceClient();
var table = tableClient.GetTableClient(TableName);
await table.CreateIfNotExistsAsync();
return table;
}
}
}
15 changes: 12 additions & 3 deletions src/TableStorage/TableEntityPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@ partial class TableEntityPartition : ITablePartition<TableEntity>
/// <param name="tableName">The table that backs this table partition.</param>
/// <param name="partitionKey">The fixed partition key that backs this table partition.</param>
protected internal TableEntityPartition(CloudStorageAccount storageAccount, string tableName, string partitionKey)
: this(new TableConnection(storageAccount, tableName), partitionKey)
{
}

/// <summary>
/// Initializes the repository with the given storage account and optional table name.
/// </summary>
/// <param name="tableConnection">The <see cref="TableConnection"/> to use to connect to the table.</param>
/// <param name="partitionKey">The fixed partition key that backs this table partition.</param>
protected internal TableEntityPartition(TableConnection tableConnection, string partitionKey)
{
TableName = tableName;
PartitionKey = partitionKey;
repository = new TableEntityRepository(storageAccount, TableName);
repository = new TableEntityRepository(tableConnection);
}

/// <inheritdoc />
public string TableName { get; }
public string TableName => repository.TableName;

/// <inheritdoc />
public string PartitionKey { get; }
Expand Down
Loading