Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed missing retry after for stream queries. #1263

Merged
merged 10 commits into from
Mar 17, 2020
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,10 @@ private HttpClientHandler CreateHttpClientHandler(CosmosClientOptions clientOpti
return null;
}

HttpClientHandler httpClientHandler = new HttpClientHandler();
httpClientHandler.Proxy = clientOptions.WebProxy;
HttpClientHandler httpClientHandler = new HttpClientHandler
{
Proxy = clientOptions.WebProxy
};

return httpClientHandler;
}
Expand Down
39 changes: 19 additions & 20 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos.Query
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext;
Expand Down Expand Up @@ -135,10 +134,9 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
diagnostics.AddDiagnosticsInternal(queryPage);
}

QueryResponse queryResponse;
if (responseCore.IsSuccess)
{
queryResponse = QueryResponse.CreateSuccess(
return QueryResponse.CreateSuccess(
result: responseCore.CosmosElements,
count: responseCore.CosmosElements.Count,
responseLengthBytes: responseCore.ResponseLengthBytes,
Expand All @@ -155,26 +153,27 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown
});
}
else

if (responseCore.CosmosException != null)
{
queryResponse = QueryResponse.CreateFailure(
statusCode: responseCore.StatusCode,
cosmosException: responseCore.CosmosException,
requestMessage: null,
diagnostics: diagnostics,
responseHeaders: new CosmosQueryResponseMessageHeaders(
responseCore.ContinuationToken,
responseCore.DisallowContinuationTokenMessage,
cosmosQueryContext.ResourceTypeEnum,
cosmosQueryContext.ContainerResourceId)
{
RequestCharge = responseCore.RequestCharge,
ActivityId = responseCore.ActivityId,
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown
});
return responseCore.CosmosException.ToCosmosResponseMessage(null);
}

