Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query and change feed: Add serialization optimization by using array instead of by item #1516

Merged
merged 16 commits into from
May 15, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ private static IEnumerable<JObject> GetItemsFromResponse(ResponseMessage respons
return new Collection<JObject>();
}

return CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<JObject>(
response.Content,
ResourceType.Document);
return CosmosFeedResponseSerializer.FromFeedResponseStream<JObject>(
CosmosContainerExtensions.DefaultJsonSerializer,
response.Content);
}

private async Task<long> GetRemainingWorkAsync(DocumentServiceLease existingLease, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private Task DispatchChangesAsync(ResponseMessage response, CancellationToken ca
IEnumerable<T> asFeedResponse;
try
{
asFeedResponse = this.serializerCore.FromFeedResponseStream<T>(
response.Content,
Documents.ResourceType.Document);
asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream<T>(
this.serializerCore,
response.Content);
}
catch (Exception serializationException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ private async Task<IReadOnlyList<DocumentServiceLeaseCore>> ListDocumentsAsync(s
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
responseMessage.Content,
Documents.ResourceType.Document));
leases.AddRange(CosmosFeedResponseSerializer.FromFeedResponseStream<DocumentServiceLeaseCore>(
CosmosContainerExtensions.DefaultJsonSerializer,
responseMessage.Content));
}
}

Expand Down
30 changes: 8 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/ReadFeedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,21 @@
namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.IO;
using System.Net;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Serializer;

internal class ReadFeedResponse<T> : FeedResponse<T>
{
protected ReadFeedResponse(
HttpStatusCode httpStatusCode,
CosmosArray cosmosArray,
CosmosSerializerCore serializerCore,
IReadOnlyList<T> resources,
j82w marked this conversation as resolved.
Show resolved Hide resolved
Headers responseMessageHeaders,
CosmosDiagnostics diagnostics)
{
this.Count = cosmosArray != null ? cosmosArray.Count : 0;
this.Count = resources != null ? resources.Count : 0;
j82w marked this conversation as resolved.
Show resolved Hide resolved
this.Headers = responseMessageHeaders;
this.StatusCode = httpStatusCode;
this.Diagnostics = diagnostics;
this.Resource = CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: serializerCore);
this.Resource = resources;
}

public override int Count { get; }
Expand All @@ -47,8 +40,7 @@ public override IEnumerator<T> GetEnumerator()

internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
ResponseMessage responseMessage,
CosmosSerializerCore serializerCore,
Documents.ResourceType resourceType)
CosmosSerializerCore serializerCore)
{
using (responseMessage)
{
Expand All @@ -58,19 +50,13 @@ internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
responseMessage.EnsureSuccessStatusCode();
}

CosmosArray cosmosArray = null;
if (responseMessage.Content != null)
{
cosmosArray = CosmosElementSerializer.ToCosmosElements(
responseMessage.Content,
resourceType,
null);
}
IReadOnlyList<TInput> resources = CosmosFeedResponseSerializer.FromFeedResponseStream<TInput>(
j82w marked this conversation as resolved.
Show resolved Hide resolved
serializerCore,
responseMessage.Content);

ReadFeedResponse<TInput> readFeedResponse = new ReadFeedResponse<TInput>(
httpStatusCode: responseMessage.StatusCode,
cosmosArray: cosmosArray,
serializerCore: serializerCore,
resources: resources,
responseMessageHeaders: responseMessage.Headers,
diagnostics: responseMessage.Diagnostics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,7 @@ public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
Documents.ResourceType.Document);
}

public override FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType)
{
return this.CreateChangeFeedResponseHelper<T>(
responseMessage,
resourceType);
responseMessage);
}

public override FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
Expand Down Expand Up @@ -75,18 +65,15 @@ private FeedResponse<T> CreateQueryFeedResponseHelper<T>(

return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

private FeedResponse<T> CreateChangeFeedResponseHelper<T>(
ResponseMessage cosmosResponseMessage,
Documents.ResourceType resourceType)
ResponseMessage cosmosResponseMessage)
{
return ReadFeedResponse<T>.CreateResponse<T>(
cosmosResponseMessage,
this.serializerCore,
resourceType);
this.serializerCore);
}

