Skip to content

Commit

Permalink
CosmosClient Initialization: Adds implementation for opening Rntbd co…
Browse files Browse the repository at this point in the history
…nnections to backend replica nodes in Direct mode. (#3508)
  • Loading branch information
kundadebdatta authored Oct 25, 2022
1 parent 3cc0f74 commit c0ac87a
Show file tree
Hide file tree
Showing 13 changed files with 1,109 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ClientOfficialVersion>3.31.0</ClientOfficialVersion>
<ClientPreviewVersion>3.31.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.29.1</DirectVersion>
<DirectVersion>3.29.2</DirectVersion>
<EncryptionOfficialVersion>2.0.0</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.0</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
56 changes: 20 additions & 36 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
Expand Down Expand Up @@ -1352,15 +1353,30 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper(
options: requestOptions);
}

internal async Task InitializeContainersAsync(IReadOnlyList<(string databaseId, string containerId)> containers,
CancellationToken cancellationToken)
/// <summary>
/// Initializes the container by creating the Rntbd
/// connection to all of the backend replica nodes.
/// </summary>
/// <param name="containers">A read-only list containing the database id
/// and their respective container id.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
internal async Task InitializeContainersAsync(
IReadOnlyList<(string databaseId, string containerId)> containers,
CancellationToken cancellationToken)
{
try
{
List<Task> tasks = new List<Task>();
List<Task> tasks = new ();
foreach ((string databaseId, string containerId) in containers)
{
tasks.Add(this.InitializeContainerAsync(databaseId, containerId, cancellationToken));
ContainerInternal container = (ContainerInternal)this.GetContainer(
databaseId,
containerId);

tasks.Add(this.ClientContext.InitializeContainerUsingRntbdAsync(
databaseId: databaseId,
containerLinkUri: container.LinkUri,
cancellationToken: cancellationToken));
}

await Task.WhenAll(tasks);
Expand Down Expand Up @@ -1395,38 +1411,6 @@ private int DecrementNumberOfActiveClients()
return 0;
}

private async Task InitializeContainerAsync(string databaseId, string containerId, CancellationToken cancellationToken = default)
{
ContainerInternal container = (ContainerInternal)this.GetContainer(databaseId, containerId);

IReadOnlyList<FeedRange> feedRanges = await container.GetFeedRangesAsync(cancellationToken);
List<Task> tasks = new List<Task>();
foreach (FeedRange feedRange in feedRanges)
{
tasks.Add(CosmosClient.InitializeFeedRangeAsync(container, feedRange, cancellationToken));
}

await Task.WhenAll(tasks);
}

private static async Task InitializeFeedRangeAsync(ContainerInternal container, FeedRange feedRange, CancellationToken cancellationToken = default)
{
// Do a dummy querry for each Partition Key Range to warm up the caches and connections
string guidToCheck = Guid.NewGuid().ToString();
QueryDefinition queryDefinition = new QueryDefinition($"select * from c where c.id = '{guidToCheck}'");
using (FeedIterator feedIterator = container.GetItemQueryStreamIterator(feedRange,
queryDefinition,
continuationToken: null,
requestOptions: new QueryRequestOptions() { }))
{
while (feedIterator.HasMoreResults)
{
using ResponseMessage response = await feedIterator.ReadNextAsync(cancellationToken);
response.EnsureSuccessStatusCode();
}
}
}

/// <summary>
/// Dispose of cosmos client
/// </summary>
Expand Down
42 changes: 40 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,15 +1359,15 @@ public void Dispose()
/// <remarks>
/// Test hook to enable unit test of DocumentClient.
/// </remarks>
internal IStoreModel StoreModel { get; set; }
internal IStoreModelExtension StoreModel { get; set; }

/// <summary>
/// Gets and sets the gateway IStoreModel object.
/// </summary>
/// <remarks>
/// Test hook to enable unit test of DocumentClient.
/// </remarks>
internal IStoreModel GatewayStoreModel { get; set; }
internal IStoreModelExtension GatewayStoreModel { get; set; }

/// <summary>
/// Gets and sets on execute scalar query callback
Expand Down Expand Up @@ -1462,6 +1462,44 @@ internal async Task<DocumentServiceResponse> ProcessRequestAsync(
}
}

