Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query and change feed: Add serialization optimization by using array instead of by item #1516

Merged
merged 16 commits into from
May 15, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ private static IEnumerable<JObject> GetItemsFromResponse(ResponseMessage respons
return new Collection<JObject>();
}

return CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<JObject>(
response.Content,
ResourceType.Document);
return CosmosFeedResponseSerializer.FromFeedResponseStream<JObject>(
CosmosContainerExtensions.DefaultJsonSerializer,
response.Content);
}

private async Task<long> GetRemainingWorkAsync(DocumentServiceLease existingLease, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private Task DispatchChangesAsync(ResponseMessage response, CancellationToken ca
IEnumerable<T> asFeedResponse;
try
{
asFeedResponse = this.serializerCore.FromFeedResponseStream<T>(
response.Content,
Documents.ResourceType.Document);
asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream<T>(
this.serializerCore,
response.Content);
}
catch (Exception serializationException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ private async Task<IReadOnlyList<DocumentServiceLeaseCore>> ListDocumentsAsync(s
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
responseMessage.Content,
Documents.ResourceType.Document));
leases.AddRange(CosmosFeedResponseSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
CosmosContainerExtensions.DefaultJsonSerializer,
responseMessage.Content));
}
}

Expand Down
30 changes: 8 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/ReadFeedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,21 @@
namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.IO;
using System.Net;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Serializer;

internal class ReadFeedResponse<T> : FeedResponse<T>
{
protected ReadFeedResponse(
HttpStatusCode httpStatusCode,
CosmosArray cosmosArray,
CosmosSerializerCore serializerCore,
IReadOnlyCollection<T> resources,
Headers responseMessageHeaders,
CosmosDiagnostics diagnostics)
{
this.Count = cosmosArray != null ? cosmosArray.Count : 0;
this.Count = resources?.Count ?? 0;
this.Headers = responseMessageHeaders;
this.StatusCode = httpStatusCode;
this.Diagnostics = diagnostics;
this.Resource = CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: serializerCore);
this.Resource = resources;
}

public override int Count { get; }
Expand All @@ -47,8 +40,7 @@ public override IEnumerator<T> GetEnumerator()

internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
ResponseMessage responseMessage,
CosmosSerializerCore serializerCore,
Documents.ResourceType resourceType)
CosmosSerializerCore serializerCore)
{
using (responseMessage)
{
Expand All @@ -58,19 +50,13 @@ internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
responseMessage.EnsureSuccessStatusCode();
}

CosmosArray cosmosArray = null;
if (responseMessage.Content != null)
{
cosmosArray = CosmosElementSerializer.ToCosmosElements(
responseMessage.Content,
resourceType,
null);
}
IReadOnlyCollection<TInput> resources = CosmosFeedResponseSerializer.FromFeedResponseStream<TInput>(
serializerCore,
responseMessage.Content);

ReadFeedResponse<TInput> readFeedResponse = new ReadFeedResponse<TInput>(
httpStatusCode: responseMessage.StatusCode,
cosmosArray: cosmosArray,
serializerCore: serializerCore,
resources: resources,
responseMessageHeaders: responseMessage.Headers,
diagnostics: responseMessage.Diagnostics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,7 @@ public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
Documents.ResourceType.Document);
}

public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
resourceType);
responseMessage);
}

public override FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
Expand Down Expand Up @@ -75,18 +65,15 @@ private FeedResponse<T> CreateQueryFeedResponseHelper<T>(

return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

private FeedResponse<T> CreateChangeFeedResponseHelper<T>(
ResponseMessage cosmosResponseMessage,
Documents.ResourceType resourceType)
ResponseMessage cosmosResponseMessage)
{
return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

public override ItemResponse<T> CreateItemResponse<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ internal abstract class CosmosResponseFactoryInternal : CosmosResponseFactory
public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType);

public abstract FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

