Skip to content

Commit

Permalink
Query: Adds Correlated ActivityId wiring through query and bumps Dire…
Browse files Browse the repository at this point in the history
…ct version (#2401)

This PR enable the SDK to send a common correlated activity id to BE, which will lead to better troubleshooting.
  • Loading branch information
asketagarwal authored Jan 31, 2022
1 parent c426b8b commit 1b5bfa8
Show file tree
Hide file tree
Showing 21 changed files with 2,956 additions and 2,648 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ClientOfficialVersion>3.23.0</ClientOfficialVersion>
<ClientPreviewVersion>3.23.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.23.1</DirectVersion>
<DirectVersion>3.24.0</DirectVersion>
<EncryptionVersion>1.0.0-previewV19</EncryptionVersion>
<CustomEncryptionVersion>1.0.0-preview02</CustomEncryptionVersion>
<HybridRowVersion>1.1.0-preview3</HybridRowVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ internal sealed class NetworkAttachedDocumentContainer : IMonadicDocumentContain
private readonly ChangeFeedRequestOptions changeFeedRequestOptions;
private readonly string resourceLink;
private readonly ResourceType resourceType;
private readonly Guid correlatedActivityId;

public NetworkAttachedDocumentContainer(
ContainerInternal container,
CosmosQueryClient cosmosQueryClient,
Guid correlatedActivityId,
QueryRequestOptions queryRequestOptions = null,
ChangeFeedRequestOptions changeFeedRequestOptions = null,
string resourceLink = null,
Expand All @@ -48,6 +50,7 @@ public NetworkAttachedDocumentContainer(
this.changeFeedRequestOptions = changeFeedRequestOptions;
this.resourceLink = resourceLink ?? this.container.LinkUri;
this.resourceType = resourceType;
this.correlatedActivityId = correlatedActivityId;
}

public Task<TryCatch> MonadicSplitAsync(
Expand Down Expand Up @@ -257,7 +260,7 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
Guid.NewGuid(),
this.correlatedActivityId,
feedRangeState.FeedRange,
queryRequestOptions,
sqlQuerySpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public abstract Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanRequestAsync
SqlQuerySpec sqlQuerySpec,
PartitionKey? partitionKey,
string supportedQueryFeatures,
Guid clientQueryCorrelationId,
ITrace trace,
CancellationToken cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public override async Task<TryCatch<QueryPage>> ExecuteItemQueryAsync(
continuationToken);
cosmosRequestMessage.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
cosmosRequestMessage.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
cosmosRequestMessage.Headers.Add(WFConstants.BackendHeaders.CorrelatedActivityId, clientQueryCorrelationId.ToString());
},
trace: trace,
cancellationToken: cancellationToken);
Expand All @@ -149,6 +150,7 @@ public override async Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanReques
SqlQuerySpec sqlQuerySpec,
PartitionKey? partitionKey,
string supportedQueryFeatures,
Guid clientQueryCorrelationId,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -167,6 +169,7 @@ public override async Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanReques
requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsQueryPlanRequest, bool.TrueString);
requestMessage.Headers.Add(HttpConstants.HttpHeaders.SupportedQueryFeatures, supportedQueryFeatures);
requestMessage.Headers.Add(HttpConstants.HttpHeaders.QueryVersion, new Version(major: 1, minor: 0).ToString());
requestMessage.Headers.Add(WFConstants.BackendHeaders.CorrelatedActivityId, clientQueryCorrelationId.ToString());
requestMessage.UseGatewayMode = true;
},
trace: trace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ internal override Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanRequestAsy
sqlQuerySpec,
partitionKey,
supportedQueryFeatures,
this.CorrelatedActivityId,
trace,
cancellationToken);
}
Expand Down
36 changes: 31 additions & 5 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Query
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
Expand All @@ -23,13 +24,15 @@ namespace Microsoft.Azure.Cosmos.Query

internal sealed class QueryIterator : FeedIteratorInternal
{
private static readonly string CorrelatedActivityIdKeyName = "Query Correlated ActivityId";
private static readonly IReadOnlyList<CosmosElement> EmptyPage = new List<CosmosElement>();

private readonly CosmosQueryContextCore cosmosQueryContext;
private readonly IQueryPipelineStage queryPipelineStage;
private readonly CosmosSerializationFormatOptions cosmosSerializationFormatOptions;
private readonly RequestOptions requestOptions;
private readonly CosmosClientContext clientContext;
private readonly Guid correlatedActivityId;

private bool hasMoreResults;

Expand All @@ -38,14 +41,16 @@ private QueryIterator(
IQueryPipelineStage cosmosQueryExecutionContext,
CosmosSerializationFormatOptions cosmosSerializationFormatOptions,
RequestOptions requestOptions,
CosmosClientContext clientContext)
CosmosClientContext clientContext,
Guid correlatedActivityId)
{
this.cosmosQueryContext = cosmosQueryContext ?? throw new ArgumentNullException(nameof(cosmosQueryContext));
this.queryPipelineStage = cosmosQueryExecutionContext ?? throw new ArgumentNullException(nameof(cosmosQueryExecutionContext));
this.cosmosSerializationFormatOptions = cosmosSerializationFormatOptions;
this.requestOptions = requestOptions;
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
this.hasMoreResults = true;
this.correlatedActivityId = correlatedActivityId;
}

