Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix customer serializer being used for internal types #1105

Merged
merged 15 commits into from
Dec 19, 2019
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="ItemBatchOperation"/>
internal class BatchAsyncBatcher
{
private readonly CosmosSerializer cosmosSerializer;
private readonly CosmosSerializerCore serializerCore;
private readonly List<ItemBatchOperation> batchOperations;
private readonly BatchAsyncBatcherExecuteDelegate executor;
private readonly BatchAsyncBatcherRetryDelegate retrier;
Expand All @@ -39,7 +39,7 @@ internal class BatchAsyncBatcher
public BatchAsyncBatcher(
int maxBatchOperationCount,
int maxBatchByteSize,
CosmosSerializer cosmosSerializer,
CosmosSerializerCore serializerCore,
BatchAsyncBatcherExecuteDelegate executor,
BatchAsyncBatcherRetryDelegate retrier)
{
Expand All @@ -63,17 +63,17 @@ public BatchAsyncBatcher(
throw new ArgumentNullException(nameof(retrier));
}

if (cosmosSerializer == null)
if (serializerCore == null)
{
throw new ArgumentNullException(nameof(cosmosSerializer));
throw new ArgumentNullException(nameof(serializerCore));
}

this.batchOperations = new List<ItemBatchOperation>(maxBatchOperationCount);
this.executor = executor;
this.retrier = retrier;
this.maxBatchByteSize = maxBatchByteSize;
this.maxBatchOperationCount = maxBatchOperationCount;
this.cosmosSerializer = cosmosSerializer;
this.serializerCore = serializerCore;
}

public virtual bool TryAdd(ItemBatchOperation operation)
Expand Down Expand Up @@ -152,7 +152,7 @@ public virtual bool TryAdd(ItemBatchOperation operation)
try
{
PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);
using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.cosmosSerializer))
using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.serializerCore))
{
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
Expand Down Expand Up @@ -207,7 +207,7 @@ internal virtual async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySegm
this.maxBatchByteSize,
this.maxBatchOperationCount,
ensureContinuousOperationIndexes: false,
serializer: this.cosmosSerializer,
serializerCore: this.serializerCore,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ internal virtual async Task ValidateOperationAsync(
Debug.Assert(BatchAsyncContainerExecutor.ValidateOperationEPK(operation, itemRequestOptions));
}

await operation.MaterializeResourceAsync(this.cosmosClientContext.CosmosSerializer, cancellationToken);
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
Expand Down Expand Up @@ -236,7 +236,7 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
requestEnricher: requestMessage => BatchAsyncContainerExecutor.AddHeadersToRequestMessage(requestMessage, serverRequest.PartitionKeyRangeId),
cancellationToken: cancellationToken).ConfigureAwait(false);

TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.CosmosSerializer).ConfigureAwait(false);
TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);

return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
}
Expand All @@ -250,7 +250,7 @@ private BatchAsyncStreamer GetOrAddStreamerForPartitionKeyRange(string partition
return streamer;
}

BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(this.maxServerRequestOperationCount, this.maxServerRequestBodyLength, this.dispatchTimerInSeconds, this.timerPool, this.cosmosClientContext.CosmosSerializer, this.ExecuteAsync, this.ReBatchAsync);
BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(this.maxServerRequestOperationCount, this.maxServerRequestBodyLength, this.dispatchTimerInSeconds, this.timerPool, this.cosmosClientContext.SerializerCore, this.ExecuteAsync, this.ReBatchAsync);
if (!this.streamersByPartitionKeyRange.TryAdd(partitionKeyRangeId, newStreamer))
{
newStreamer.Dispose();
Expand Down
12 changes: 6 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncStreamer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class BatchAsyncStreamer : IDisposable
private readonly BatchAsyncBatcherExecuteDelegate executor;
private readonly BatchAsyncBatcherRetryDelegate retrier;
private readonly int dispatchTimerInSeconds;
private readonly CosmosSerializer cosmosSerializer;
private readonly CosmosSerializerCore serializerCore;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private volatile BatchAsyncBatcher currentBatcher;
private TimerPool timerPool;
Expand All @@ -38,7 +38,7 @@ public BatchAsyncStreamer(
int maxBatchByteSize,
int dispatchTimerInSeconds,
TimerPool timerPool,
CosmosSerializer cosmosSerializer,
CosmosSerializerCore serializerCore,
BatchAsyncBatcherExecuteDelegate executor,
BatchAsyncBatcherRetryDelegate retrier)
{
Expand Down Expand Up @@ -67,9 +67,9 @@ public BatchAsyncStreamer(
throw new ArgumentNullException(nameof(retrier));
}

if (cosmosSerializer == null)
if (serializerCore == null)
{
throw new ArgumentNullException(nameof(cosmosSerializer));
throw new ArgumentNullException(nameof(serializerCore));
}

this.maxBatchOperationCount = maxBatchOperationCount;
Expand All @@ -78,7 +78,7 @@ public BatchAsyncStreamer(
this.retrier = retrier;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = timerPool;
this.cosmosSerializer = cosmosSerializer;
this.serializerCore = serializerCore;
this.currentBatcher = this.CreateBatchAsyncBatcher();

this.ResetTimer();
Expand Down Expand Up @@ -157,7 +157,7 @@ private BatchAsyncBatcher GetBatchToDispatchAndCreate()

private BatchAsyncBatcher CreateBatchAsyncBatcher()
{
return new BatchAsyncBatcher(this.maxBatchOperationCount, this.maxBatchByteSize, this.cosmosSerializer, this.executor, this.retrier);
return new BatchAsyncBatcher(this.maxBatchOperationCount, this.maxBatchByteSize, this.serializerCore, this.executor, this.retrier);
}
}
}
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task<TransactionalBatchResponse> ExecuteAsync(CancellationToken can
SinglePartitionKeyServerBatchRequest serverRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
serverRequestPartitionKey,
new ArraySegment<ItemBatchOperation>(this.inputOperations.ToArray()),
this.clientContext.CosmosSerializer,
this.clientContext.SerializerCore,
cancellationToken);

return await this.ExecuteServerRequestAsync(serverRequest, cancellationToken);
Expand Down Expand Up @@ -89,7 +89,7 @@ private async Task<TransactionalBatchResponse> ExecuteServerRequestAsync(
return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.CosmosSerializer);
this.clientContext.SerializerCore);
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ internal int GetApproximateSerializedLength()
/// <summary>
/// Materializes the operation's resource into a Memory{byte} wrapping a byte array.
/// </summary>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> for cancellation.</param>
internal virtual async Task MaterializeResourceAsync(CosmosSerializer serializer, CancellationToken cancellationToken)
internal virtual async Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.ResourceStream != null)
{
Expand Down Expand Up @@ -361,14 +361,14 @@ public ItemBatchOperation(
/// <summary>
/// Materializes the operation's resource into a Memory{byte} wrapping a byte array.
/// </summary>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> for cancellation.</param>
internal override Task MaterializeResourceAsync(CosmosSerializer serializer, CancellationToken cancellationToken)
internal override Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.Resource != null)
{
this.ResourceStream = serializer.ToStream(this.Resource);
return base.MaterializeResourceAsync(serializer, cancellationToken);
this.ResourceStream = serializerCore.ToStream(this.Resource);
return base.MaterializeResourceAsync(serializerCore, cancellationToken);
}

return Task.FromResult(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ internal class PartitionKeyRangeBatchResponse : TransactionalBatchResponse
/// </summary>
/// <param name="originalOperationsCount">Original operations that generated the server responses.</param>
/// <param name="serverResponse">Response from the server.</param>
/// <param name="serializer">Serializer to deserialize response resource body streams.</param>
/// <param name="serializerCore">Serializer to deserialize response resource body streams.</param>
internal PartitionKeyRangeBatchResponse(
int originalOperationsCount,
TransactionalBatchResponse serverResponse,
CosmosSerializer serializer)
CosmosSerializerCore serializerCore)
{
this.StatusCode = serverResponse.StatusCode;

Expand Down Expand Up @@ -59,7 +59,7 @@ internal PartitionKeyRangeBatchResponse(

this.ErrorMessage = errorMessageBuilder.Length > 2 ? errorMessageBuilder.ToString(0, errorMessageBuilder.Length - 2) : null;
this.Operations = itemBatchOperations;
this.Serializer = serializer;
this.SerializerCore = serializerCore;
}

/// <summary>
Expand All @@ -70,7 +70,7 @@ internal PartitionKeyRangeBatchResponse(
/// <inheritdoc />
public override CosmosDiagnostics Diagnostics => this.serverResponse.Diagnostics;

internal override CosmosSerializer Serializer { get; }
internal override CosmosSerializerCore SerializerCore { get; }

/// <summary>
/// Gets the number of operation results.
Expand Down Expand Up @@ -98,7 +98,7 @@ public override TransactionalBatchOperationResult<T> GetOperationResultAtIndex<T
T resource = default(T);
if (result.ResourceStream != null)
{
resource = this.Serializer.FromStream<T>(result.ResourceStream);
resource = this.SerializerCore.FromStream<T>(result.ResourceStream);
}

return new TransactionalBatchOperationResult<T>(result, resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ internal sealed class PartitionKeyRangeServerBatchRequest : ServerBatchRequest
/// <param name="partitionKeyRangeId">The partition key range id associated with all requests.</param>
/// <param name="maxBodyLength">Maximum length allowed for the request body.</param>
/// <param name="maxOperationCount">Maximum number of operations allowed in the request.</param>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
public PartitionKeyRangeServerBatchRequest(
string partitionKeyRangeId,
int maxBodyLength,
int maxOperationCount,
CosmosSerializer serializer)
: base(maxBodyLength, maxOperationCount, serializer)
CosmosSerializerCore serializerCore)
: base(maxBodyLength, maxOperationCount, serializerCore)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
}
Expand All @@ -44,7 +44,7 @@ public PartitionKeyRangeServerBatchRequest(
/// <param name="maxBodyLength">Desired maximum length of the request body.</param>
/// <param name="maxOperationCount">Maximum number of operations allowed in the request.</param>
/// <param name="ensureContinuousOperationIndexes">Whether to stop adding operations to the request once there is non-continuity in the operation indexes.</param>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A newly created instance of <see cref="PartitionKeyRangeServerBatchRequest"/>.</returns>
public static async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>> CreateAsync(
Expand All @@ -53,10 +53,10 @@ public static async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment
int maxBodyLength,
int maxOperationCount,
bool ensureContinuousOperationIndexes,
CosmosSerializer serializer,
CosmosSerializerCore serializerCore,
CancellationToken cancellationToken)
{
PartitionKeyRangeServerBatchRequest request = new PartitionKeyRangeServerBatchRequest(partitionKeyRangeId, maxBodyLength, maxOperationCount, serializer);
PartitionKeyRangeServerBatchRequest request = new PartitionKeyRangeServerBatchRequest(partitionKeyRangeId, maxBodyLength, maxOperationCount, serializerCore);
ArraySegment<ItemBatchOperation> pendingOperations = await request.CreateBodyStreamAsync(operations, cancellationToken, ensureContinuousOperationIndexes);
return new Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>(request, pendingOperations);
}
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/Batch/ServerBatchRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal abstract class ServerBatchRequest

private readonly int maxOperationCount;

private readonly CosmosSerializer serializer;
private readonly CosmosSerializerCore serializerCore;

private ArraySegment<ItemBatchOperation> operations;

Expand All @@ -40,12 +40,12 @@ internal abstract class ServerBatchRequest
/// </summary>
/// <param name="maxBodyLength">Maximum length allowed for the request body.</param>
/// <param name="maxOperationCount">Maximum number of operations allowed in the request.</param>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
protected ServerBatchRequest(int maxBodyLength, int maxOperationCount, CosmosSerializer serializer)
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
protected ServerBatchRequest(int maxBodyLength, int maxOperationCount, CosmosSerializerCore serializerCore)
{
this.maxBodyLength = maxBodyLength;
this.maxOperationCount = maxOperationCount;
this.serializer = serializer;
this.serializerCore = serializerCore;
}

public IReadOnlyList<ItemBatchOperation> Operations => this.operations;
Expand Down Expand Up @@ -87,7 +87,7 @@ protected async Task<ArraySegment<ItemBatchOperation>> CreateBodyStreamAsync(
break;
}

await operation.MaterializeResourceAsync(this.serializer, cancellationToken);
await operation.MaterializeResourceAsync(this.serializerCore, cancellationToken);
materializedCount++;

previousOperationIndex = operation.OperationIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ internal sealed class SinglePartitionKeyServerBatchRequest : ServerBatchRequest
/// Single partition key server request.
/// </summary>
/// <param name="partitionKey">Partition key that applies to all operations in this request.</param>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
private SinglePartitionKeyServerBatchRequest(
PartitionKey? partitionKey,
CosmosSerializer serializer)
: base(maxBodyLength: int.MaxValue, maxOperationCount: int.MaxValue, serializer: serializer)
CosmosSerializerCore serializerCore)
: base(maxBodyLength: int.MaxValue,
maxOperationCount: int.MaxValue,
serializerCore: serializerCore)
{
this.PartitionKey = partitionKey;
}
Expand All @@ -35,16 +37,16 @@ private SinglePartitionKeyServerBatchRequest(
/// </summary>
/// <param name="partitionKey">Partition key of the request.</param>
/// <param name="operations">Operations to be added into this batch request.</param>
/// <param name="serializer">Serializer to serialize user provided objects to JSON.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A newly created instance of <see cref="SinglePartitionKeyServerBatchRequest"/>.</returns>
public static async Task<SinglePartitionKeyServerBatchRequest> CreateAsync(
PartitionKey? partitionKey,
ArraySegment<ItemBatchOperation> operations,
CosmosSerializer serializer,
CosmosSerializerCore serializerCore,
CancellationToken cancellationToken)
{
SinglePartitionKeyServerBatchRequest request = new SinglePartitionKeyServerBatchRequest(partitionKey, serializer);
SinglePartitionKeyServerBatchRequest request = new SinglePartitionKeyServerBatchRequest(partitionKey, serializerCore);
await request.CreateBodyStreamAsync(operations, cancellationToken);
return request;
}
Expand Down
Loading