public override ItemResponse<T> CreateItemResponse<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ internal abstract class CosmosResponseFactoryInternal : CosmosResponseFactory
public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

public abstract FeedResponse<T> CreateChangeFeedUserTypeResponse<T>(
ResponseMessage responseMessage,
Documents.ResourceType resourceType);

public abstract FeedResponse<T> CreateQueryFeedUserTypeResponse<T>(
ResponseMessage responseMessage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,6 @@ namespace Microsoft.Azure.Cosmos.Serializer
#endif
static class CosmosElementSerializer
{
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
/// <param name="stream">The stream response from Azure Cosmos</param>
/// <param name="resourceType">The resource type</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
internal static CosmosArray ToCosmosElements(
j82w marked this conversation as resolved.
Show resolved Hide resolved
Stream stream,
ResourceType resourceType,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
MemoryStream memoryStream = stream as MemoryStream;
if (memoryStream == null)
{
memoryStream = new MemoryStream();
stream.CopyTo(memoryStream);
}

return CosmosElementSerializer.ToCosmosElements(
memoryStream,
resourceType,
cosmosSerializationOptions);
}
/// <summary>
/// Converts a list of CosmosElements into a memory stream.
/// </summary>
Expand Down Expand Up @@ -284,7 +260,7 @@ private static IEnumerable<T> GetResourcesHelper<T>(
/// <param name="cosmosElement">The cosmos elements</param>
/// <param name="cosmosSerializationOptions">The custom serialization options. This allows custom serialization types like BSON, JSON, or other formats</param>
/// <returns>Returns a memory stream of cosmos elements. By default the memory stream will contain JSON.</returns>
private static MemoryStream ElementToMemoryStream(
internal static MemoryStream ElementToMemoryStream(
CosmosElement cosmosElement,
CosmosSerializationFormatOptions cosmosSerializationOptions = null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;

#nullable enable
internal static class CosmosFeedResponseSerializer
{
private static readonly byte ArrayStart = Encoding.UTF8.GetBytes("[")[0];
j82w marked this conversation as resolved.
Show resolved Hide resolved
private static readonly byte ArrayEnd = Encoding.UTF8.GetBytes("]")[0];

/// <summary>
/// The service returns feed responses in an envelope. This removes the envelope
/// and serializes all the items into a list
/// </summary>
/// <param name="serializerCore">The cosmos serializer</param>
/// <param name="streamWithServiceEnvelope">A stream with the service envelope like: { "ContainerRid":"Test", "Documents":[{ "id":"MyItem"}], "count":1}</param>
/// <returns>A read only list of the serialized items</returns>
internal static IReadOnlyList<T> FromFeedResponseStream<T>(
j82w marked this conversation as resolved.
Show resolved Hide resolved
CosmosSerializerCore serializerCore,
Stream streamWithServiceEnvelope)
{
if (streamWithServiceEnvelope == null)
{
return new List<T>();
}

using (streamWithServiceEnvelope)
{
j82w marked this conversation as resolved.
Show resolved Hide resolved
using (MemoryStream stream = GetStreamWithoutServiceEnvelope(
streamWithServiceEnvelope))
{
return serializerCore.FromFeedStream<T>(stream);
}
}
}

/// <summary>
/// The service returns feed responses in an envelope. This removes the envelope
/// so it only returns the array of items.
/// </summary>
/// <param name="streamWithServiceEnvelope">A stream with the service envelope like: { "ContainerRid":"Test", "Documents":[{ "id":"MyItem"}], "count":1}</param>
/// <returns>A stream containing only the array of items</returns>
internal static MemoryStream GetStreamWithoutServiceEnvelope(
Stream streamWithServiceEnvelope)
{
using (streamWithServiceEnvelope)
j82w marked this conversation as resolved.
Show resolved Hide resolved
{
ReadOnlyMemory<byte> content;
MemoryStream? memoryStreamWithEnvelope = streamWithServiceEnvelope as MemoryStream;
j82w marked this conversation as resolved.
Show resolved Hide resolved
if (memoryStreamWithEnvelope == null)
{
memoryStreamWithEnvelope = new MemoryStream();
streamWithServiceEnvelope.CopyTo(memoryStreamWithEnvelope);
j82w marked this conversation as resolved.
Show resolved Hide resolved
memoryStreamWithEnvelope.Position = 0;
}

if (memoryStreamWithEnvelope.TryGetBuffer(out ArraySegment<byte> buffer))
{
content = buffer;
}
else
{
content = memoryStreamWithEnvelope.ToArray();
}

int start = GetArrayStartPosition(content);
int end = GetArrayEndPosition(content);
j82w marked this conversation as resolved.
Show resolved Hide resolved
j82w marked this conversation as resolved.
Show resolved Hide resolved

ReadOnlyMemory<byte> spanwithOnlyArray = content.Slice(start, end - start + 1);
j82w marked this conversation as resolved.
Show resolved Hide resolved
if (!MemoryMarshal.TryGetArray(spanwithOnlyArray, out ArraySegment<byte> resultAsArray))
{
resultAsArray = new ArraySegment<byte>(spanwithOnlyArray.ToArray());
j82w marked this conversation as resolved.
Show resolved Hide resolved
}

MemoryStream arrayOnlyStream = new MemoryStream(resultAsArray.Array, resultAsArray.Offset, resultAsArray.Count, writable: false, publiclyVisible: true);
return arrayOnlyStream;
}
}

private static int GetArrayStartPosition(
ReadOnlyMemory<byte> memoryByte)
{
ReadOnlySpan<byte> span = memoryByte.Span;
for (int i = 0; i < span.Length; i++)
{
if (span[i] == ArrayStart)
{
return i;
}
j82w marked this conversation as resolved.
Show resolved Hide resolved
}

throw new ArgumentException("ArrayStart not found");
j82w marked this conversation as resolved.
Show resolved Hide resolved
}

private static int GetArrayEndPosition(
ReadOnlyMemory<byte> memoryByte)
{
ReadOnlySpan<byte> span = memoryByte.Span;
j82w marked this conversation as resolved.
Show resolved Hide resolved
for (int i = span.Length - 1; i < span.Length; i--)
{
if (span[i] == ArrayEnd)
{
return i;
}
}

throw new ArgumentException("ArrayEnd not found");
}
}
}
22 changes: 7 additions & 15 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -20,6 +18,7 @@ namespace Microsoft.Azure.Cosmos
internal class CosmosSerializerCore
{
private static readonly CosmosSerializer propertiesSerializer = new CosmosJsonSerializerWrapper(new CosmosJsonDotNetSerializer());

private readonly CosmosSerializer customSerializer;
private readonly CosmosSerializer sqlQuerySpecSerializer;

Expand Down Expand Up @@ -63,6 +62,12 @@ internal T FromStream<T>(Stream stream)
return serializer.FromStream<T>(stream);
}

internal IReadOnlyList<T> FromFeedStream<T>(Stream stream)
{
CosmosSerializer serializer = this.GetSerializer<T>();
return serializer.FromStream<List<T>>(stream);
}

internal Stream ToStream<T>(T input)
{
CosmosSerializer serializer = this.GetSerializer<T>();
Expand Down Expand Up @@ -102,19 +107,6 @@ internal CosmosSerializer GetCustomOrDefaultSerializer()
return CosmosSerializerCore.propertiesSerializer;
}

internal IEnumerable<T> FromFeedResponseStream<T>(
Stream stream,
ResourceType resourceType)
{
CosmosArray cosmosArray = CosmosElementSerializer.ToCosmosElements(
stream,
resourceType);

return CosmosElementSerializer.GetResources<T>(
cosmosArray: cosmosArray,
serializerCore: this);
}

private CosmosSerializer GetSerializer<T>()
{
if (this.customSerializer == null)
Expand Down
Loading