Skip to content

Commit

Permalink
Adding diagnostic context (#1062)
Browse files Browse the repository at this point in the history
* Init diagnostics

* Added attributes to the json

* Additionaly diagnostics

* Added SDK retry count as a promoted property

* Fixed unit test

* Updated JSON format to make it easier to read.

* Added user agent string to the diagnotics

* Fixed unit tests

* Updated naming, fixed tests

* Made diagnsotics required on TransactionalBatchResponse.

* Revert changes to Batch. Moving to a separate PR

* Updated contract test

* Refactored handler diagnostics to avoid async task giving bad total time. Fixed tests.

* Updated handler test

* Fixed unit test

* Fixed pk test

* Refactored ICosmosDiagnosticsJsonWriter to take advantage of other diagnostics classes.

* Added arguement names

* Switched to Stopwatch.StartNew();

* Changes based on comments

* Switched to using json serializer for the ToString conversion.

* Added test for custom handler. Removed unnecessary wrapper.

* Fixed unit tests

* Fixed tests

* Updated naming to diagnosticsContext

* Updated changelog

* Added unit test

* Fixed null list handling

* Fixed batch response

* Fix linq aggregate diagnostic logic.

* Converted to use stringbuilder instead of Newtonsoft

* Fixed tests

* Fixed null checks and tests

* Empty lists are ignored in CosmosClientSideRequestStatistics.

* Refactored CosmosDiagnostics into multiple classes to add better summary support.

* Converting to JsonWriter

* Adding diagnostic context to batch and bulk (#1151)

* Wired batch and bulk with diagnostic context.

* fixed comments

* Fixed bulk test

* Fixed batch unit test

* Fixed more batch tests

* Fixed query unit test

* Fixed comments

* Adding overall scope to make it explicit for over-all elapsed time.

* Added additional unit test logic
  • Loading branch information
j82w authored Jan 21, 2020
1 parent 1aa7208 commit 36307e8
Show file tree
Hide file tree
Showing 78 changed files with 8,936 additions and 8,211 deletions.
14 changes: 13 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,19 @@ public virtual bool TryAdd(ItemBatchOperation operation)
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
TransactionalBatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
itemBatchOperation.Context.Diagnostics.AppendDiagnostics(batchResponse.Diagnostics);

// Bulk has diagnostics per a item operation.
// Batch has a single diagnostics for the execute operation
if (itemBatchOperation.DiagnosticsContext != null)
{
response.DiagnosticsContext = itemBatchOperation.DiagnosticsContext;
response.DiagnosticsContext.Append(batchResponse.DiagnosticsContext);
}
else
{
response.DiagnosticsContext = batchResponse.DiagnosticsContext;
}

if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
Expand Down
22 changes: 18 additions & 4 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,15 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
PartitionKeyRangeServerBatchRequest serverRequest,
CancellationToken cancellationToken)
{
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContext();
CosmosDiagnosticScope limiterScope = diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.Limiter");
SemaphoreSlim limiter = this.GetOrAddLimiterForPartitionKeyRange(serverRequest.PartitionKeyRangeId);
using (await limiter.UsingWaitAsync(cancellationToken))
{
limiterScope.Dispose();
using (Stream serverRequestPayload = serverRequest.TransferBodyStream())
{
Debug.Assert(serverRequestPayload != null, "Server request payload expected to be non-null");

ResponseMessage responseMessage = await this.cosmosClientContext.ProcessResourceOperationStreamAsync(
this.cosmosContainer.LinkUri,
ResourceType.Document,
Expand All @@ -236,11 +238,15 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
partitionKey: null,
streamPayload: serverRequestPayload,
requestEnricher: requestMessage => BatchAsyncContainerExecutor.AddHeadersToRequestMessage(requestMessage, serverRequest.PartitionKeyRangeId),
diagnosticsScope: diagnosticsContext,
cancellationToken: cancellationToken).ConfigureAwait(false);

TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);
using (diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.ToResponse"))
{
TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);

return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
}
}
}
}
Expand All @@ -252,7 +258,15 @@ private BatchAsyncStreamer GetOrAddStreamerForPartitionKeyRange(string partition
return streamer;
}

BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(this.maxServerRequestOperationCount, this.maxServerRequestBodyLength, this.dispatchTimerInSeconds, this.timerPool, this.cosmosClientContext.SerializerCore, this.ExecuteAsync, this.ReBatchAsync);
BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(
this.maxServerRequestOperationCount,
this.maxServerRequestBodyLength,
this.dispatchTimerInSeconds,
this.timerPool,
this.cosmosClientContext.SerializerCore,
this.ExecuteAsync,
this.ReBatchAsync);

