Skip to content

Commit

Permalink
[Internal] AI Integration: Fixes Container and Database name in attri…
Browse files Browse the repository at this point in the history
…butes (#3468)

* first draft

* passon container information everywhere

* typo

* fix tests

* rebase response message

* add coalesce check

* code refactor

* code refactor

* code refcator

* code refcator

* refactored response class

* named parameter

Co-authored-by: Sourabh Jain <sourabhjain@microsoft.com>
  • Loading branch information
sourabh1007 and sourabh1007 authored Sep 30, 2022
1 parent dfe1fac commit f982616
Show file tree
Hide file tree
Showing 31 changed files with 332 additions and 135 deletions.
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(trace, cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
(response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private static Result ReadOperationResult(ref RowReader reader, out Transactiona
return Result.Success;
}

internal ResponseMessage ToResponseMessage()
internal ResponseMessage ToResponseMessage(ContainerInternal cosmosContainerCore = null)
{
Headers headers = new Headers()
{
Expand All @@ -221,9 +221,17 @@ internal ResponseMessage ToResponseMessage()
ActivityId = this.ActivityId,
};

// Need this information in Open telemetry hence adding this information in the request
RequestMessage requestMessage = new ()
{
ContainerId = cosmosContainerCore?.Id,
DatabaseId = cosmosContainerCore?.Database?.Id,
Trace = null
};

ResponseMessage responseMessage = new ResponseMessage(
statusCode: this.StatusCode,
requestMessage: null,
requestMessage: requestMessage,
headers: headers,
cosmosException: null,
trace: this.Trace ?? NoOpTrace.Singleton)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public ChangeFeedIteratorCore(
ChangeFeedRequestOptions changeFeedRequestOptions,
ChangeFeedStartFrom changeFeedStartFrom,
CosmosClientContext clientContext,
ContainerInternal container,
ChangeFeedQuerySpec changeFeedQuerySpec = null)
{
if (changeFeedStartFrom == null)
Expand All @@ -43,6 +44,7 @@ public ChangeFeedIteratorCore(
throw new ArgumentNullException(nameof(changeFeedMode));
}

this.container = container;
this.clientContext = clientContext;
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.changeFeedRequestOptions = changeFeedRequestOptions ?? new ChangeFeedRequestOptions();
Expand Down Expand Up @@ -222,7 +224,10 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore Create(
}

private readonly CosmosClientContext clientContext;
private readonly ContainerInternal container;

private readonly ChangeFeedRequestOptions changeFeedOptions;
private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResultsInternal;
Expand Down Expand Up @@ -94,7 +94,10 @@ public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellati
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class StandByFeedIteratorCore : FeedIteratorInternal
internal StandByFeedContinuationToken compositeContinuationToken;

private readonly CosmosClientContext clientContext;
private readonly ContainerInternal container;

private string containerRid;
private string continuationToken;
private int? maxItemCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(Cance
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(response),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
responseMessage: response,
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id ?? this.databaseName),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
17 changes: 14 additions & 3 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,10 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
}

/// <summary>
Expand Down Expand Up @@ -773,7 +776,10 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
}

/// <summary>
Expand Down Expand Up @@ -866,7 +872,10 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
}

/// <summary>
Expand Down Expand Up @@ -1339,6 +1348,7 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper(
resourceType: ResourceType.Database,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
container: null,
options: requestOptions);
}

Expand Down Expand Up @@ -1388,6 +1398,7 @@ private int DecrementNumberOfActiveClients()
private async Task InitializeContainerAsync(string databaseId, string containerId, CancellationToken cancellationToken = default)
{
ContainerInternal container = (ContainerInternal)this.GetContainer(databaseId, containerId);

IReadOnlyList<FeedRange> feedRanges = await container.GetFeedRangesAsync(cancellationToken);
List<Task> tasks = new List<Task>();
foreach (FeedRange feedRange in feedRanges)
Expand Down
11 changes: 8 additions & 3 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ private QueryIterator(
CosmosSerializationFormatOptions cosmosSerializationFormatOptions,
RequestOptions requestOptions,
CosmosClientContext clientContext,
Guid correlatedActivityId)
Guid correlatedActivityId,
ContainerInternal container)
{
this.cosmosQueryContext = cosmosQueryContext ?? throw new ArgumentNullException(nameof(cosmosQueryContext));
this.queryPipelineStage = cosmosQueryExecutionContext ?? throw new ArgumentNullException(nameof(cosmosQueryExecutionContext));
Expand All @@ -50,6 +51,8 @@ private QueryIterator(
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
this.hasMoreResults = true;
this.correlatedActivityId = correlatedActivityId;

this.container = container;
}

public static QueryIterator Create(
Expand Down Expand Up @@ -108,7 +111,8 @@ public static QueryIterator Create(
queryRequestOptions.CosmosSerializationFormatOptions,
queryRequestOptions,
clientContext,
correlatedActivityId);
correlatedActivityId,
containerCore);
}

requestContinuationToken = tryParse.Result;
Expand Down Expand Up @@ -148,7 +152,8 @@ public static QueryIterator Create(
queryRequestOptions.CosmosSerializationFormatOptions,
queryRequestOptions,
clientContext,
correlatedActivityId);
correlatedActivityId,
containerCore);
}

