Skip to content

Commit

Permalink
[PREVIEW] AI Integration: Adds open telemetry support for most of the…
Browse files Browse the repository at this point in the history
… Operations (#3277)

* container related operation

* database operation

* item operations

* fix test

* code refactor

* regain preview tag

* code refactor and fix

* add unit tests

* put preview tag again

* fix assertion

* fix test

* fix code

* userinlinecore

* permission core

* modify assertion message

* remove opentelemetry test decorator

* code refactor

* throw same exception

* fix test

* fix tests

* enable open telemetry for all the operations

* added itemcount

* code refactor

* add baseline tests for open telemetry

* add a flag to enable open telemetry featur

* remove unit tests

* fix test

* fix bug

* basdeline tests

* fix baseline expectations

* refactor code for exception

* review comments

* code refactor

* rename enavle open telemetry option and remove cosmos exception

* add doc

* doc fix

* updated preview contracts

Co-authored-by: Sourabh Jain <sourabhjain@microsoft.com>
  • Loading branch information
sourabh1007 and sourabh1007 committed Jul 6, 2022
1 parent 795978a commit 66be1a3
Show file tree
Hide file tree
Showing 52 changed files with 1,257 additions and 218 deletions.
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(trace, cancellationToken);
});
},
(response) => new OpenTelemetryResponse(response));
}

/// <summary>
Expand Down
11 changes: 6 additions & 5 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,12 @@ public ChangeFeedIteratorCore(

public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}

public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellati
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(Cance
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Runtime.Serialization;
using global::Azure.Core.Pipeline;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Telemetry.Diagnostics;

/// <summary>
/// Exception occurred when an operation in an IChangeFeedObserver is running and throws by user code
Expand Down Expand Up @@ -53,5 +56,15 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont
{
base.GetObjectData(info, context);
}

/// <summary>
/// RecordOtelAttributes
/// </summary>
/// <param name="exception"></param>
/// <param name="scope"></param>
internal static void RecordOtelAttributes(ChangeFeedProcessorUserException exception, DiagnosticScope scope)
{
scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionMessage, exception.Message);
}
}
}
15 changes: 10 additions & 5 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,8 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
});
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
}

/// <summary>
Expand Down Expand Up @@ -771,7 +772,8 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
});
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
}

/// <summary>
Expand Down Expand Up @@ -863,7 +865,8 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
});
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(response));
}

/// <summary>
Expand Down Expand Up @@ -1163,7 +1166,8 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
requestOptions,
trace,
cancellationToken);
});
},
(response) => new OpenTelemetryResponse(response));
}

/// <summary>
Expand Down Expand Up @@ -1257,7 +1261,8 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
requestOptions,
trace,
cancellationToken);
});
},
(response) => new OpenTelemetryResponse(response));
}

private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
Expand Down
12 changes: 11 additions & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -923,5 +923,15 @@ public override bool CanConvert(Type objectType)
return objectType == typeof(DateTime);
}
}

/// <summary>
/// Enable OpenTelemetry and start emiting activities for each operations
/// </summary>
#if PREVIEW
public
#else
internal
#endif
bool EnableOpenTelemetry { get; set; }
}
}
}
15 changes: 15 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,21 @@ public CosmosClientBuilder WithConsistencyLevel(Cosmos.ConsistencyLevel consiste
return this;
}

/// <summary>
/// Enable OpenTelemetry and start emiting activities for each operations
/// </summary>
/// <returns>The current <see cref="CosmosClientBuilder"/>.</returns>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder EnableOpenTelemetry()
{
this.clientOptions.EnableOpenTelemetry = true;
return this;
}

/// <summary>
/// Sets the connection mode to Gateway. This is used by the client when connecting to the Azure Cosmos DB service.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/Headers/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ internal virtual string EndEpk
set => this.CosmosMessageHeaders.EndEpk = value;
}

internal virtual string ItemCount => this.CosmosMessageHeaders.Get(HttpConstants.HttpHeaders.ItemCount);

