Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosClient Initialization: Adds implementation for opening Rntbd connections to backend replica nodes in Direct mode. #3508

Merged
merged 15 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ClientOfficialVersion>3.31.0</ClientOfficialVersion>
<ClientPreviewVersion>3.31.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.29.1</DirectVersion>
<DirectVersion>3.29.2</DirectVersion>
<EncryptionOfficialVersion>2.0.0</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.0</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
56 changes: 20 additions & 36 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
Expand Down Expand Up @@ -1352,15 +1353,30 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper(
options: requestOptions);
}

internal async Task InitializeContainersAsync(IReadOnlyList<(string databaseId, string containerId)> containers,
CancellationToken cancellationToken)
/// <summary>
/// Initializes the container by creating the Rntbd
/// connection to all of the backend replica nodes.
/// </summary>
/// <param name="containers">A read-only list containing the database id
/// and their respective container id.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
internal async Task InitializeContainersAsync(
IReadOnlyList<(string databaseId, string containerId)> containers,
CancellationToken cancellationToken)
{
try
{
List<Task> tasks = new List<Task>();
List<Task> tasks = new ();
foreach ((string databaseId, string containerId) in containers)
{
tasks.Add(this.InitializeContainerAsync(databaseId, containerId, cancellationToken));
ContainerInternal container = (ContainerInternal)this.GetContainer(
databaseId,
containerId);

tasks.Add(this.ClientContext.InitializeContainerUsingRntbdAsync(
databaseId: databaseId,
containerLinkUri: container.LinkUri,
cancellationToken: cancellationToken));
}

await Task.WhenAll(tasks);
Expand Down Expand Up @@ -1395,38 +1411,6 @@ private int DecrementNumberOfActiveClients()
return 0;
}

private async Task InitializeContainerAsync(string databaseId, string containerId, CancellationToken cancellationToken = default)
{
ContainerInternal container = (ContainerInternal)this.GetContainer(databaseId, containerId);

IReadOnlyList<FeedRange> feedRanges = await container.GetFeedRangesAsync(cancellationToken);
List<Task> tasks = new List<Task>();
foreach (FeedRange feedRange in feedRanges)
{
tasks.Add(CosmosClient.InitializeFeedRangeAsync(container, feedRange, cancellationToken));
}

await Task.WhenAll(tasks);
}

private static async Task InitializeFeedRangeAsync(ContainerInternal container, FeedRange feedRange, CancellationToken cancellationToken = default)
{
// Do a dummy querry for each Partition Key Range to warm up the caches and connections
string guidToCheck = Guid.NewGuid().ToString();
QueryDefinition queryDefinition = new QueryDefinition($"select * from c where c.id = '{guidToCheck}'");
using (FeedIterator feedIterator = container.GetItemQueryStreamIterator(feedRange,
queryDefinition,
continuationToken: null,
requestOptions: new QueryRequestOptions() { }))
{
while (feedIterator.HasMoreResults)
{
using ResponseMessage response = await feedIterator.ReadNextAsync(cancellationToken);
response.EnsureSuccessStatusCode();
}
}
}

/// <summary>
/// Dispose of cosmos client
/// </summary>
Expand Down
42 changes: 40 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,15 +1359,15 @@ public void Dispose()
/// <remarks>
/// Test hook to enable unit test of DocumentClient.
/// </remarks>
internal IStoreModel StoreModel { get; set; }
internal IStoreModelExtension StoreModel { get; set; }

/// <summary>
/// Gets and sets the gateway IStoreModel object.
/// </summary>
/// <remarks>
/// Test hook to enable unit test of DocumentClient.
/// </remarks>
internal IStoreModel GatewayStoreModel { get; set; }
internal IStoreModelExtension GatewayStoreModel { get; set; }

/// <summary>
/// Gets and sets on execute scalar query callback
Expand Down Expand Up @@ -1462,6 +1462,44 @@ internal async Task<DocumentServiceResponse> ProcessRequestAsync(
}
}

