Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throughput fix for throwing on not found and using custom serializer #772

Merged
merged 9 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public async override Task<ThroughputResponse> ReplaceThroughputAsync(

CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReplaceThroughputAsync(
resourceName: this.Id,
targetRID: rid,
throughput: throughput,
requestOptions: requestOptions,
Expand Down
30 changes: 27 additions & 3 deletions Microsoft.Azure.Cosmos/src/Resource/CosmosResponseFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,44 @@ internal CosmosResponseFactory(
this.cosmosSerializer = userJsonSerializer;
}

internal FeedResponse<T> CreateQueryFeedResponseWithPropertySerializer<T>(
ResponseMessage cosmosResponseMessage)
{
return this.CreateQueryFeedResponseHelper<T>(
cosmosResponseMessage,
true);
}

internal FeedResponse<T> CreateQueryFeedResponse<T>(
ResponseMessage cosmosResponseMessage)
{
return this.CreateQueryFeedResponseHelper<T>(
cosmosResponseMessage,
false);
}

private FeedResponse<T> CreateQueryFeedResponseHelper<T>(
ResponseMessage cosmosResponseMessage,
bool usePropertySerializer)
{
//Throw the exception
cosmosResponseMessage.EnsureSuccessStatusCode();

// The property serializer should be used for internal
// query operations like throughput since user serializer can break the logic
CosmosSerializer serializer = usePropertySerializer ? this.propertiesSerializer : this.cosmosSerializer;

QueryResponse queryResponse = cosmosResponseMessage as QueryResponse;
if (queryResponse != null)
{
return QueryResponse<T>.CreateResponse<T>(
cosmosQueryResponse: queryResponse,
jsonSerializer: this.cosmosSerializer);
jsonSerializer: serializer);
}

return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.cosmosSerializer);
serializer);
}

