-
Notifications
You must be signed in to change notification settings - Fork 499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CosmosClient Initialization: Adds implementation for opening Rntbd connections to backend replica nodes in Direct mode. #3508
Changes from 12 commits
0207c4b
ed84069
c6e6f50
a95c573
56f7496
0ae5412
8906062
771cb1a
c0a63d5
11e4ad1
c449a15
91e4ffc
0210784
65d665b
375ce39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1359,15 +1359,15 @@ public void Dispose() | |
/// <remarks> | ||
/// Test hook to enable unit test of DocumentClient. | ||
/// </remarks> | ||
internal IStoreModel StoreModel { get; set; } | ||
internal IStoreModelExtension StoreModel { get; set; } | ||
|
||
/// <summary> | ||
/// Gets and sets the gateway IStoreModel object. | ||
/// </summary> | ||
/// <remarks> | ||
/// Test hook to enable unit test of DocumentClient. | ||
/// </remarks> | ||
internal IStoreModel GatewayStoreModel { get; set; } | ||
internal IStoreModelExtension GatewayStoreModel { get; set; } | ||
|
||
/// <summary> | ||
/// Gets and sets on execute scalar query callback | ||
|
@@ -1462,6 +1462,44 @@ internal async Task<DocumentServiceResponse> ProcessRequestAsync( | |
} | ||
} | ||
|
||
/// <summary> | ||
/// Establishes and Initializes the Rntbd connection to all the backend replica nodes | ||
/// for the given database name and container. | ||
/// </summary> | ||
/// <param name="databaseName">A string containing the cosmos database name.</param> | ||
/// <param name="containerLinkUri">A string containing the cosmos container link uri.</param> | ||
/// <param name="cancellationToken">An instance of the <see cref="CancellationToken"/>.</param> | ||
internal async Task OpenConnectionsToAllReplicasAsync( | ||
string databaseName, | ||
string containerLinkUri, | ||
CancellationToken cancellationToken) | ||
{ | ||
if (string.IsNullOrEmpty(databaseName) || | ||
string.IsNullOrEmpty(containerLinkUri)) | ||
{ | ||
string resourceName = string.IsNullOrEmpty(databaseName) ? | ||
nameof(databaseName) : | ||
nameof(containerLinkUri); | ||
|
||
throw new ArgumentNullException(resourceName); | ||
ealsur marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if (this.StoreModel != null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is it expected? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think there is an scenario where StoreModel is null, we have checks like this in the same file, but can potentially be removed without issues |
||
{ | ||
try | ||
{ | ||
await this.StoreModel.OpenConnectionsToAllReplicasAsync( | ||
databaseName, | ||
containerLinkUri, | ||
cancellationToken); | ||
} | ||
catch (Exception) | ||
{ | ||
throw; | ||
} | ||
} | ||
} | ||
|
||
private static string NormalizeAuthorizationPayload(string input) | ||
{ | ||
const int expansionBuffer = 12; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ namespace Microsoft.Azure.Cosmos | |
using Newtonsoft.Json; | ||
|
||
// Marking it as non-sealed in order to unit test it using Moq framework | ||
internal class GatewayStoreModel : IStoreModel, IDisposable | ||
internal class GatewayStoreModel : IStoreModelExtension, IDisposable | ||
{ | ||
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString(); | ||
|
||
|
@@ -482,5 +482,10 @@ private Uri GetFeedUri(DocumentServiceRequest request) | |
{ | ||
return new Uri(this.endpointManager.ResolveServiceEndpoint(request), PathsHelper.GeneratePath(request.ResourceType, request, true)); | ||
} | ||
|
||
public Task OpenConnectionsToAllReplicasAsync(string databaseName, string containerLinkUri, CancellationToken cancellationToken = default) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Argument per line |
||
{ | ||
throw new NotImplementedException(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -412,6 +412,19 @@ internal override BatchAsyncContainerExecutor GetExecutorForContainer(ContainerI | |
return this.batchExecutorCache.GetExecutorForContainer(container, this); | ||
} | ||
|
||
/// <inheritdoc/> | ||
internal override async Task InitializeContainerUsingRntbdAsync( | ||
string databaseId, | ||
string containerLinkUri, | ||
CancellationToken cancellationToken) | ||
{ | ||
await this.DocumentClient.EnsureValidClientAsync(NoOpTrace.Singleton); | ||
await this.DocumentClient.OpenConnectionsToAllReplicasAsync( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
One option is we could leverage it and start creating connections after its populated. It's a serial execution but that might suffice for current scenarios. Thoghts? |
||
databaseId, | ||
containerLinkUri, | ||
cancellationToken); | ||
} | ||
|
||
public override void Dispose() | ||
{ | ||
this.Dispose(true); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.Routing | |
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Diagnostics.CodeAnalysis; | ||
using System.Globalization; | ||
using System.Linq; | ||
using System.Net; | ||
|
@@ -16,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.Routing | |
using System.Threading.Tasks; | ||
using Microsoft.Azure.Cosmos.Common; | ||
using Microsoft.Azure.Cosmos.Core.Trace; | ||
using Microsoft.Azure.Cosmos.Query.Core.Monads; | ||
using Microsoft.Azure.Cosmos.Tracing; | ||
using Microsoft.Azure.Cosmos.Tracing.TraceData; | ||
using Microsoft.Azure.Documents; | ||
|
@@ -84,15 +84,14 @@ public GatewayAddressCache( | |
|
||
public Uri ServiceEndpoint => this.serviceEndpoint; | ||
|
||
[SuppressMessage("", "AsyncFixer02", Justification = "Multi task completed with await")] | ||
[SuppressMessage("", "AsyncFixer04", Justification = "Multi task completed outside of await")] | ||
public async Task OpenAsync( | ||
public async Task OpenConnectionsAsync( | ||
string databaseName, | ||
ContainerProperties collection, | ||
IReadOnlyList<PartitionKeyRangeIdentity> partitionKeyRangeIdentities, | ||
Func<Uri, Task> openConnectionHandler, | ||
CancellationToken cancellationToken) | ||
{ | ||
List<Task<DocumentServiceResponse>> tasks = new List<Task<DocumentServiceResponse>>(); | ||
List<Task<TryCatch<DocumentServiceResponse>>> tasks = new (); | ||
int batchSize = GatewayAddressCache.DefaultBatchSize; | ||
|
||
#if !(NETSTANDARD15 || NETSTANDARD16) | ||
|
@@ -110,8 +109,14 @@ public async Task OpenAsync( | |
#endif | ||
#endif | ||
|
||
string collectionAltLink = string.Format(CultureInfo.InvariantCulture, "{0}/{1}/{2}/{3}", Paths.DatabasesPathSegment, Uri.EscapeUriString(databaseName), | ||
Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id)); | ||
string collectionAltLink = string.Format( | ||
CultureInfo.InvariantCulture, | ||
"{0}/{1}/{2}/{3}", | ||
Paths.DatabasesPathSegment, | ||
Uri.EscapeUriString(databaseName), | ||
Paths.CollectionsPathSegment, | ||
Uri.EscapeUriString(collection.Id)); | ||
|
||
using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName( | ||
OperationType.Read, | ||
collectionAltLink, | ||
|
@@ -120,17 +125,22 @@ public async Task OpenAsync( | |
{ | ||
for (int i = 0; i < partitionKeyRangeIdentities.Count; i += batchSize) | ||
{ | ||
tasks.Add(this.GetServerAddressesViaGatewayAsync( | ||
request, | ||
collection.ResourceId, | ||
partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), | ||
false)); | ||
tasks | ||
.Add(this.GetAddressesAsync( | ||
request: request, | ||
collectionRid: collection.ResourceId, | ||
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId))); | ||
} | ||
} | ||
|
||
foreach (DocumentServiceResponse response in await Task.WhenAll(tasks)) | ||
foreach (TryCatch<DocumentServiceResponse> task in await Task.WhenAll(tasks)) | ||
kundadebdatta marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is Monoid type used jsut to capture exception? One alternative is to do simple try-catch and use Task to get the exception. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Monad is simpler/easier to work with afterwards, you cannot use try/catch inside the task with Task.WhenAll There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Additionally - We needed an approach where the task is throwing an exception, it should continue the execution and makes sure, it doesn't fail the other pending tasks. There is a way we could achieve the same using |
||
{ | ||
using (response) | ||
if (task.Failed) | ||
{ | ||
continue; | ||
} | ||
|
||
using (DocumentServiceResponse response = task.Result) | ||
kundadebdatta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>(); | ||
|
||
|
@@ -146,11 +156,50 @@ public async Task OpenAsync( | |
this.serverPartitionAddressCache.Set( | ||
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId), | ||
addressInfo.Item2); | ||
|
||
if (openConnectionHandler != null) | ||
{ | ||
await this.OpenRntbdChannelsAsync( | ||
addressInfo, | ||
openConnectionHandler); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Invokes the transport client delegate to open the Rntbd connection | ||
/// and establish Rntbd context negotiation to the backend replica nodes. | ||
/// </summary> | ||
/// <param name="addressInfo">An instance of <see cref="Tuple{T1, T2}"/> containing the partition key id | ||
/// and it's corresponding address information.</param> | ||
/// <param name="openConnectionHandlerAsync">The transport client callback delegate to be invoked at a | ||
/// later point of time.</param> | ||
private async Task OpenRntbdChannelsAsync( | ||
Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo, | ||
Func<Uri, Task> openConnectionHandlerAsync) | ||
{ | ||
foreach (AddressInformation address in addressInfo.Item2.AllAddresses) | ||
{ | ||
DefaultTrace.TraceVerbose("Attempting to open Rntbd connection to backend uri: {0}. '{1}'", | ||
address.PhysicalUri, | ||
System.Diagnostics.Trace.CorrelationManager.ActivityId); | ||
try | ||
{ | ||
await openConnectionHandlerAsync( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As contract the caller is deciding to serialize creation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is only 1 transport that would call this (Direct) though? |
||
new Uri(address.PhysicalUri)); | ||
} | ||
catch (Exception ex) | ||
{ | ||
DefaultTrace.TraceWarning("Failed to open Rntbd connection to backend uri: {0} with exception: {1}. '{2}'", | ||
address.PhysicalUri, | ||
ex, | ||
System.Diagnostics.Trace.CorrelationManager.ActivityId); | ||
} | ||
} | ||
} | ||
|
||
public async Task<PartitionAddressInformation> TryGetAddressesAsync( | ||
DocumentServiceRequest request, | ||
PartitionKeyRangeIdentity partitionKeyRangeIdentity, | ||
|
@@ -698,6 +747,41 @@ private static string ProtocolString(Protocol protocol) | |
}; | ||
} | ||
|
||
/// <summary> | ||
/// Utilizes the <see cref="TryCatch{TResult}"/> to get the server addresses. If an | ||
/// exception is thrown during the invocation, it handles it gracefully and returns | ||
/// a <see cref="TryCatch{TResult}"/> Task containing the exception. | ||
/// </summary> | ||
/// <param name="request">An instance of <see cref="DocumentServiceRequest"/> containing the request payload.</param> | ||
/// <param name="collectionRid">A string containing the collection ids.</param> | ||
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param> | ||
/// <returns>A task of <see cref="TryCatch{TResult}"/> containing the result.</returns> | ||
private async Task<TryCatch<DocumentServiceResponse>> GetAddressesAsync( | ||
DocumentServiceRequest request, | ||
string collectionRid, | ||
IEnumerable<string> partitionKeyRangeIds) | ||
{ | ||
try | ||
{ | ||
return TryCatch<DocumentServiceResponse> | ||
.FromResult( | ||
await this.GetServerAddressesViaGatewayAsync( | ||
request: request, | ||
collectionRid: collectionRid, | ||
partitionKeyRangeIds: partitionKeyRangeIds, | ||
forceRefresh: false)); | ||
} | ||
catch (Exception ex) | ||
{ | ||
DefaultTrace.TraceWarning("Failed to fetch the server addresses for: {0} with exception: {1}. '{2}'", | ||
collectionRid, | ||
ex, | ||
System.Diagnostics.Trace.CorrelationManager.ActivityId); | ||
|
||
return TryCatch<DocumentServiceResponse>.FromException(ex); | ||
} | ||
} | ||
|
||
protected virtual void Dispose(bool disposing) | ||
{ | ||
if (this.disposedValue) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Personally I like one if-statement for argument for readablity.