Skip to content

Commit

Permalink
Query and change feed: Add serialization optimization by using list i…
Browse files Browse the repository at this point in the history
…nstead of by item (#1516)

* ReadFeed and ChangeFeed optimization.
  • Loading branch information
j82w authored May 15, 2020
1 parent 46e8aca commit 08dfe3e
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ private static IEnumerable<JObject> GetItemsFromResponse(ResponseMessage respons
return new Collection<JObject>();
}

return CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<JObject>(
response.Content,
ResourceType.Document);
return CosmosFeedResponseSerializer.FromFeedResponseStream<JObject>(
CosmosContainerExtensions.DefaultJsonSerializer,
response.Content);
}

private async Task<long> GetRemainingWorkAsync(DocumentServiceLease existingLease, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private Task DispatchChangesAsync(ResponseMessage response, CancellationToken ca
IEnumerable<T> asFeedResponse;
try
{
asFeedResponse = this.serializerCore.FromFeedResponseStream<T>(
response.Content,
Documents.ResourceType.Document);
asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream<T>(
this.serializerCore,
response.Content);
}
catch (Exception serializationException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ private async Task<IReadOnlyList<DocumentServiceLeaseCore>> ListDocumentsAsync(s
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
responseMessage.Content,
Documents.ResourceType.Document));
leases.AddRange(CosmosFeedResponseSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
CosmosContainerExtensions.DefaultJsonSerializer,
responseMessage.Content));
}
}

Expand Down
30 changes: 8 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/ReadFeedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,21 @@
namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.IO;
using System.Net;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Serializer;

internal class ReadFeedResponse<T> : FeedResponse<T>
{
protected ReadFeedResponse(
HttpStatusCode httpStatusCode,
CosmosArray cosmosArray,
CosmosSerializerCore serializerCore,
IReadOnlyCollection<T> resources,
Headers responseMessageHeaders,
CosmosDiagnostics diagnostics)
{
this.Count = cosmosArray != null ? cosmosArray.Count : 0;
this.Count = resources?.Count ?? 0;
this.Headers = responseMessageHeaders;
this.StatusCode = httpStatusCode;
this.Diagnostics = diagnostics;
this.Resource = CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: serializerCore);
this.Resource = resources;
}

public override int Count { get; }
Expand All @@ -47,8 +40,7 @@ public override IEnumerator<T> GetEnumerator()

internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
ResponseMessage responseMessage,
CosmosSerializerCore serializerCore,
Documents.ResourceType resourceType)
CosmosSerializerCore serializerCore)
{
using (responseMessage)
{
Expand All @@ -58,19 +50,13 @@ internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
responseMessage.EnsureSuccessStatusCode();
}

CosmosArray cosmosArray = null;
if (responseMessage.Content != null)
{
cosmosArray = CosmosElementSerializer.ToCosmosElements(
responseMessage.Content,
resourceType,
null);
}
IReadOnlyCollection<TInput> resources = CosmosFeedResponseSerializer.FromFeedResponseStream<TInput>(
serializerCore,
responseMessage.Content);

ReadFeedResponse<TInput> readFeedResponse = new ReadFeedResponse<TInput>(
httpStatusCode: responseMessage.StatusCode,
cosmosArray: cosmosArray,
serializerCore: serializerCore,
resources: resources,
responseMessageHeaders: responseMessage.Headers,
diagnostics: responseMessage.Diagnostics);

Expand Down
21 changes: 4 additions & 17 deletions Microsoft.Azure.Cosmos/src/Resource/CosmosResponseFactoryCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,7 @@ public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
Documents.ResourceType.Document);
}

public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
resourceType);
responseMessage);
}

public override FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
Expand Down Expand Up @@ -75,18 +65,15 @@ private FeedResponse<T> CreateQueryFeedResponseHelper<T>(

return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

private FeedResponse<T> CreateChangeFeedResponseHelper<T>(
ResponseMessage cosmosResponseMessage,
Documents.ResourceType resourceType)
ResponseMessage cosmosResponseMessage)
{
return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

public override ItemResponse<T> CreateItemResponse<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ internal abstract class CosmosResponseFactoryInternal : CosmosResponseFactory
public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType);

