Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding diagnostic context #1062

Merged
merged 54 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
8a8b8cf
Init diagnostics
Nov 9, 2019
7ba75e3
Added attributes to the json
Nov 9, 2019
8924b59
Additionaly diagnostics
Nov 11, 2019
2523948
Added SDK retry count as a promoted property
Nov 11, 2019
be379a0
merge to latest
Nov 22, 2019
1baabf7
Fixed unit test
Nov 25, 2019
278b810
Updated JSON format to make it easier to read.
Nov 25, 2019
ed5d8b7
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Nov 25, 2019
5bde76f
Added user agent string to the diagnotics
Nov 26, 2019
c039a2a
Fixed unit tests
Nov 26, 2019
de40429
Updated naming, fixed tests
Nov 26, 2019
f49a955
Made diagnsotics required on TransactionalBatchResponse.
Nov 27, 2019
883c9fe
Revert changes to Batch. Moving to a separate PR
Nov 27, 2019
23112e1
Updated contract test
Nov 27, 2019
6fe7e2c
Refactored handler diagnostics to avoid async task giving bad total t…
Nov 27, 2019
98b9bf9
Updated handler test
Nov 27, 2019
7cfb29d
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Nov 28, 2019
58eb4e4
Fixed unit test
Nov 28, 2019
3ede90e
Fixed pk test
Dec 2, 2019
84e24cd
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Dec 2, 2019
d11d728
Refactored ICosmosDiagnosticsJsonWriter to take advantage of other di…
Dec 2, 2019
f83f2f9
Merged to latest
Dec 2, 2019
8587292
Added arguement names
Dec 3, 2019
2db493b
Switched to Stopwatch.StartNew();
Dec 3, 2019
461749c
Merge to latest
Dec 6, 2019
473bbc7
Changes based on comments
Dec 6, 2019
2b52631
Merge to latest
Dec 16, 2019
807834f
Switched to using json serializer for the ToString conversion.
Dec 17, 2019
45c3f1c
Added test for custom handler. Removed unnecessary wrapper.
Dec 17, 2019
ae7d492
Fixed unit tests
Dec 17, 2019
4f1bc63
Fixed tests
Dec 18, 2019
f0e8f7b
Updated naming to diagnosticsContext
Dec 18, 2019
6fd0b91
Updated changelog
Dec 18, 2019
b14ce16
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Dec 18, 2019
67955dc
Added unit test
Dec 18, 2019
793679d
Fixed null list handling
Dec 19, 2019
edbfa67
Fixed merge conflicts
Dec 19, 2019
0dbd6de
Fixed batch response
Dec 19, 2019
93df478
Fix linq aggregate diagnostic logic.
Dec 20, 2019
9bfbfb3
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Jan 6, 2020
b5abac7
Converted to use stringbuilder instead of Newtonsoft
Jan 8, 2020
f67310a
Fixed tests
Jan 8, 2020
d757fb6
Fixed null checks and tests
Jan 8, 2020
a47fde2
Empty lists are ignored in CosmosClientSideRequestStatistics.
Jan 9, 2020
51f28ae
Refactored CosmosDiagnostics into multiple classes to add better summ…
Jan 15, 2020
7b49bf1
Converting to JsonWriter
Jan 16, 2020
5bb3562
Adding diagnostic context to batch and bulk (#1151)
j82w Jan 16, 2020
6c9ddb5
Fixed batch unit test
Jan 16, 2020
847fce1
Fixed more batch tests
Jan 16, 2020
40570c8
Fixed query unit test
Jan 16, 2020
ccdaf97
Merge remote-tracking branch 'origin/master' into users/jawilley/diag…
Jan 17, 2020
9f036aa
Fixed comments
Jan 17, 2020
ff3ec62
Adding overall scope to make it explicit for over-all elapsed time.
Jan 21, 2020
9c223a0
Added additional unit test logic
Jan 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,19 @@ public virtual bool TryAdd(ItemBatchOperation operation)
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
TransactionalBatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
itemBatchOperation.Context.Diagnostics.AppendDiagnostics(batchResponse.Diagnostics);

// Bulk has diagnostics per a item operation.
// Batch has a single diagnostics for the execute operation
if (itemBatchOperation.DiagnosticsContext != null)
{
response.DiagnosticsContext = itemBatchOperation.DiagnosticsContext;
response.DiagnosticsContext.Append(batchResponse.DiagnosticsContext);
}
else
{
response.DiagnosticsContext = batchResponse.DiagnosticsContext;
}

if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
Expand Down
22 changes: 18 additions & 4 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,15 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
PartitionKeyRangeServerBatchRequest serverRequest,
CancellationToken cancellationToken)
{
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContext();
CosmosDiagnosticScope limiterScope = diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.Limiter");
SemaphoreSlim limiter = this.GetOrAddLimiterForPartitionKeyRange(serverRequest.PartitionKeyRangeId);
using (await limiter.UsingWaitAsync(cancellationToken))
{
limiterScope.Dispose();
using (Stream serverRequestPayload = serverRequest.TransferBodyStream())
{
Debug.Assert(serverRequestPayload != null, "Server request payload expected to be non-null");

ResponseMessage responseMessage = await this.cosmosClientContext.ProcessResourceOperationStreamAsync(
this.cosmosContainer.LinkUri,
ResourceType.Document,
Expand All @@ -236,11 +238,15 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
partitionKey: null,
streamPayload: serverRequestPayload,
requestEnricher: requestMessage => BatchAsyncContainerExecutor.AddHeadersToRequestMessage(requestMessage, serverRequest.PartitionKeyRangeId),
diagnosticsScope: diagnosticsContext,
cancellationToken: cancellationToken).ConfigureAwait(false);

TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);
using (diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.ToResponse"))
{
TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);

return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
}
}
}
}
Expand All @@ -252,7 +258,15 @@ private BatchAsyncStreamer GetOrAddStreamerForPartitionKeyRange(string partition
return streamer;
}

BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(this.maxServerRequestOperationCount, this.maxServerRequestBodyLength, this.dispatchTimerInSeconds, this.timerPool, this.cosmosClientContext.SerializerCore, this.ExecuteAsync, this.ReBatchAsync);
BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(
this.maxServerRequestOperationCount,
this.maxServerRequestBodyLength,
this.dispatchTimerInSeconds,
this.timerPool,
this.cosmosClientContext.SerializerCore,
this.ExecuteAsync,
this.ReBatchAsync);

if (!this.streamersByPartitionKeyRange.TryAdd(partitionKeyRangeId, newStreamer))
{
newStreamer.Dispose();
Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ public virtual Task<TransactionalBatchResponse> ExecuteAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
BatchExecutor executor = new BatchExecutor(this.container, this.partitionKey, this.operations, requestOptions);
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContext();
BatchExecutor executor = new BatchExecutor(
container: this.container,
partitionKey: this.partitionKey,
operations: this.operations,
batchOptions: requestOptions,
diagnosticsContext: diagnosticsContext);

this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(cancellationToken);
}
Expand Down
49 changes: 32 additions & 17 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,47 @@ internal sealed class BatchExecutor

private readonly RequestOptions batchOptions;

private readonly CosmosDiagnosticsContext diagnosticsContext;

public BatchExecutor(
ContainerCore container,
PartitionKey partitionKey,
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions)
RequestOptions batchOptions,
CosmosDiagnosticsContext diagnosticsContext)
{
this.container = container;
this.clientContext = this.container.ClientContext;
this.inputOperations = operations;
this.partitionKey = partitionKey;
this.batchOptions = batchOptions;
this.diagnosticsContext = diagnosticsContext;
}

public async Task<TransactionalBatchResponse> ExecuteAsync(CancellationToken cancellationToken)
{
BatchExecUtils.EnsureValid(this.inputOperations, this.batchOptions);

PartitionKey? serverRequestPartitionKey = this.partitionKey;
if (this.batchOptions != null && this.batchOptions.IsEffectivePartitionKeyRouting)
using (this.diagnosticsContext.CreateOverallScope("BatchExecuteAsync"))
{
serverRequestPartitionKey = null;
}
BatchExecUtils.EnsureValid(this.inputOperations, this.batchOptions);

SinglePartitionKeyServerBatchRequest serverRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
serverRequestPartitionKey,
new ArraySegment<ItemBatchOperation>(this.inputOperations.ToArray()),
this.clientContext.SerializerCore,
cancellationToken);
PartitionKey? serverRequestPartitionKey = this.partitionKey;
if (this.batchOptions != null && this.batchOptions.IsEffectivePartitionKeyRouting)
{
serverRequestPartitionKey = null;
}

return await this.ExecuteServerRequestAsync(serverRequest, cancellationToken);
SinglePartitionKeyServerBatchRequest serverRequest;
using (this.diagnosticsContext.CreateScope("CreateBatchRequest"))
{
serverRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
serverRequestPartitionKey,
new ArraySegment<ItemBatchOperation>(this.inputOperations.ToArray()),
this.clientContext.SerializerCore,
cancellationToken);
}

return await this.ExecuteServerRequestAsync(serverRequest, cancellationToken);
}
}

/// <summary>
Expand Down Expand Up @@ -84,12 +95,16 @@ private async Task<TransactionalBatchResponse> ExecuteServerRequestAsync(
requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchAtomic, bool.TrueString);
requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchOrdered, bool.TrueString);
},
diagnosticsScope: this.diagnosticsContext,
cancellationToken);

return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore);
using (this.diagnosticsContext.CreateScope("TransactionalBatchResponse"))
{
return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore);
}
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public ItemBatchOperation(
PartitionKey partitionKey,
string id = null,
Stream resourceStream = null,
TransactionalBatchItemRequestOptions requestOptions = null)
TransactionalBatchItemRequestOptions requestOptions = null,
CosmosDiagnosticsContext diagnosticsContext = null)
{
this.OperationType = operationType;
this.OperationIndex = operationIndex;
this.PartitionKey = partitionKey;
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
this.DiagnosticsContext = diagnosticsContext;
}