/// <summary>
/// Establishes and Initializes the Rntbd connection to all the backend replica nodes
/// for the given database name and container.
/// </summary>
/// <param name="databaseName">A string containing the cosmos database name.</param>
/// <param name="containerLinkUri">A string containing the cosmos container link uri.</param>
/// <param name="cancellationToken">An instance of the <see cref="CancellationToken"/>.</param>
internal async Task OpenConnectionsToAllReplicasAsync(
string databaseName,
string containerLinkUri,
CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(databaseName) ||
string.IsNullOrEmpty(containerLinkUri))
{
string resourceName = string.IsNullOrEmpty(databaseName) ?
nameof(databaseName) :
nameof(containerLinkUri);

throw new ArgumentNullException(resourceName);
}

if (this.StoreModel != null)
{
try
{
await this.StoreModel.OpenConnectionsToAllReplicasAsync(
databaseName,
containerLinkUri,
cancellationToken);
}
catch (Exception)
{
throw;
}
}
}

private static string NormalizeAuthorizationPayload(string input)
{
const int expansionBuffer = 12;
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Cosmos
using Newtonsoft.Json;

// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModel, IDisposable
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
{
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();

Expand Down Expand Up @@ -482,5 +482,10 @@ private Uri GetFeedUri(DocumentServiceRequest request)
{
return new Uri(this.endpointManager.ResolveServiceEndpoint(request), PathsHelper.GeneratePath(request.ResourceType, request, true));
}

public Task OpenConnectionsToAllReplicasAsync(string databaseName, string containerLinkUri, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
}
13 changes: 13 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ internal override BatchAsyncContainerExecutor GetExecutorForContainer(ContainerI
return this.batchExecutorCache.GetExecutorForContainer(container, this);
}

/// <inheritdoc/>
internal override async Task InitializeContainerUsingRntbdAsync(
string databaseId,
string containerLinkUri,
CancellationToken cancellationToken)
{
await this.DocumentClient.EnsureValidClientAsync(NoOpTrace.Singleton);
await this.DocumentClient.OpenConnectionsToAllReplicasAsync(
databaseId,
containerLinkUri,
cancellationToken);
}

public override void Dispose()
{
this.Dispose(true);
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ internal abstract Task<T> ProcessResourceOperationAsync<T>(
ITrace trace,
CancellationToken cancellationToken);

/// <summary>
/// Initializes the given container by establishing the
/// Rntbd connection to all of the backend replica nodes.
/// </summary>
/// <param name="databaseId">A string containing the cosmos database identifier.</param>
/// <param name="containerLinkUri">A string containing the cosmos container link uri.</param>
/// <param name="cancellationToken">An instance of the <see cref="CancellationToken"/>.</param>
internal abstract Task InitializeContainerUsingRntbdAsync(
string databaseId,
string containerLinkUri,
CancellationToken cancellationToken);

public abstract void Dispose();
}
}
112 changes: 98 additions & 14 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.Routing
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Net;
Expand All @@ -16,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -84,15 +84,14 @@ public GatewayAddressCache(

public Uri ServiceEndpoint => this.serviceEndpoint;

[SuppressMessage("", "AsyncFixer02", Justification = "Multi task completed with await")]
[SuppressMessage("", "AsyncFixer04", Justification = "Multi task completed outside of await")]
public async Task OpenAsync(
public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
IReadOnlyList<PartitionKeyRangeIdentity> partitionKeyRangeIdentities,
Func<Uri, Task> openConnectionHandler,
CancellationToken cancellationToken)
{
List<Task<DocumentServiceResponse>> tasks = new List<Task<DocumentServiceResponse>>();
List<Task<TryCatch<DocumentServiceResponse>>> tasks = new ();
int batchSize = GatewayAddressCache.DefaultBatchSize;

#if !(NETSTANDARD15 || NETSTANDARD16)
Expand All @@ -110,8 +109,14 @@ public async Task OpenAsync(
#endif
#endif

string collectionAltLink = string.Format(CultureInfo.InvariantCulture, "{0}/{1}/{2}/{3}", Paths.DatabasesPathSegment, Uri.EscapeUriString(databaseName),
Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id));
string collectionAltLink = string.Format(
CultureInfo.InvariantCulture,
"{0}/{1}/{2}/{3}",
Paths.DatabasesPathSegment,
Uri.EscapeUriString(databaseName),
Paths.CollectionsPathSegment,
Uri.EscapeUriString(collection.Id));

using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName(
OperationType.Read,
collectionAltLink,
Expand All @@ -120,17 +125,22 @@ public async Task OpenAsync(
{
for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize)
{
tasks.Add(this.GetServerAddressesViaGatewayAsync(
request,
collection.ResourceId,
partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
false));
tasks
.Add(this.GetAddressesAsync(
request: request,
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId)));
}
}

foreach (DocumentServiceResponse response in await Task.WhenAll(tasks))
foreach (TryCatch<DocumentServiceResponse> task in await Task.WhenAll(tasks))
{
using (response)
if (task.Failed)
{
continue;
}

using (DocumentServiceResponse response = task.Result)
{
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();

Expand All @@ -146,11 +156,50 @@ public async Task OpenAsync(
this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

if (openConnectionHandler != null)
{
await this.OpenRntbdChannelsAsync(
addressInfo,
openConnectionHandler);
}
}
}
}
}

/// <summary>
/// Invokes the transport client delegate to open the Rntbd connection
/// and establish Rntbd context negotiation to the backend replica nodes.
/// </summary>
/// <param name="addressInfo">An instance of <see cref="Tuple{T1, T2}"/> containing the partition key id
/// and it's corresponding address information.</param>
/// <param name="openConnectionHandlerAsync">The transport client callback delegate to be invoked at a
/// later point of time.</param>
private async Task OpenRntbdChannelsAsync(
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo,
Func<Uri, Task> openConnectionHandlerAsync)
{
foreach (AddressInformation address in addressInfo.Item2.AllAddresses)
{
DefaultTrace.TraceVerbose("Attempting to open Rntbd connection to backend uri: {0}. '{1}'",
address.PhysicalUri,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
try
{
await openConnectionHandlerAsync(
new Uri(address.PhysicalUri));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} with exception: {1}. '{2}'",
address.PhysicalUri,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}
}

public async Task<PartitionAddressInformation> TryGetAddressesAsync(
DocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down Expand Up @@ -698,6 +747,41 @@ private static string ProtocolString(Protocol protocol)
};
}

/// <summary>
/// Utilizes the <see cref="TryCatch{TResult}"/> to get the server addresses. If an
/// exception is thrown during the invocation, it handles it gracefully and returns
/// a <see cref="TryCatch{TResult}"/> Task containing the exception.
/// </summary>
/// <param name="request">An instance of <see cref="DocumentServiceRequest"/> containing the request payload.</param>
/// <param name="collectionRid">A string containing the collection ids.</param>
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
/// <returns>A task of <see cref="TryCatch{TResult}"/> containing the result.</returns>
private async Task<TryCatch<DocumentServiceResponse>> GetAddressesAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds)
{
try
{
return TryCatch<DocumentServiceResponse>
.FromResult(
await this.GetServerAddressesViaGatewayAsync(
request: request,
collectionRid: collectionRid,
partitionKeyRangeIds: partitionKeyRangeIds,
forceRefresh: false));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to fetch the server addresses for: {0} with exception: {1}. '{2}'",
collectionRid,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);

return TryCatch<DocumentServiceResponse>.FromException(ex);
}
}

protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
Expand Down
Loading

0 comments on commit c0ac87a

Please sign in to comment.