/// <summary>
/// Establishes and Initializes the Rntbd connection to all the backend replica nodes
/// for the given database name and container.
/// </summary>
/// <param name="databaseName">A string containing the cosmos database name.</param>
/// <param name="containerLinkUri">A string containing the cosmos container link uri.</param>
/// <param name="cancellationToken">An instance of the <see cref="CancellationToken"/>.</param>
internal async Task OpenConnectionsToAllReplicasAsync(
string databaseName,
string containerLinkUri,
CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(databaseName) ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Personally I like one if-statement for argument for readablity.

string.IsNullOrEmpty(containerLinkUri))
{
string resourceName = string.IsNullOrEmpty(databaseName) ?
nameof(databaseName) :
nameof(containerLinkUri);

throw new ArgumentNullException(resourceName);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

if (this.StoreModel != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is it expected?
If so isn't it better to fail than silently easting?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is an scenario where StoreModel is null, we have checks like this in the same file, but can potentially be removed without issues

{
try
{
await this.StoreModel.OpenConnectionsToAllReplicasAsync(
databaseName,
containerLinkUri,
cancellationToken);
}
catch (Exception)
{
throw;
}
}
}

private static string NormalizeAuthorizationPayload(string input)
{
const int expansionBuffer = 12;
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Cosmos
using Newtonsoft.Json;

// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModel, IDisposable
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
{
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();

Expand Down Expand Up @@ -482,5 +482,10 @@ private Uri GetFeedUri(DocumentServiceRequest request)
{
return new Uri(this.endpointManager.ResolveServiceEndpoint(request), PathsHelper.GeneratePath(request.ResourceType, request, true));
}

public Task OpenConnectionsToAllReplicasAsync(string databaseName, string containerLinkUri, CancellationToken cancellationToken = default)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argument per line

{
return Task.CompletedTask;
}
}
}
13 changes: 13 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ internal override BatchAsyncContainerExecutor GetExecutorForContainer(ContainerI
return this.batchExecutorCache.GetExecutorForContainer(container, this);
}

/// <inheritdoc/>
internal override async Task InitializeContainerUsingRntbdAsync(
string databaseId,
string containerLinkUri,
CancellationToken cancellationToken)
{
await this.DocumentClient.EnsureValidClientAsync(NoOpTrace.Singleton);
await this.DocumentClient.OpenConnectionsToAllReplicasAsync(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.DocumentClient.InitializeCachesAsync will take care of populating both partitionKeyRange and addresses caches.

One option is we could leverage it and start creating connections after its populated. It's a serial execution but that might suffice for current scenarios. Thoghts?

databaseId,
containerLinkUri,
cancellationToken);
}

public override void Dispose()
{
this.Dispose(true);
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ internal abstract Task<T> ProcessResourceOperationAsync<T>(
ITrace trace,
CancellationToken cancellationToken);

/// <summary>
/// Initializes the given container by establishing the
/// Rntbd connection to all of the backend replica nodes.
/// </summary>
/// <param name="databaseId">A string containing the cosmos database identifier.</param>
/// <param name="containerLinkUri">A string containing the cosmos container link uri.</param>
/// <param name="cancellationToken">An instance of the <see cref="CancellationToken"/>.</param>
internal abstract Task InitializeContainerUsingRntbdAsync(
string databaseId,
string containerLinkUri,
CancellationToken cancellationToken);

public abstract void Dispose();
}
}
112 changes: 98 additions & 14 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.Routing
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Net;
Expand All @@ -16,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -84,15 +84,14 @@ public GatewayAddressCache(

public Uri ServiceEndpoint => this.serviceEndpoint;

[SuppressMessage("", "AsyncFixer02", Justification = "Multi task completed with await")]
[SuppressMessage("", "AsyncFixer04", Justification = "Multi task completed outside of await")]
public async Task OpenAsync(
public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
IReadOnlyList<PartitionKeyRangeIdentity> partitionKeyRangeIdentities,
Func<Uri, Task> openConnectionHandler,
CancellationToken cancellationToken)
{
List<Task<DocumentServiceResponse>> tasks = new List<Task<DocumentServiceResponse>>();
List<Task<TryCatch<DocumentServiceResponse>>> tasks = new ();
int batchSize = GatewayAddressCache.DefaultBatchSize;

#if !(NETSTANDARD15 || NETSTANDARD16)
Expand All @@ -110,8 +109,14 @@ public async Task OpenAsync(
#endif
#endif

string collectionAltLink = string.Format(CultureInfo.InvariantCulture, "{0}/{1}/{2}/{3}", Paths.DatabasesPathSegment, Uri.EscapeUriString(databaseName),
Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id));
string collectionAltLink = string.Format(
CultureInfo.InvariantCulture,
"{0}/{1}/{2}/{3}",
Paths.DatabasesPathSegment,
Uri.EscapeUriString(databaseName),
Paths.CollectionsPathSegment,
Uri.EscapeUriString(collection.Id));

using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName(
OperationType.Read,
collectionAltLink,
Expand All @@ -120,17 +125,22 @@ public async Task OpenAsync(
{
for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize)
{
tasks.Add(this.GetServerAddressesViaGatewayAsync(
request,
collection.ResourceId,
partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
false));
tasks
.Add(this.GetAddressesAsync(
request: request,
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId)));
}
}

foreach (DocumentServiceResponse response in await Task.WhenAll(tasks))
foreach (TryCatch<DocumentServiceResponse> task in await Task.WhenAll(tasks))
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Monoid type used jsut to capture exception?

One alternative is to do simple try-catch and use Task to get the exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Monad is simpler/easier to work with afterwards, you cannot use try/catch inside the task with Task.WhenAll

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally - We needed an approach where the task is throwing an exception, it should continue the execution and makes sure, it doesn't fail the other pending tasks. There is a way we could achieve the same using Task.ContinueWith() but it's not a preferred way in dotnet. per this .net best practices and recommendations, it appears that await ing on a task is a preferred design choice over .ContinueWith(). Therefore, we decided to leverage the TryCatch framework.

{
using (response)
if (task.Failed)
{
continue;
}

using (DocumentServiceResponse response = task.Result)
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
{
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();

Expand All @@ -146,11 +156,50 @@ public async Task OpenAsync(
this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

if (openConnectionHandler != null)
{
await this.OpenRntbdChannelsAsync(
addressInfo,
openConnectionHandler);
}
}
}
}
}

/// <summary>
/// Invokes the transport client delegate to open the Rntbd connection
/// and establish Rntbd context negotiation to the backend replica nodes.
/// </summary>
/// <param name="addressInfo">An instance of <see cref="Tuple{T1, T2}"/> containing the partition key id
/// and it's corresponding address information.</param>
/// <param name="openConnectionHandlerAsync">The transport client callback delegate to be invoked at a
/// later point of time.</param>
private async Task OpenRntbdChannelsAsync(
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo,
Func<Uri, Task> openConnectionHandlerAsync)
{
foreach (AddressInformation address in addressInfo.Item2.AllAddresses)
{
DefaultTrace.TraceVerbose("Attempting to open Rntbd connection to backend uri: {0}. '{1}'",
address.PhysicalUri,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
try
{
await openConnectionHandlerAsync(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As contract the caller is deciding to serialize creation.
Isn't it better to let the transport decide on it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only 1 transport that would call this (Direct) though?

new Uri(address.PhysicalUri));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} with exception: {1}. '{2}'",
address.PhysicalUri,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}
}

public async Task<PartitionAddressInformation> TryGetAddressesAsync(
DocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down Expand Up @@ -698,6 +747,41 @@ private static string ProtocolString(Protocol protocol)
};
}

/// <summary>
/// Utilizes the <see cref="TryCatch{TResult}"/> to get the server addresses. If an
/// exception is thrown during the invocation, it handles it gracefully and returns
/// a <see cref="TryCatch{TResult}"/> Task containing the exception.
/// </summary>
/// <param name="request">An instance of <see cref="DocumentServiceRequest"/> containing the request payload.</param>
/// <param name="collectionRid">A string containing the collection ids.</param>
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
/// <returns>A task of <see cref="TryCatch{TResult}"/> containing the result.</returns>
private async Task<TryCatch<DocumentServiceResponse>> GetAddressesAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds)
{
try
{
return TryCatch<DocumentServiceResponse>
.FromResult(
await this.GetServerAddressesViaGatewayAsync(
request: request,
collectionRid: collectionRid,
partitionKeyRangeIds: partitionKeyRangeIds,
forceRefresh: false));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to fetch the server addresses for: {0} with exception: {1}. '{2}'",
collectionRid,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);

return TryCatch<DocumentServiceResponse>.FromException(ex);
}
}

protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
Expand Down
Loading