[Internal] Upgrade Resiliency: Adds Code to Enable Replica Validation Feature Through CosmosClientOptions #3967

Closed
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
@@ -459,6 +459,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

+ /// <summary>
+ /// Gets or sets the boolean flag to enable replica validation.
+ /// </summary>
+ /// <value>
+ /// The default value for this parameter is false.
+ /// </value>
+ public bool EnableAdvancedReplicaSelectionForTcp
+ {
+ get;
+ set;
+ }
+
/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
@@ -346,6 +346,17 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

+ /// <summary>
+ /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection
+ /// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent
+ /// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues.
+ /// The default value for this parameter is 'false'.
+ /// </summary>
+ /// <remarks>
+ /// <para>This is optimal for latency-sensitive workloads. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
+ /// </remarks>
+ internal bool EnableAdvancedReplicaSelectionForTcp { get; set; }
Member

Does Compute need an explicit distinction between "opted in or out" and "default behavior"?
If so, one option is to model the flag as nullable. Thoughts?

Member

Similar to EnableContentResponseOnWrite

Member Author

Well, not setting the EnableAdvancedReplicaSelectionForTcp flag automatically opts the SDK out of replica validation. To opt out of the feature, Compute doesn't need to specify any value for this field, because the default value for this flag is always false.

I think this is simpler, since we are not going to make this flag public anyway.

Member

You can make the SDK default to disabled when the bool? has no value by capturing:

bool isEnabled = EnableAdvancedReplicaSelectionForTcp.HasValue && EnableAdvancedReplicaSelectionForTcp.Value;

I think Kiran's point is that semantically it is better to make settings bool?, like EnableContentResponseOnWrite, because that lets us (for whatever reason) tell apart the case where the property's setter was never touched (use the default) from the case where the user did set a preference (either true or false). In this particular case there might not be any special handling for the untouched case, but for consistency's sake, if we are using bool? for other settings, we might as well use it here.

Member

It's an optional property, hence the conversation.

If Compute always sets it explicitly (true/false), then this makes sense for the current requirements.
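
The tri-state idea discussed in this thread can be sketched as follows. This is only a minimal illustration, assuming the option were modeled as `bool?`; `SketchClientOptions` and `ReplicaSelectionSketch` are hypothetical names made up for the example and are not SDK types.

```csharp
using System;

// Hypothetical stand-in for an options type; NOT the SDK's CosmosClientOptions.
public sealed class SketchClientOptions
{
    // null means the caller never expressed a preference.
    public bool? EnableAdvancedReplicaSelectionForTcp { get; set; }
}

public static class ReplicaSelectionSketch
{
    // Collapses the tri-state option into the effective flag; unset resolves to disabled.
    public static bool IsEnabled(SketchClientOptions options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        return options.EnableAdvancedReplicaSelectionForTcp.GetValueOrDefault(false);
    }

    // Reports which of the three states the option is in.
    public static string Describe(SketchClientOptions options)
    {
        bool? flag = options.EnableAdvancedReplicaSelectionForTcp;
        if (!flag.HasValue)
        {
            return "not set (use default)";
        }

        return flag.Value ? "explicitly enabled" : "explicitly disabled";
    }
}
```

With a plain `bool`, the first and third states ("not set" and "explicitly disabled") are indistinguishable, which is the trade-off the reviewers are weighing.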


/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
@@ -758,6 +769,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
+ EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
4 changes: 1 addition & 3 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -113,7 +113,6 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private const string DefaultInitTaskKey = "InitTaskKey";

private readonly bool IsLocalQuorumConsistency = false;
- private readonly bool isReplicaAddressValidationEnabled;

//Auth
internal readonly AuthorizationTokenProvider cosmosAuthorization;
@@ -233,7 +232,6 @@ public DocumentClient(Uri serviceEndpoint,

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
- this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
Member

Does CosmosClientOptions need to default to ConfigurationManager.IsReplicaAddressValidationEnabled()?

Member

Did we ship a version that supported setting the environment variable and is anyone using it? If yes, we might want to have backward compatibility?

Member Author

CosmosClientOptions does not need to default to ConfigurationManager.IsReplicaAddressValidationEnabled(). This was an earlier implementation, which was reading the environment variable to set the replica validation flag. Since we never shipped this feature with any version of the SDK, we can safely remove this and rely on the CosmosClientOptions.

Member

> Since we never shipped this feature with any version of the SDK, we can safely remove this

Then yeah, no issues from my side.

Member

How do external customers (Cx) enable or disable this feature?

Member Author

Are you suggesting that we support both the environment variable and CosmosClientOptions as ways to enable and disable the feature? My understanding was that we will flight it with Compute first and then enable it by default for external customers (Cx). That way, there is no need for any environment variables or client options.
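
If both mechanisms were ever kept for backward compatibility, one possible precedence rule is "an explicit client option wins, otherwise fall back to the environment variable". The sketch below is an assumption for illustration only, not what this PR implements: it presumes the option is nullable, and `ReplicaValidationSwitch` and the variable name `SKETCH_REPLICA_VALIDATION_ENABLED` are hypothetical.

```csharp
using System;

// Hypothetical helper; not part of the SDK and not part of this PR.
public static class ReplicaValidationSwitch
{
    // Hypothetical variable name, used only for this sketch.
    public const string EnvVarName = "SKETCH_REPLICA_VALIDATION_ENABLED";

    // An explicit option value wins; otherwise the environment variable decides.
    // Missing or unparsable values resolve to false (feature disabled).
    public static bool Resolve(bool? optionValue, Func<string, string> readEnv)
    {
        if (optionValue.HasValue)
        {
            return optionValue.Value;
        }

        string raw = readEnv(EnvVarName);
        return bool.TryParse(raw, out bool parsed) && parsed;
    }

    // Convenience overload that reads the real process environment.
    public static bool Resolve(bool? optionValue)
    {
        return Resolve(optionValue, Environment.GetEnvironmentVariable);
    }
}
```

Passing the environment reader as a delegate keeps the rule testable without mutating process-wide state.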

}

/// <summary>
@@ -6703,7 +6701,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true,
- enableReplicaValidation: this.isReplicaAddressValidationEnabled);
+ enableReplicaValidation: this.ConnectionPolicy.EnableAdvancedReplicaSelectionForTcp);

if (subscribeRntbdStatus)
{
8 changes: 3 additions & 5 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
@@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Routing
using System.Net;
using System.Threading;
using System.Threading.Tasks;
- using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
@@ -38,7 +37,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
- private readonly bool isReplicaAddressValidationEnabled;
+ private readonly bool replicaAddressValidationEnabled;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
@@ -66,8 +65,7 @@ public GlobalAddressResolver(
? GlobalAddressResolver.MaxBackupReadRegions : 0;

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;

- this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
+ this.replicaAddressValidationEnabled = connectionPolicy.EnableAdvancedReplicaSelectionForTcp;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

@@ -284,7 +282,7 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
- replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);
+ replicaAddressValidationEnabled: this.replicaAddressValidationEnabled);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
@@ -51,27 +51,58 @@ public async Task Cleanup()
}

     [TestMethod]
-    public async Task ReadManyTypedTest()
+    [DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")]
+    [DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")]
+    public async Task ReadManyTypedTestWithAdvancedReplicaSelection(
+        bool advancedReplicaSelectionEnabled)
     {
-        List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
-        for (int i=0; i<10; i++)
+        CosmosClientOptions clientOptions = new ()
         {
-            itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
-        }
+            EnableAdvancedReplicaSelectionForTcp = advancedReplicaSelectionEnabled,
+        };

-        FeedResponse<ToDoActivity> feedResponse= await this.Container.ReadManyItemsAsync<ToDoActivity>(itemList);
-        Assert.IsNotNull(feedResponse);
-        Assert.AreEqual(feedResponse.Count, 10);
-        Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
-        Assert.IsNotNull(feedResponse.Diagnostics);
+        Database database = null;
+        CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions);
+        try
+        {
+            database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb");
+            Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk");

-        int count = 0;
-        foreach (ToDoActivity item in feedResponse)
+            // Create items with different pk values
+            for (int i = 0; i < 500; i++)
+            {
+                ToDoActivity item = ToDoActivity.CreateRandomToDoActivity();
+                item.pk = "pk" + i.ToString();
+                item.id = i.ToString();
+                ItemResponse<ToDoActivity> itemResponse = await container.CreateItemAsync(item);
+                Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
+            }
+
+            List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
+            for (int i = 0; i < 20; i++)
+            {
+                itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
+            }
+
+            FeedResponse<ToDoActivity> feedResponse = await container.ReadManyItemsAsync<ToDoActivity>(itemList);
+            Assert.IsNotNull(feedResponse);
+            Assert.AreEqual(20, feedResponse.Count);
+            Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
+            Assert.IsNotNull(feedResponse.Diagnostics);
+
+            int count = 0;
+            foreach (ToDoActivity item in feedResponse)
+            {
+                count++;
+                Assert.IsNotNull(item);
+            }
+            Assert.AreEqual(20, count);
+        }
+        finally
         {
-            count++;
-            Assert.IsNotNull(item);
+            await database.DeleteAsync();
+            cosmosClient.Dispose();
         }
-        Assert.AreEqual(count, 10);
     }

[TestMethod]