public ItemBatchOperation(
Expand All @@ -53,6 +55,7 @@ public ItemBatchOperation(
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
this.DiagnosticsContext = null;
}

public PartitionKey? PartitionKey { get; internal set; }
Expand All @@ -67,6 +70,8 @@ public ItemBatchOperation(

public int OperationIndex { get; internal set; }

internal CosmosDiagnosticsContext DiagnosticsContext { get; }

internal string PartitionKeyJson { get; set; }

internal Documents.PartitionKey ParsedPartitionKey { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ internal class ItemBatchOperationContext : IDisposable

public Task<TransactionalBatchOperationResult> OperationTask => this.taskCompletionSource.Task;

public ItemBatchOperationStatistics Diagnostics { get; } = new ItemBatchOperationStatistics();

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>();
Expand Down Expand Up @@ -58,8 +56,6 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.Diagnostics.Complete();
result.Diagnostics = this.Diagnostics;
this.taskCompletionSource.SetResult(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ internal PartitionKeyRangeBatchResponse(
/// <inheritdoc />
public override CosmosDiagnostics Diagnostics => this.serverResponse.Diagnostics;

internal override CosmosDiagnosticsContext DiagnosticsContext => this.serverResponse.DiagnosticsContext;

internal override CosmosSerializerCore SerializerCore { get; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PartitionKeyRangeServerBatchRequest(
/// <param name="ensureContinuousOperationIndexes">Whether to stop adding operations to the request once there is non-continuity in the operation indexes.</param>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A newly created instance of <see cref="PartitionKeyRangeServerBatchRequest"/>.</returns>
/// <returns>A newly created instance of <see cref="PartitionKeyRangeServerBatchRequest"/> and the overflow ItemBatchOperation not being processed.</returns>
public static async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>> CreateAsync(
string partitionKeyRangeId,
ArraySegment<ItemBatchOperation> operations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
public class TransactionalBatchOperationResult
{
internal TransactionalBatchOperationResult(HttpStatusCode statusCode)
internal TransactionalBatchOperationResult(
HttpStatusCode statusCode)
{
this.StatusCode = statusCode;
}
Expand Down Expand Up @@ -92,7 +93,7 @@ public virtual bool IsSuccessStatusCode
/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
internal virtual CosmosDiagnostics Diagnostics { get; set; }
internal virtual CosmosDiagnosticsContext DiagnosticsContext { get; set; }

internal static Result ReadOperationResult(Memory<byte> input, out TransactionalBatchOperationResult batchOperationResult)
{
Expand Down Expand Up @@ -200,13 +201,25 @@ private static Result ReadOperationResult(ref RowReader reader, out Transactiona

internal ResponseMessage ToResponseMessage()
{
ResponseMessage responseMessage = new ResponseMessage(this.StatusCode);
responseMessage.Headers.SubStatusCode = this.SubStatusCode;
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Headers.RequestCharge = this.RequestCharge;
responseMessage.Content = this.ResourceStream;
responseMessage.Diagnostics = this.Diagnostics;
Headers headers = new Headers()
{
SubStatusCode = this.SubStatusCode,
ETag = this.ETag,
RetryAfter = this.RetryAfter,
RequestCharge = this.RequestCharge,
};

ResponseMessage responseMessage = new ResponseMessage(
statusCode: this.StatusCode,
requestMessage: null,
errorMessage: null,
error: null,
headers: headers,
diagnostics: this.DiagnosticsContext ?? new CosmosDiagnosticsContext())
{
Content = this.ResourceStream
};

return responseMessage;
}
}
Expand Down
21 changes: 13 additions & 8 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ public class TransactionalBatchResponse : IReadOnlyList<TransactionalBatchOperat
/// <param name="subStatusCode">Provides further details about why the batch was not processed.</param>
/// <param name="errorMessage">The reason for failure.</param>
/// <param name="operations">Operations that were to be executed.</param>
/// <param name="diagnosticsContext">Diagnostics for the operation</param>
internal TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
IReadOnlyList<ItemBatchOperation> operations)
IReadOnlyList<ItemBatchOperation> operations,
CosmosDiagnosticsContext diagnosticsContext)
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
cosmosDiagnostics: null,
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
{
Expand All @@ -65,7 +67,7 @@ private TransactionalBatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
{
Expand All @@ -77,7 +79,8 @@ private TransactionalBatchResponse(
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Diagnostics = cosmosDiagnostics;
this.Diagnostics = diagnosticsContext;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
Expand Down Expand Up @@ -132,6 +135,8 @@ public virtual bool IsSuccessStatusCode
/// </summary>
public virtual CosmosDiagnostics Diagnostics { get; }

internal virtual CosmosDiagnosticsContext DiagnosticsContext { get; }

internal virtual SubStatusCodes SubStatusCode { get; }

internal virtual CosmosSerializerCore SerializerCore { get; }
Expand Down Expand Up @@ -248,7 +253,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand All @@ -264,7 +269,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand All @@ -282,7 +287,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
}
Expand Down Expand Up @@ -378,7 +383,7 @@ record =>
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);

Expand Down
Loading