public override bool HasMoreResults => this.hasMoreResults;
Expand Down
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public ReadFeedIteratorCore(
string continuationToken,
ReadFeedPaginationOptions readFeedPaginationOptions,
QueryRequestOptions queryRequestOptions,
ContainerInternal container,
CancellationToken cancellationToken)
{
this.container = container;

this.queryRequestOptions = queryRequestOptions;
readFeedPaginationOptions ??= ReadFeedPaginationOptions.Default;

Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ private async Task<ResponseMessage> ProcessResourceOperationAsBulkStreamAsync(
itemRequestOptions,
cancellationToken);

return batchOperationResult.ToResponseMessage();
return batchOperationResult.ToResponseMessage(cosmosContainerCore);
}

private bool IsBulkOperationSupported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ public override FeedIteratorInternal GetItemQueryStreamIteratorInternal(
continuationToken,
readFeedPaginationOptions,
requestOptions,
this,
cancellationToken: default);
}

Expand Down Expand Up @@ -893,6 +894,7 @@ public override FeedIteratorInternal GetReadFeedIterator(
queryRequestOptions: queryRequestOptions,
continuationToken: continuationToken,
readFeedPaginationOptions: readFeedPaginationOptions,
container: this,
cancellationToken: default);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
Expand Down Expand Up @@ -342,7 +343,8 @@ public override FeedIterator GetChangeFeedStreamIterator(
changeFeedStartFrom: changeFeedStartFrom,
changeFeedMode: changeFeedMode,
changeFeedRequestOptions: changeFeedRequestOptions,
clientContext: this.ClientContext);
clientContext: this.ClientContext,
container: this);
}

public override FeedIterator<T> GetChangeFeedIterator<T>(
Expand Down Expand Up @@ -372,6 +374,7 @@ public override FeedIterator<T> GetChangeFeedIterator<T>(
changeFeedStartFrom: changeFeedStartFrom,
changeFeedMode: changeFeedMode,
changeFeedRequestOptions: changeFeedRequestOptions,
container: this,
clientContext: this.ClientContext);

return new FeedIteratorCore<T>(
Expand Down Expand Up @@ -657,7 +660,8 @@ public override FeedIterator GetChangeFeedStreamIteratorWithQuery(
changeFeedMode: changeFeedMode,
changeFeedRequestOptions: changeFeedRequestOptions,
clientContext: this.ClientContext,
changeFeedQuerySpec: changeFeedQuerySpec);
changeFeedQuerySpec: changeFeedQuerySpec,
container: this);
}

public override FeedIterator<T> GetChangeFeedIteratorWithQuery<T>(
Expand Down Expand Up @@ -689,6 +693,7 @@ public override FeedIterator<T> GetChangeFeedIteratorWithQuery<T>(
changeFeedMode: changeFeedMode,
changeFeedRequestOptions: changeFeedRequestOptions,
clientContext: this.ClientContext,
container: this,
changeFeedQuerySpec: changeFeedQuerySpec);

return new FeedIteratorCore<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
nameof(ReadManyItemsStreamAsync),
null,
(trace) => base.ReadManyItemsStreamAsync(items, trace, readManyRequestOptions, cancellationToken),
(response) => new OpenTelemetryResponse(response));
(response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.Id,
databaseName: this.Database?.Id));
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
Expand All @@ -365,7 +368,10 @@ public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
nameof(ReadManyItemsAsync),
null,
(trace) => base.ReadManyItemsAsync<T>(items, trace, readManyRequestOptions, cancellationToken),
(response) => new OpenTelemetryResponse<T>(response));
(response) => new OpenTelemetryResponse<T>(
responseMessage: response,
containerName: this.Id,
databaseName: this.Database?.Id));
}

public override FeedIterator GetItemQueryStreamIterator(
Expand Down
12 changes: 9 additions & 3 deletions Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,9 @@ public override FeedIterator GetContainerQueryStreamIterator(
resourceType: ResourceType.Collection,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);
options: requestOptions,
container: null,
databaseId: this.Id);
}

public override FeedIterator<T> GetContainerQueryIterator<T>(
Expand Down Expand Up @@ -640,7 +642,9 @@ public override FeedIterator GetUserQueryStreamIterator(
resourceType: ResourceType.User,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);
options: requestOptions,
container: null,
databaseId: this.Id);
}

public override FeedIterator<T> GetUserQueryIterator<T>(
Expand Down Expand Up @@ -730,7 +734,9 @@ private FeedIterator GetClientEncryptionKeyQueryStreamIterator(
resourceType: ResourceType.ClientEncryptionKey,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);
options: requestOptions,
container: null,
databaseId: this.Id);
}

public async Task<ClientEncryptionKeyResponse> CreateClientEncryptionKeyAsync(
Expand Down
Loading

0 comments on commit f982616

Please sign in to comment.