if (!this.streamersByPartitionKeyRange.TryAdd(partitionKeyRangeId, newStreamer))
{
newStreamer.Dispose();
Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ public virtual Task<TransactionalBatchResponse> ExecuteAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
BatchExecutor executor = new BatchExecutor(this.container, this.partitionKey, this.operations, requestOptions);
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContext();
BatchExecutor executor = new BatchExecutor(
container: this.container,
partitionKey: this.partitionKey,
operations: this.operations,
batchOptions: requestOptions,
diagnosticsContext: diagnosticsContext);

this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(cancellationToken);
}
Expand Down
49 changes: 32 additions & 17 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,47 @@ internal sealed class BatchExecutor

private readonly RequestOptions batchOptions;

private readonly CosmosDiagnosticsContext diagnosticsContext;

public BatchExecutor(
ContainerCore container,
PartitionKey partitionKey,
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions)
RequestOptions batchOptions,
CosmosDiagnosticsContext diagnosticsContext)
{
this.container = container;
this.clientContext = this.container.ClientContext;
this.inputOperations = operations;
this.partitionKey = partitionKey;
this.batchOptions = batchOptions;
this.diagnosticsContext = diagnosticsContext;
}

public async Task<TransactionalBatchResponse> ExecuteAsync(CancellationToken cancellationToken)
{
BatchExecUtils.EnsureValid(this.inputOperations, this.batchOptions);

PartitionKey? serverRequestPartitionKey = this.partitionKey;
if (this.batchOptions != null && this.batchOptions.IsEffectivePartitionKeyRouting)
using (this.diagnosticsContext.CreateOverallScope("BatchExecuteAsync"))
{
serverRequestPartitionKey = null;
}
BatchExecUtils.EnsureValid(this.inputOperations, this.batchOptions);

SinglePartitionKeyServerBatchRequest serverRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
serverRequestPartitionKey,
new ArraySegment<ItemBatchOperation>(this.inputOperations.ToArray()),
this.clientContext.SerializerCore,
cancellationToken);
PartitionKey? serverRequestPartitionKey = this.partitionKey;
if (this.batchOptions != null && this.batchOptions.IsEffectivePartitionKeyRouting)
{
serverRequestPartitionKey = null;
}

return await this.ExecuteServerRequestAsync(serverRequest, cancellationToken);
SinglePartitionKeyServerBatchRequest serverRequest;
using (this.diagnosticsContext.CreateScope("CreateBatchRequest"))
{
serverRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
serverRequestPartitionKey,
new ArraySegment<ItemBatchOperation>(this.inputOperations.ToArray()),
this.clientContext.SerializerCore,
cancellationToken);
}

return await this.ExecuteServerRequestAsync(serverRequest, cancellationToken);
}
}

/// <summary>
Expand Down Expand Up @@ -84,12 +95,16 @@ private async Task<TransactionalBatchResponse> ExecuteServerRequestAsync(
requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchAtomic, bool.TrueString);
requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchOrdered, bool.TrueString);
},
diagnosticsScope: this.diagnosticsContext,
cancellationToken);

return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore);
using (this.diagnosticsContext.CreateScope("TransactionalBatchResponse"))
{
return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore);
}
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public ItemBatchOperation(
PartitionKey partitionKey,
string id = null,
Stream resourceStream = null,
TransactionalBatchItemRequestOptions requestOptions = null)
TransactionalBatchItemRequestOptions requestOptions = null,
CosmosDiagnosticsContext diagnosticsContext = null)
{
this.OperationType = operationType;
this.OperationIndex = operationIndex;
this.PartitionKey = partitionKey;
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
this.DiagnosticsContext = diagnosticsContext;
}