public abstract FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

Expand Down
80 changes: 33 additions & 47 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosElementSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,6 @@ namespace Microsoft.Azure.Cosmos.Serializer
#endif
static class CosmosElementSerializer
{
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="stream">The stream response from Azure Cosmos</param>
/// <param name="resourceType">The resource type</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
internal static CosmosArray ToCosmosElements(
Stream stream,
ResourceType resourceType,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
MemoryStream memoryStream = stream as MemoryStream;
if (memoryStream == null)
{
memoryStream = new MemoryStream();
stream.CopyTo(memoryStream);
}

return CosmosElementSerializer.ToCosmosElements(
memoryStream,
resourceType,
cosmosSerializationOptions);
}
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
Expand Down Expand Up @@ -236,16 +212,10 @@ internal static MemoryStream ToStream(

jsonWriter.WriteObjectEnd();

ReadOnlyMemory<byte> result = jsonWriter.GetResult();
if (!MemoryMarshal.TryGetArray(result, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(result.ToArray());
}

return new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count, writable: false, publiclyVisible: true);
return GetMemoryStreamFromJsonWriter(jsonWriter);
}

internal static IEnumerable<T> GetResources<T>(
internal static IReadOnlyList<T> GetResources<T>(
IReadOnlyList<CosmosElement> cosmosArray,
CosmosSerializerCore serializerCore)
{
Expand All @@ -256,36 +226,35 @@ internal static IEnumerable<T> GetResources<T>(

if (typeof(CosmosElement).IsAssignableFrom(typeof(T)))
{
return cosmosArray.Cast<T>();
return cosmosArray.Cast<T>().ToList();
}

return CosmosElementSerializer.GetResourcesHelper<T>(
cosmosArray,
serializerCore);
}

private static IEnumerable<T> GetResourcesHelper<T>(
IReadOnlyList<CosmosElement> cosmosArray,
CosmosSerializerCore serializerCore)
internal static T[] GetResourcesHelper<T>(
IReadOnlyList<CosmosElement> cosmosElements,
CosmosSerializerCore serializerCore,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
List<T> result = new List<T>();
foreach (CosmosElement element in cosmosArray)
using (MemoryStream memoryStream = ElementsToMemoryStream(
cosmosElements,
cosmosSerializationOptions))
{
MemoryStream memory = CosmosElementSerializer.ElementToMemoryStream(element, null);
result.Add(serializerCore.FromStream<T>(memory));
return serializerCore.FromFeedStream<T>(memoryStream);
}

return result;
}

/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="cosmosElement">The cosmos elements</param>
/// <param name="cosmosElements">The cosmos elements</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
private static MemoryStream ElementToMemoryStream(
CosmosElement cosmosElement,
internal static MemoryStream ElementsToMemoryStream(
IReadOnlyList<CosmosElement> cosmosElements,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
IJsonWriter jsonWriter;
Expand All @@ -298,15 +267,32 @@ private static MemoryStream ElementToMemoryStream(
jsonWriter = JsonWriter.Create(JsonSerializationFormat.Text);
}

cosmosElement.WriteTo(jsonWriter);
jsonWriter.WriteArrayStart();

foreach (CosmosElement element in cosmosElements)
{
element.WriteTo(jsonWriter);
}

jsonWriter.WriteArrayEnd();

return GetMemoryStreamFromJsonWriter(jsonWriter);
}

private static MemoryStream GetMemoryStreamFromJsonWriter(IJsonWriter jsonWriter)
{
ReadOnlyMemory<byte> result = jsonWriter.GetResult();
if (!MemoryMarshal.TryGetArray(result, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(result.ToArray());
}

return new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count);
return new MemoryStream(
buffer: resultAsArray.Array,
index: resultAsArray.Offset,
count: resultAsArray.Count,
writable: false,
publiclyVisible: true);
}

private static string GetRootNodeName(ResourceType resourceType)
Expand Down
Loading

0 comments on commit 08dfe3e

Please sign in to comment.