Expand Down
80 changes: 33 additions & 47 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosElementSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,6 @@ namespace Microsoft.Azure.Cosmos.Serializer
#endif
static class CosmosElementSerializer
{
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="stream">The stream response from Azure Cosmos</param>
/// <param name="resourceType">The resource type</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
internal static CosmosArray ToCosmosElements(
j82w marked this conversation as resolved.
Show resolved Hide resolved
Stream stream,
ResourceType resourceType,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
MemoryStream memoryStream = stream as MemoryStream;
if (memoryStream == null)
{
memoryStream = new MemoryStream();
stream.CopyTo(memoryStream);
}

return CosmosElementSerializer.ToCosmosElements(
memoryStream,
resourceType,
cosmosSerializationOptions);
}
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
Expand Down Expand Up @@ -236,16 +212,10 @@ internal static MemoryStream ToStream(

jsonWriter.WriteObjectEnd();

ReadOnlyMemory<byte> result = jsonWriter.GetResult();
if (!MemoryMarshal.TryGetArray(result, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(result.ToArray());
}

return new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count, writable: false, publiclyVisible: true);
return GetMemoryStreamFromJsonWriter(jsonWriter);
}

internal static IEnumerable<T> GetResources<T>(
internal static IReadOnlyList<T> GetResources<T>(
IReadOnlyList<CosmosElement> cosmosArray,
CosmosSerializerCore serializerCore)
{
Expand All @@ -256,36 +226,35 @@ internal static IEnumerable<T> GetResources<T>(

if (typeof(CosmosElement).IsAssignableFrom(typeof(T)))
{
return cosmosArray.Cast<T>();
return cosmosArray.Cast<T>().ToList();
}

return CosmosElementSerializer.GetResourcesHelper<T>(
cosmosArray,
serializerCore);
}

private static IEnumerable<T> GetResourcesHelper<T>(
IReadOnlyList<CosmosElement> cosmosArray,
CosmosSerializerCore serializerCore)
internal static T[] GetResourcesHelper<T>(
IReadOnlyList<CosmosElement> cosmosElements,
CosmosSerializerCore serializerCore,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
List<T> result = new List<T>();
foreach (CosmosElement element in cosmosArray)
using (MemoryStream memoryStream = ElementsToMemoryStream(
cosmosElements,
cosmosSerializationOptions))
{
MemoryStream memory = CosmosElementSerializer.ElementToMemoryStream(element, null);
result.Add(serializerCore.FromStream<T>(memory));
return serializerCore.FromFeedStream<T>(memoryStream);
}

return result;
}

/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="cosmosElement">The cosmos elements</param>
/// <param name="cosmosElements">The cosmos elements</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
private static MemoryStream ElementToMemoryStream(
CosmosElement cosmosElement,
internal static MemoryStream ElementsToMemoryStream(
IReadOnlyList<CosmosElement> cosmosElements,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
IJsonWriter jsonWriter;
Expand All @@ -298,15 +267,32 @@ private static MemoryStream ElementToMemoryStream(
jsonWriter = JsonWriter.Create(JsonSerializationFormat.Text);
}

cosmosElement.WriteTo(jsonWriter);
jsonWriter.WriteArrayStart();

foreach (CosmosElement element in cosmosElements)
{
element.WriteTo(jsonWriter);
}

jsonWriter.WriteArrayEnd();

return GetMemoryStreamFromJsonWriter(jsonWriter);
}

private static MemoryStream GetMemoryStreamFromJsonWriter(IJsonWriter jsonWriter)
{
ReadOnlyMemory<byte> result = jsonWriter.GetResult();
if (!MemoryMarshal.TryGetArray(result, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(result.ToArray());
}

return new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count);
return new MemoryStream(
buffer: resultAsArray.Array,
index: resultAsArray.Offset,
count: resultAsArray.Count,
writable: false,
publiclyVisible: true);
}

private static string GetRootNodeName(ResourceType resourceType)
Expand Down
Loading