/// <summary>
/// Creates a new instance of <see cref="Headers"/>.
/// </summary>
Expand Down
35 changes: 25 additions & 10 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ internal override Task<TResult>
string operationName,
RequestOptions requestOptions,
Func<ITrace, Task<TResult>> task,
Func<TResult, OpenTelemetryAttributes> openTelemetry = null,
Func<TResult, OpenTelemetryAttributes> openTelemetry,
TraceComponent traceComponent = TraceComponent.Transport,
Tracing.TraceLevel traceLevel = Tracing.TraceLevel.Info)
{
Expand Down Expand Up @@ -473,7 +473,10 @@ private async Task<TResult> RunWithDiagnosticsHelperAsync<TResult>(
Func<TResult, OpenTelemetryAttributes> openTelemetry,
string operationName)
{
using (OpenTelemetryCoreRecorder recorder = OpenTelemetryRecorderFactory.CreateRecorder(operationName))
using (OpenTelemetryCoreRecorder recorder =
OpenTelemetryRecorderFactory.CreateRecorder(
operationName: operationName,
isFeatureEnabled: this.clientOptions.EnableOpenTelemetry))
using (new ActivityScope(Guid.NewGuid()))
{
try
Expand All @@ -489,23 +492,35 @@ private async Task<TResult> RunWithDiagnosticsHelperAsync<TResult>(
}
catch (OperationCanceledException oe) when (!(oe is CosmosOperationCanceledException))
{
recorder.MarkFailed(oe);
throw new CosmosOperationCanceledException(oe, trace);
CosmosOperationCanceledException operationCancelledException = new CosmosOperationCanceledException(oe, trace);
recorder.MarkFailed(operationCancelledException);

throw operationCancelledException;
}
catch (ObjectDisposedException objectDisposed) when (!(objectDisposed is CosmosObjectDisposedException))
{
recorder.MarkFailed(objectDisposed);
throw new CosmosObjectDisposedException(
objectDisposed,
this.client,
CosmosObjectDisposedException objectDisposedException = new CosmosObjectDisposedException(
objectDisposed,
this.client,
trace);
recorder.MarkFailed(objectDisposedException);

throw objectDisposedException;
}
catch (NullReferenceException nullRefException) when (!(nullRefException is CosmosNullReferenceException))
{
recorder.MarkFailed(nullRefException);
throw new CosmosNullReferenceException(
CosmosNullReferenceException nullException = new CosmosNullReferenceException(
nullRefException,
trace);
recorder.MarkFailed(nullException);

throw nullException;
}
catch (Exception ex)
{
recorder.MarkFailed(ex);

throw;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public override Task<ClientEncryptionKeyResponse> ReadAsync(
return this.ClientContext.OperationHelperAsync(
nameof(ReadAsync),
requestOptions,
(trace) => base.ReadAsync(requestOptions, cancellationToken));
(trace) => base.ReadAsync(requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse<ClientEncryptionKeyProperties>(response));
}

public override Task<ClientEncryptionKeyResponse> ReplaceAsync(
Expand All @@ -42,7 +43,8 @@ public override Task<ClientEncryptionKeyResponse> ReplaceAsync(
return this.ClientContext.OperationHelperAsync(
nameof(ReplaceAsync),
requestOptions,
(trace) => base.ReplaceAsync(clientEncryptionKeyProperties, requestOptions, cancellationToken));
(trace) => base.ReplaceAsync(clientEncryptionKeyProperties, requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse<ClientEncryptionKeyProperties>(response));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public override Task<ResponseMessage> DeleteAsync(
return this.ClientContext.OperationHelperAsync(
operationName: nameof(DeleteAsync),
requestOptions: null,
task: (trace) => base.DeleteAsync(conflict, partitionKey, trace, cancellationToken));
task: (trace) => base.DeleteAsync(conflict, partitionKey, trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

public override FeedIterator GetConflictQueryStreamIterator(
Expand Down Expand Up @@ -86,7 +87,8 @@ public override Task<ItemResponse<T>> ReadCurrentAsync<T>(
return this.ClientContext.OperationHelperAsync(
operationName: nameof(ReadCurrentAsync),
requestOptions: null,
task: (trace) => base.ReadCurrentAsync<T>(cosmosConflict, partitionKey, trace, cancellationToken));
task: (trace) => base.ReadCurrentAsync<T>(cosmosConflict, partitionKey, trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<T>(response));
}

public override T ReadConflictContent<T>(ConflictProperties cosmosConflict)
Expand Down
Loading

0 comments on commit 66be1a3

Please sign in to comment.