public ItemBatchOperation(
Expand All @@ -53,6 +55,7 @@ public ItemBatchOperation(
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
this.DiagnosticsContext = null;
}

public PartitionKey? PartitionKey { get; internal set; }
Expand All @@ -67,6 +70,8 @@ public ItemBatchOperation(

public int OperationIndex { get; internal set; }

internal CosmosDiagnosticsContext DiagnosticsContext { get; }

internal string PartitionKeyJson { get; set; }

internal Documents.PartitionKey ParsedPartitionKey { get; set; }
Expand Down
4 changes: 0 additions & 4 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ internal class ItemBatchOperationContext : IDisposable

public Task<TransactionalBatchOperationResult> OperationTask => this.taskCompletionSource.Task;

public ItemBatchOperationStatistics Diagnostics { get; } = new ItemBatchOperationStatistics();

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>();
Expand Down Expand Up @@ -58,8 +56,6 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.Diagnostics.Complete();
result.Diagnostics = this.Diagnostics;
this.taskCompletionSource.SetResult(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ internal PartitionKeyRangeBatchResponse(
/// <inheritdoc />
public override CosmosDiagnostics Diagnostics => this.serverResponse.Diagnostics;

internal override CosmosDiagnosticsContext DiagnosticsContext => this.serverResponse.DiagnosticsContext;

internal override CosmosSerializerCore SerializerCore { get; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PartitionKeyRangeServerBatchRequest(
/// <param name="ensureContinuousOperationIndexes">Whether to stop adding operations to the request once there is non-continuity in the operation indexes.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A newly created instance of <see cref="PartitionKeyRangeServerBatchRequest"/>.</returns>
/// <returns>A newly created instance of <see cref="PartitionKeyRangeServerBatchRequest"/> and the overflow ItemBatchOperation not being processed.</returns>
public static async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>> CreateAsync(
string partitionKeyRangeId,
ArraySegment<ItemBatchOperation> operations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
public class TransactionalBatchOperationResult
{
internal TransactionalBatchOperationResult(HttpStatusCode statusCode)
internal TransactionalBatchOperationResult(
HttpStatusCode statusCode)
{
this.StatusCode = statusCode;
}
Expand Down Expand Up @@ -92,7 +93,7 @@ public virtual bool IsSuccessStatusCode
/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
internal virtual CosmosDiagnostics Diagnostics { get; set; }
internal virtual CosmosDiagnosticsContext DiagnosticsContext { get; set; }

internal static Result ReadOperationResult(Memory<byte> input, out TransactionalBatchOperationResult batchOperationResult)
{
Expand Down Expand Up @@ -200,13 +201,25 @@ private static Result ReadOperationResult(ref RowReader reader, out Transactiona

internal ResponseMessage ToResponseMessage()
{
ResponseMessage responseMessage = new ResponseMessage(this.StatusCode);
responseMessage.Headers.SubStatusCode = this.SubStatusCode;
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Headers.RequestCharge = this.RequestCharge;
responseMessage.Content = this.ResourceStream;
responseMessage.Diagnostics = this.Diagnostics;
Headers headers = new Headers()
{
SubStatusCode = this.SubStatusCode,
ETag = this.ETag,
RetryAfter = this.RetryAfter,
RequestCharge = this.RequestCharge,
};

ResponseMessage responseMessage = new ResponseMessage(
statusCode: this.StatusCode,
requestMessage: null,
errorMessage: null,
error: null,
headers: headers,
diagnostics: this.DiagnosticsContext ?? new CosmosDiagnosticsContext())
{
Content = this.ResourceStream
};

return responseMessage;
}
}
Expand Down
21 changes: 13 additions & 8 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ public class TransactionalBatchResponse : IReadOnlyList<TransactionalBatchOperat
/// <param name="subStatusCode">Provides further details about why the batch was not processed.</param>
/// <param name="errorMessage">The reason for failure.</param>
/// <param name="operations">Operations that were to be executed.</param>
/// <param name="diagnosticsContext">Diagnostics for the operation</param>
internal TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
IReadOnlyList<ItemBatchOperation> operations)
IReadOnlyList<ItemBatchOperation> operations,
CosmosDiagnosticsContext diagnosticsContext)
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
cosmosDiagnostics: null,
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
{
Expand All @@ -65,7 +67,7 @@ private TransactionalBatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
{
Expand All @@ -77,7 +79,8 @@ private TransactionalBatchResponse(
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Diagnostics = cosmosDiagnostics;
this.Diagnostics = diagnosticsContext;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
Expand Down Expand Up @@ -132,6 +135,8 @@ public virtual bool IsSuccessStatusCode
/// </summary>
public virtual CosmosDiagnostics Diagnostics { get; }

internal virtual CosmosDiagnosticsContext DiagnosticsContext { get; }

internal virtual SubStatusCodes SubStatusCode { get; }

internal virtual CosmosSerializerCore SerializerCore { get; }
Expand Down Expand Up @@ -248,7 +253,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand All @@ -264,7 +269,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand All @@ -282,7 +287,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand Down Expand Up @@ -378,7 +383,7 @@ record =>
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);

Expand Down
Loading

0 comments on commit 36307e8

Please sign in to comment.