return queryResponse;
return QueryResponse.CreateFailure(
statusCode: responseCore.StatusCode,
cosmosException: responseCore.CosmosException,
requestMessage: null,
diagnostics: diagnostics,
responseHeaders: new CosmosQueryResponseMessageHeaders(
responseCore.ContinuationToken,
responseCore.DisallowContinuationTokenMessage,
cosmosQueryContext.ResourceTypeEnum,
cosmosQueryContext.ContainerResourceId)
{
RequestCharge = responseCore.RequestCharge,
ActivityId = responseCore.ActivityId,
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown,
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Documents.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
Expand All @@ -23,7 +26,7 @@ public class CosmosBasicQueryTests
private static CosmosClient DirectCosmosClient;
private static CosmosClient GatewayCosmosClient;
private const string DatabaseId = "CosmosBasicQueryTests";
private const string ContainerId = "ContainerBasicQueryTests";
private static readonly string ContainerId = "ContainerBasicQueryTests" + Guid.NewGuid();

[ClassInitialize]
public static async Task TestInit(TestContext textContext)
Expand Down Expand Up @@ -147,13 +150,149 @@ public async Task ContainerTest(bool directMode)
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task QueryRequestRateTest(bool directMode)
{
string firstItemIdAndPk = "BasicQueryItem" + Guid.NewGuid();

// Prevent the test from changing the static client
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Container container = client.GetContainer(DatabaseId, ContainerId);

List<string> createdIds = new List<string>()
{
firstItemIdAndPk,
"BasicQueryItem2"+ Guid.NewGuid(),
"BasicQueryItem3"+ Guid.NewGuid()
};

foreach (string id in createdIds)
{
dynamic item = new
{
id = id,
pk = id,
};

await container.CreateItemAsync<dynamic>(item: item);
}
}

CosmosClient clientWithThrottle;
if (directMode)
{
clientWithThrottle = TestCommon.CreateCosmosClient();
}
else
{
clientWithThrottle = TestCommon.CreateCosmosClient((builder) => builder.WithConnectionModeGateway());
}

Container containerWithThrottle = clientWithThrottle.GetContainer(DatabaseId, ContainerId);

// Do a read to warm up all the caches to prevent them from getting the throttle errors
using (await containerWithThrottle.ReadItemStreamAsync(firstItemIdAndPk, new PartitionKey(firstItemIdAndPk))) { }

Documents.IStoreModel storeModel = clientWithThrottle.ClientContext.DocumentClient.StoreModel;
Mock<Documents.IStoreModel> mockStore = new Mock<Documents.IStoreModel>();
clientWithThrottle.ClientContext.DocumentClient.StoreModel = mockStore.Object;

// Cause 429 after the first call
int callCount = 0;
string activityId = null;
string errorMessage = "QueryRequestRateTest Resource Not Found";
mockStore.Setup(x => x.ProcessMessageAsync(It.IsAny<Documents.DocumentServiceRequest>(), It.IsAny<CancellationToken>()))
.Returns<Documents.DocumentServiceRequest, CancellationToken>((dsr, token) =>
{
callCount++;

if (callCount > 1)
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Add(Documents.HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "42");
activityId = Guid.NewGuid().ToString();
headers.Add(Documents.HttpConstants.HttpHeaders.ActivityId, activityId);
Documents.DocumentServiceResponse response = new Documents.DocumentServiceResponse(
body: TestCommon.GenerateStreamFromString(@"{""Errors"":[""" + errorMessage + @"""]}"),
headers: headers,
statusCode: (HttpStatusCode)429,
clientSideRequestStatistics: dsr.RequestContext.ClientRequestStatistics);

return Task.FromResult(response);
}

return storeModel.ProcessMessageAsync(dsr, token);
});

List<dynamic> results = new List<dynamic>();
try
{
FeedIterator<dynamic> feedIterator = containerWithThrottle.GetItemQueryIterator<dynamic>(
"select * from T where STARTSWITH(T.id, \"BasicQueryItem\")",
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1
});

while (feedIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync();
Assert.IsTrue(response.Count <= 1);
Assert.IsTrue(response.Resource.Count() <= 1);

results.AddRange(response);
}
Assert.Fail("Should throw 429 exception after the first page.");
}
catch (CosmosException ce)
{
Assert.IsTrue(ce.RetryAfter.HasValue);
Assert.AreEqual(42, ce.RetryAfter.Value.TotalMilliseconds);
Assert.AreEqual(activityId, ce.ActivityId);
Assert.IsNotNull(ce.DiagnosticsContext);
Assert.IsTrue(ce.Message.Contains(errorMessage));
}

callCount = 0;
FeedIterator streamIterator = containerWithThrottle.GetItemQueryStreamIterator(
"select * from T where STARTSWITH(T.id, \"BasicQueryItem\")",
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1
});

// First request should be a success
using (ResponseMessage response = await streamIterator.ReadNextAsync())
{
response.EnsureSuccessStatusCode();
Assert.IsNotNull(response.Content);
}

// Second page should be a failure
using (ResponseMessage response = await streamIterator.ReadNextAsync())
{
Assert.AreEqual(429, (int)response.StatusCode);
Assert.AreEqual("42", response.Headers.RetryAfterLiteral);
Assert.AreEqual(activityId, response.Headers.ActivityId);
Assert.IsNotNull(response.DiagnosticsContext);
Assert.IsTrue(response.ErrorMessage.Contains(errorMessage));
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task ItemTest(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Container container = client.GetContainer(DatabaseId, ContainerId);
Database database = client.GetDatabase(DatabaseId);
Container container = await database.CreateContainerAsync(Guid.NewGuid().ToString(), "/pk");

List<string> createdIds = new List<string>()
{
"BasicQueryItem",
Expand Down Expand Up @@ -250,7 +389,9 @@ public async Task ItemTest(bool directMode)
public async Task ScriptsStoredProcedureTest(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Scripts scripts = client.GetContainer(DatabaseId, ContainerId).Scripts;
Database database = client.GetDatabase(DatabaseId);
Container container = await database.CreateContainerAsync(Guid.NewGuid().ToString(), "/pk");
Scripts scripts = container.Scripts;

List<string> createdIds = new List<string>()
{
Expand Down Expand Up @@ -302,7 +443,9 @@ public async Task ScriptsStoredProcedureTest(bool directMode)
public async Task ScriptsUserDefinedFunctionTest(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Scripts scripts = client.GetContainer(DatabaseId, ContainerId).Scripts;
Database database = client.GetDatabase(DatabaseId);
Container container = await database.CreateContainerAsync(Guid.NewGuid().ToString(), "/pk");
Scripts scripts = container.Scripts;

List<string> createdIds = new List<string>()
{
Expand Down Expand Up @@ -354,7 +497,9 @@ public async Task ScriptsUserDefinedFunctionTest(bool directMode)
public async Task ScriptsTriggerTest(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Scripts scripts = client.GetContainer(DatabaseId, ContainerId).Scripts;
Database database = client.GetDatabase(DatabaseId);
Container container = await database.CreateContainerAsync(Guid.NewGuid().ToString(), "/pk");
Scripts scripts = container.Scripts;

List<string> createdIds = new List<string>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ static TestCommon()
TestCommon.masterStalenessIntervalInSeconds = int.Parse(ConfigurationManager.AppSettings["MasterStalenessIntervalInSeconds"], CultureInfo.InvariantCulture);
}

internal static MemoryStream GenerateStreamFromString(string s)
{
MemoryStream stream = new MemoryStream();
StreamWriter writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}

internal static (string endpoint, string authKey) GetAccountInfo()
{
string authKey = ConfigurationManager.AppSettings["MasterKey"];
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#1242](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1242) Client encryption - Fix bug in read path without encrypted properties
- [#1189](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1189) Query diagnostics shows correct overall time.
- [#1189](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1189) Fixed a bug that caused duplicate information in diagnostic context.
- [#1263](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1263) Fix a bug where retry after internval did not get set on query stream responses
- [#1268](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1268) Fix bug where Request Options was getting lost for Database.ReadStreamAsync and Database.DeleteStreamAsync

## <a name="3.7.0-preview2"/> [3.7.0-preview2](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.7.0-preview2) - 2020-03-09
Expand Down