public static QueryIterator Create(
Expand All @@ -67,6 +72,7 @@ public static QueryIterator Create(
queryRequestOptions = new QueryRequestOptions();
}

Guid correlatedActivityId = Guid.NewGuid();
CosmosQueryContextCore cosmosQueryContext = new CosmosQueryContextCore(
client: client,
resourceTypeEnum: Documents.ResourceType.Document,
Expand All @@ -75,11 +81,12 @@ public static QueryIterator Create(
resourceLink: resourceLink,
isContinuationExpected: isContinuationExpected,
allowNonValueAggregateQuery: allowNonValueAggregateQuery,
correlatedActivityId: Guid.NewGuid());
correlatedActivityId: correlatedActivityId);

NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
containerCore,
client,
correlatedActivityId,
queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);

Expand All @@ -100,7 +107,8 @@ public static QueryIterator Create(
innerException: tryParse.Exception)),
queryRequestOptions.CosmosSerializationFormatOptions,
queryRequestOptions,
clientContext);
clientContext,
correlatedActivityId);
}

requestContinuationToken = tryParse.Result;
Expand Down Expand Up @@ -139,7 +147,8 @@ public static QueryIterator Create(
CosmosQueryExecutionContextFactory.Create(documentContainer, cosmosQueryContext, inputParameters, NoOpTrace.Singleton),
queryRequestOptions.CosmosSerializationFormatOptions,
queryRequestOptions,
clientContext);
clientContext,
correlatedActivityId);
}

public override bool HasMoreResults => this.hasMoreResults;
Expand All @@ -156,6 +165,23 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
throw new ArgumentNullException(nameof(trace));
}

// If Correlated Id already exists and is different, add a new one in comma separated list
// Scenario: A new iterator is created with same ContinuationToken and Trace
if (trace.Data.TryGetValue(QueryIterator.CorrelatedActivityIdKeyName, out object correlatedActivityIds))
{
List<string> correlatedIdList = correlatedActivityIds.ToString().Split(',').ToList();
if (!correlatedIdList.Contains(this.correlatedActivityId.ToString()))
{
correlatedIdList.Add(this.correlatedActivityId.ToString());
trace.AddOrUpdateDatum(QueryIterator.CorrelatedActivityIdKeyName,
string.Join(",", correlatedIdList));
}
}
else
{
trace.AddDatum(QueryIterator.CorrelatedActivityIdKeyName, this.correlatedActivityId.ToString());
}

TryCatch<QueryPage> tryGetQueryPage;
try
{
Expand All @@ -176,7 +202,7 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
this.cosmosQueryContext.ContainerResourceId)
{
RequestCharge = default,
ActivityId = Guid.Empty.ToString(),
ActivityId = this.correlatedActivityId.ToString(),
SubStatusCode = Documents.SubStatusCodes.Unknown
},
trace: trace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ public override TransactionalBatch CreateTransactionalBatch(PartitionKey partiti
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
changeFeedRequestOptions: changeFeedRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);

Expand Down Expand Up @@ -798,6 +799,7 @@ public override FeedIteratorInternal GetItemQueryStreamIteratorInternal(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
requestOptions);

DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);
Expand Down Expand Up @@ -848,6 +850,7 @@ public override FeedIteratorInternal GetReadFeedIterator(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
queryRequestOptions,
resourceLink: resourceLink,
resourceType: resourceType);
Expand Down Expand Up @@ -901,6 +904,7 @@ public override FeedIteratorInternal GetReadFeedIterator(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public override FeedIterator GetChangeFeedStreamIterator(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
changeFeedRequestOptions: changeFeedRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);

Expand Down Expand Up @@ -361,6 +362,7 @@ public override FeedIterator<T> GetChangeFeedIterator<T>(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
this,
this.queryClient,
Guid.NewGuid(),
changeFeedRequestOptions: changeFeedRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);

Expand Down
7 changes: 7 additions & 0 deletions Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ ITrace StartChild(
/// <param name="value">The datum itself.</param>
void AddDatum(string key, object value);

/// <summary>
/// Updates the given datum in this trace instance if exists, otherwise Add
/// </summary>
/// <param name="key">The key to associate the datum.</param>
/// <param name="value">The datum itself.</param>
void AddOrUpdateDatum(string key, object value);

/// <summary>
/// Adds a trace children that is already completed.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public void AddChild(ITrace trace)
// NoOp
}

public void AddOrUpdateDatum(string key, object value)
{
// NoOp
}

public void UpdateRegionContacted(TraceDatum traceDatum)
{
// NoOp
Expand Down
Loading

0 comments on commit 1b5bfa8

Please sign in to comment.