Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query and change feed: Add serialization optimization by using array instead of by item #1516

Merged
merged 16 commits into from
May 15, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private static IEnumerable<JObject> GetItemsFromResponse(ResponseMessage respons
return new Collection<JObject>();
}

return CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<JObject>(
return CosmosFeedResponseSerializer.FromFeedResponseStream<JObject>(
CosmosContainerExtensions.DefaultJsonSerializer,
response.Content,
ResourceType.Document);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ private Task DispatchChangesAsync(ResponseMessage response, CancellationToken ca
IEnumerable<T> asFeedResponse;
try
{
asFeedResponse = this.serializerCore.FromFeedResponseStream<T>(
asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream<T>(
this.serializerCore,
response.Content,
Documents.ResourceType.Document);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ private async Task<IReadOnlyList<DocumentServiceLeaseCore>> ListDocumentsAsync(s
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
leases.AddRange(CosmosFeedResponseSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
CosmosContainerExtensions.DefaultJsonSerializer,
responseMessage.Content,
Documents.ResourceType.Document));
}
Expand Down
20 changes: 8 additions & 12 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/ReadFeedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@ internal class ReadFeedResponse<T> : FeedResponse<T>
{
protected ReadFeedResponse(
HttpStatusCode httpStatusCode,
CosmosArray cosmosArray,
CosmosSerializerCore serializerCore,
IReadOnlyList<T> resources,
j82w marked this conversation as resolved.
Show resolved Hide resolved
Headers responseMessageHeaders,
CosmosDiagnostics diagnostics)
{
this.Count = cosmosArray != null ? cosmosArray.Count : 0;
this.Count = resources != null ? resources.Count : 0;
j82w marked this conversation as resolved.
Show resolved Hide resolved
this.Headers = responseMessageHeaders;
this.StatusCode = httpStatusCode;
this.Diagnostics = diagnostics;
this.Resource = CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: serializerCore);
this.Resource = resources;
}

public override int Count { get; }
Expand Down Expand Up @@ -58,19 +55,18 @@ internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
responseMessage.EnsureSuccessStatusCode();
}

CosmosArray cosmosArray = null;
IReadOnlyList<TInput> resources = null;
if (responseMessage.Content != null)
{
cosmosArray = CosmosElementSerializer.ToCosmosElements(
resources = CosmosFeedResponseSerializer.FromFeedResponseStream<TInput>(
serializerCore,
responseMessage.Content,
resourceType,
null);
resourceType);
}

ReadFeedResponse<TInput> readFeedResponse = new ReadFeedResponse<TInput>(
httpStatusCode: responseMessage.StatusCode,
cosmosArray: cosmosArray,
serializerCore: serializerCore,
resources: resources,
responseMessageHeaders: responseMessage.Headers,
diagnostics: responseMessage.Diagnostics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,6 @@ namespace Microsoft.Azure.Cosmos.Serializer
#endif
static class CosmosElementSerializer
{
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="stream">The stream response from Azure Cosmos</param>
/// <param name="resourceType">The resource type</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
internal static CosmosArray ToCosmosElements(
j82w marked this conversation as resolved.
Show resolved Hide resolved
Stream stream,
ResourceType resourceType,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
MemoryStream memoryStream = stream as MemoryStream;
if (memoryStream == null)
{
memoryStream = new MemoryStream();
stream.CopyTo(memoryStream);
}

return CosmosElementSerializer.ToCosmosElements(
memoryStream,
resourceType,
cosmosSerializationOptions);
}
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
Expand Down Expand Up @@ -284,7 +260,7 @@ private static IEnumerable<T> GetResourcesHelper<T>(
/// <param name="cosmosElement">The cosmos elements</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
private static MemoryStream ElementToMemoryStream(
internal static MemoryStream ElementToMemoryStream(
CosmosElement cosmosElement,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;

#nullable enable
internal static class CosmosFeedResponseSerializer
{
private static readonly byte ArrayStart = Encoding.UTF8.GetBytes("[")[0];
j82w marked this conversation as resolved.
Show resolved Hide resolved
private static readonly byte ArrayEnd = Encoding.UTF8.GetBytes("]")[0];

/// <summary>
/// The service returns feed responses in an envelope. This removes the envelope
/// and serializes all the items into a list
/// </summary>
/// <param name="serializerCore">The cosmos serializer</param>
/// <param name="streamWithServiceEnvelope">A stream with the service envelope like: { "ContainerRid":"Test", "Documents":[{ "id":"MyItem"}], "count":1}</param>
/// <param name="resourceType">The resource type</param>
/// <returns>A read only list of the serialized items</returns>
internal static IReadOnlyList<T> FromFeedResponseStream<T>(
j82w marked this conversation as resolved.
Show resolved Hide resolved
CosmosSerializerCore serializerCore,
Stream streamWithServiceEnvelope,
Documents.ResourceType resourceType)
{
if (streamWithServiceEnvelope == null)
{
return new List<T>();
}

using (MemoryStream stream = GetStreamWithoutServiceEnvelope(
streamWithServiceEnvelope,
resourceType))
{
j82w marked this conversation as resolved.
Show resolved Hide resolved
return serializerCore.FromStream<List<T>>(stream);
}
}

/// <summary>
/// The service returns feed responses in an envelope. This removes the envelope
/// so it only returns the array of items.
/// </summary>
/// <param name="streamWithServiceEnvelope">A stream with the service envelope like: { "ContainerRid":"Test", "Documents":[{ "id":"MyItem"}], "count":1}</param>
/// <param name="resourceType">The resource type</param>
/// <returns>A stream containing only the array of items</returns>
internal static MemoryStream GetStreamWithoutServiceEnvelope(
Stream streamWithServiceEnvelope,
Documents.ResourceType resourceType)
{
ReadOnlyMemory<byte> content;
MemoryStream? memoryStreamWithEnvelope = streamWithServiceEnvelope as MemoryStream;
j82w marked this conversation as resolved.
Show resolved Hide resolved
if (memoryStreamWithEnvelope == null)
{
memoryStreamWithEnvelope = new MemoryStream();
j82w marked this conversation as resolved.
Show resolved Hide resolved
streamWithServiceEnvelope.CopyTo(memoryStreamWithEnvelope);
memoryStreamWithEnvelope.Position = 0;
}

if (memoryStreamWithEnvelope.TryGetBuffer(out ArraySegment<byte> buffer))
{
content = buffer;
}
else
{
content = memoryStreamWithEnvelope.ToArray();
}

int start = GetArrayStartPosition(content, resourceType);
int end = GetArrayEndPosition(content, resourceType);

ReadOnlyMemory<byte> spanwithOnlyArray = content.Slice(start, end - start + 1);
if (!MemoryMarshal.TryGetArray(spanwithOnlyArray, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(spanwithOnlyArray.ToArray());
}

MemoryStream arrayOnlyStream = new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count);
j82w marked this conversation as resolved.
Show resolved Hide resolved
return arrayOnlyStream;
}

private static int GetArrayStartPosition(
ReadOnlyMemory<byte> memoryByte,
Documents.ResourceType resourceType)
{
ReadOnlySpan<byte> span = memoryByte.Span;

// This is an optimization for documents to check the
// default envelope position for the array start
int defaultDocumentArrayStartPosition = 35;
j82w marked this conversation as resolved.
Show resolved Hide resolved
if (resourceType == Documents.ResourceType.Document &&
span[defaultDocumentArrayStartPosition] == ArrayStart)
{
return defaultDocumentArrayStartPosition;
}

for (int i = 0; i < span.Length; i++)
{
if (span[i] == ArrayStart)
{
return i;
}
j82w marked this conversation as resolved.
Show resolved Hide resolved
}

throw new ArgumentException("ArrayStart not found");
j82w marked this conversation as resolved.
Show resolved Hide resolved
}

private static int GetArrayEndPosition(
ReadOnlyMemory<byte> memoryByte,
Documents.ResourceType resourceType)
{
ReadOnlySpan<byte> span = memoryByte.Span;
j82w marked this conversation as resolved.
Show resolved Hide resolved

// This is an optimization for documents to check the
// default envelope position for the array start
int defaultDocumentArrayEndPosition = span.Length - 13;
if (resourceType == Documents.ResourceType.Document &&
span[defaultDocumentArrayEndPosition] == ArrayStart)
{
return defaultDocumentArrayEndPosition;
}

for (int i = span.Length - 1; i < span.Length; i--)
{
if (span[i] == ArrayEnd)
{
return i;
}
}

throw new ArgumentException("ArrayEnd not found");
}
}
}
17 changes: 1 addition & 16 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -20,6 +17,7 @@ namespace Microsoft.Azure.Cosmos
internal class CosmosSerializerCore
{
private static readonly CosmosSerializer propertiesSerializer = new CosmosJsonSerializerWrapper(new CosmosJsonDotNetSerializer());

private readonly CosmosSerializer customSerializer;
private readonly CosmosSerializer sqlQuerySpecSerializer;

Expand Down Expand Up @@ -102,19 +100,6 @@ internal CosmosSerializer GetCustomOrDefaultSerializer()
return CosmosSerializerCore.propertiesSerializer;
}

internal IEnumerable<T> FromFeedResponseStream<T>(
Stream stream,
ResourceType resourceType)
{
CosmosArray cosmosArray = CosmosElementSerializer.ToCosmosElements(
stream,
resourceType);

return CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: this);
}

private CosmosSerializer GetSerializer<T>()
{
if (this.customSerializer == null)
Expand Down
Loading