internal Task<ItemResponse<T>> CreateItemResponseAsync<T>(
Expand Down Expand Up @@ -112,7 +133,10 @@ internal Task<DatabaseResponse> CreateDatabaseResponseAsync(
{
return this.ProcessMessageAsync(cosmosResponseMessageTask, (cosmosResponseMessage) =>
{
DatabaseProperties databaseProperties = this.ToObjectInternal<DatabaseProperties>(cosmosResponseMessage, this.propertiesSerializer);
DatabaseProperties databaseProperties = this.ToObjectInternal<DatabaseProperties>(
cosmosResponseMessage,
this.propertiesSerializer);

return new DatabaseResponse(
cosmosResponseMessage.StatusCode,
cosmosResponseMessage.Headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public async override Task<ThroughputResponse> ReplaceThroughputAsync(
string rid = await this.GetRIDAsync(cancellationToken);
CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
return await cosmosOffers.ReplaceThroughputAsync(
resourceName: this.Id,
targetRID: rid,
throughput: throughput,
requestOptions: requestOptions,
Expand Down
32 changes: 19 additions & 13 deletions Microsoft.Azure.Cosmos/src/Resource/Offer/CosmosOffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ internal async Task<ThroughputResponse> ReadThroughputAsync(
CancellationToken cancellationToken = default(CancellationToken))
{
OfferV2 offerV2 = await this.GetOfferV2Async(targetRID, cancellationToken);
if (offerV2 == null)
{
return new ThroughputResponse(HttpStatusCode.NotFound, null, null);
}

return await this.GetThroughputResponseAsync(
streamPayload: null,
Expand All @@ -40,12 +44,18 @@ internal async Task<ThroughputResponse> ReadThroughputAsync(
}

internal async Task<ThroughputResponse> ReplaceThroughputAsync(
string resourceName,
string targetRID,
int throughput,
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
OfferV2 offerV2 = await this.GetOfferV2Async(targetRID, cancellationToken);
if (offerV2 == null)
{
throw new CosmosException(HttpStatusCode.NotFound, $"Throughput is not configured for resource:{resourceName}");
}

OfferV2 newOffer = new OfferV2(offerV2, throughput);

return await this.GetThroughputResponseAsync(
Expand Down Expand Up @@ -73,11 +83,6 @@ internal async Task<OfferV2> GetOfferV2Async(
queryDefinition);
OfferV2 offerV2 = await this.SingleOrDefaultAsync<OfferV2>(databaseStreamIterator);

if (offerV2 == null)
{
throw new CosmosException(HttpStatusCode.NotFound, "Throughput is not configured");
}

return offerV2;
}

Expand All @@ -87,15 +92,15 @@ internal virtual FeedIterator<T> GetOfferQueryIterator<T>(
QueryRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
FeedIterator databaseStreamIterator = GetOfferQueryStreamIterator(
FeedIterator databaseStreamIterator = this.GetOfferQueryStreamIterator(
queryDefinition,
continuationToken,
requestOptions,
cancellationToken);

return new FeedIteratorCore<T>(
databaseStreamIterator,
this.ClientContext.ResponseFactory.CreateQueryFeedResponse<T>);
this.ClientContext.ResponseFactory.CreateQueryFeedResponseWithPropertySerializer<T>);
}

internal virtual FeedIterator GetOfferQueryStreamIterator(
Expand All @@ -105,12 +110,13 @@ internal virtual FeedIterator GetOfferQueryStreamIterator(
CancellationToken cancellationToken = default(CancellationToken))
{
return new FeedIteratorCore(
this.ClientContext,
this.OfferRootUri,
ResourceType.Offer,
queryDefinition,
continuationToken,
requestOptions);
clientContext: this.ClientContext,
resourceLink: this.OfferRootUri,
resourceType: ResourceType.Offer,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions,
usePropertySerializer: true);
}

private CosmosOfferResult GetThroughputIfExists(Offer offer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class FeedIteratorCore : FeedIterator
private readonly Uri resourceLink;
private readonly ResourceType resourceType;
private readonly SqlQuerySpec querySpec;
private readonly bool usePropertySerializer;
private bool hasMoreResultsInternal;

internal FeedIteratorCore(
Expand All @@ -29,14 +30,16 @@ internal FeedIteratorCore(
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
QueryRequestOptions options)
QueryRequestOptions options,
bool usePropertySerializer = false)
{
this.resourceLink = resourceLink;
this.clientContext = clientContext;
this.resourceType = resourceType;
this.querySpec = queryDefinition?.ToSqlQuerySpec();
this.continuationToken = continuationToken;
this.requestOptions = options;
this.usePropertySerializer = usePropertySerializer;
this.hasMoreResultsInternal = true;
}

Expand All @@ -63,7 +66,13 @@ internal FeedIteratorCore(
OperationType operation = OperationType.ReadFeed;
if (this.querySpec != null)
{
stream = this.clientContext.SqlQuerySpecSerializer.ToStream(querySpec);
// Use property serializer is for internal query operations like throughput
// that should not use custom serializer
CosmosSerializer serializer = this.usePropertySerializer ?
this.clientContext.PropertiesSerializer :
this.clientContext.SqlQuerySpecSerializer;

stream = serializer.ToStream(this.querySpec);
operation = OperationType.Query;
}

Expand All @@ -77,7 +86,7 @@ internal FeedIteratorCore(
streamPayload: stream,
requestEnricher: request =>
{
QueryRequestOptions.FillContinuationToken(request, continuationToken);
QueryRequestOptions.FillContinuationToken(request, this.continuationToken);
if (this.querySpec != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public async Task ReadDatabase()
Assert.AreEqual(createResponse.Database.Id, readResponse.Database.Id);
Assert.AreEqual(createResponse.Resource.Id, readResponse.Resource.Id);
Assert.AreNotEqual(createResponse.ActivityId, readResponse.ActivityId);
ValidateHeaders(readResponse);
this.ValidateHeaders(readResponse);
await createResponse.Database.DeleteAsync(cancellationToken: this.cancellationToken);
}

Expand All @@ -208,15 +208,8 @@ public async Task NoThroughputTests()
Assert.AreEqual(HttpStatusCode.Created, createResponse.StatusCode);

Cosmos.Database cosmosDatabase = createResponse;
try
{
int? readThroughput = await ((DatabaseCore)cosmosDatabase).ReadThroughputAsync();
Assert.Fail("Should through not found exception as throughput is not configured");
}
catch (CosmosException exception)
{
Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode);
}
int? readThroughput = await cosmosDatabase.ReadThroughputAsync();
Assert.IsNull(readThroughput);

await cosmosDatabase.DeleteAsync();
}
Expand All @@ -243,7 +236,8 @@ public async Task SharedThroughputTests()
{
readThroughput = await ((ContainerCore)container).ReadThroughputAsync();
Assert.Fail("Should through not found exception as throughput is not configured");
} catch (CosmosException exception)
}
catch (CosmosException exception)
{
Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode);
}
Expand All @@ -255,9 +249,21 @@ public async Task SharedThroughputTests()
[TestMethod]
public async Task ReadReplaceThroughputResponseTests()
{
int toStreamCount = 0;
int fromStreamCount = 0;

CosmosSerializerHelper mockJsonSerializer = new CosmosSerializerHelper(
null,
(x) => fromStreamCount++,
(x) => toStreamCount++);

//Create a new cosmos client with the mocked cosmos json serializer
CosmosClient client = TestCommon.CreateCosmosClient(
(cosmosClientBuilder) => cosmosClientBuilder.WithCustomSerializer(mockJsonSerializer));

string databaseId = Guid.NewGuid().ToString();
int throughput = 10000;
DatabaseResponse createResponse = await this.CreateDatabaseHelper(databaseId, databaseExists: false, throughput: throughput);
DatabaseResponse createResponse = await client.CreateDatabaseAsync(databaseId, throughput, null);
Assert.AreEqual(HttpStatusCode.Created, createResponse.StatusCode);

Cosmos.Database cosmosDatabase = createResponse;
Expand Down Expand Up @@ -288,17 +294,35 @@ public async Task ReadReplaceThroughputResponseTests()
Assert.AreEqual(readThroughputResponse.Resource.Throughput.Value + 1000, replaceThroughputResponse.Resource.Throughput.Value);

Container container = containerResponse;
readThroughputResponse = await container.ReadThroughputAsync(new RequestOptions());
Assert.IsNotNull(readThroughputResponse);
Assert.AreEqual(readThroughputResponse.StatusCode, HttpStatusCode.NotFound);
Assert.IsNull(readThroughputResponse.Resource);

await container.DeleteContainerAsync();

Container containerNoTroughput = await cosmosDatabase.CreateContainerAsync(
Guid.NewGuid().ToString(),
partitionPath,
throughput: null);

Assert.AreEqual(HttpStatusCode.Created, containerResponse.StatusCode);

int? noThroughput = await containerNoTroughput.ReadThroughputAsync();
Assert.IsNull(noThroughput);

try
{
readThroughputResponse = await container.ReadThroughputAsync(new RequestOptions());
await containerNoTroughput.ReplaceThroughputAsync(600);
Assert.Fail("Should through not found exception as throughput is not configured");
}
catch (CosmosException exception)
{
Assert.AreEqual(HttpStatusCode.NotFound, exception.StatusCode);
}

await container.DeleteContainerAsync();
Assert.AreEqual(0, toStreamCount, "Custom serializer to stream should not be used for offer operations");
Assert.AreEqual(0, fromStreamCount, "Custom serializer from stream should not be used for offer operations");
await cosmosDatabase.DeleteAsync();
}

Expand Down Expand Up @@ -361,7 +385,7 @@ public async Task DatabaseQueryIterator()
deleteList.Add(createResponse.Database);
DatabaseResponse createResponse3 = await this.cosmosClient.CreateDatabaseIfNotExistsAsync(thirdDb);
deleteList.Add(createResponse3.Database);

FeedIterator<DatabaseProperties> feedIterator =
this.cosmosClient.GetDatabaseQueryIterator<DatabaseProperties>(
new QueryDefinition("select c.id From c where c.id = @id ")
Expand Down Expand Up @@ -414,7 +438,7 @@ private async Task<DatabaseResponse> CreateDatabaseHelper(
Assert.IsNotNull(response.Resource);
Assert.AreEqual(databaseId, response.Resource.Id);
Assert.AreEqual(databaseId, response.Database.Id);
ValidateHeaders(response);
this.ValidateHeaders(response);

Assert.IsTrue(response.StatusCode == HttpStatusCode.OK || (response.StatusCode == HttpStatusCode.Created && !databaseExists));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public async Task TestCustomJsonSerializer()
{
int toStreamCount = 0;
int fromStreamCount = 0;

Mock<CosmosSerializer> mockJsonSerializer = new Mock<CosmosSerializer>();

//The item object will be serialized with the custom json serializer.
ToDoActivity testItem = CreateRandomToDoActivity();
ToDoActivity testItem = this.CreateRandomToDoActivity();
mockJsonSerializer.Setup(x => x.ToStream<ToDoActivity>(It.IsAny<ToDoActivity>()))
.Callback(()=> toStreamCount++)
.Callback(() => toStreamCount++)
.Returns(TestCommon.Serializer.ToStream<ToDoActivity>(testItem));

mockJsonSerializer.Setup(x => x.FromStream<ToDoActivity>(It.IsAny<Stream>()))
Expand Down Expand Up @@ -83,15 +83,15 @@ public async Task DefaultNullValueHandling()
cost = double.MaxValue
};

await container.UpsertItemAsync(document);
await this.container.UpsertItemAsync(document);

ResponseMessage cosmosResponseMessage = await container.ReadItemStreamAsync(document.id, new PartitionKey(document.status));
ResponseMessage cosmosResponseMessage = await this.container.ReadItemStreamAsync(document.id, new PartitionKey(document.status));
StreamReader reader = new StreamReader(cosmosResponseMessage.Content);
string text = reader.ReadToEnd();

Assert.IsTrue(text.IndexOf(nameof(document.description)) > -1, "Stored item doesn't contains null attributes");
}

private ToDoActivity CreateRandomToDoActivity(string pk = null)
{
if (string.IsNullOrEmpty(pk))
Expand Down
Loading