Skip to content

Commit

Permalink
Diagnostics: Add synchronization context Part 1 (#1587)
Browse files Browse the repository at this point in the history
* Diagnostic refactor for synchronization context

* Added Database changes to give more context on how it will look for inline classes
  • Loading branch information
j82w committed Jun 16, 2020
1 parent a18e5e8 commit 1785ee0
Show file tree
Hide file tree
Showing 21 changed files with 624 additions and 391 deletions.
197 changes: 100 additions & 97 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <param name="id">The Cosmos database id</param>
/// <remarks>
/// <see cref="Database"/> proxy reference doesn't guarantee existence.
/// Please ensure database exists through <see cref="CosmosClient.CreateDatabaseAsync(DatabaseProperties, int?, RequestOptions, CancellationToken)"/>
/// Please ensure database exists through <see cref="CosmosClient.CreateDatabaseAsync(string, int?, RequestOptions, CancellationToken)"/>
/// or <see cref="CosmosClient.CreateDatabaseIfNotExistsAsync(string, int?, RequestOptions, CancellationToken)"/>, before
/// operating on it.
/// </remarks>
Expand Down Expand Up @@ -382,12 +382,21 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
throw new ArgumentNullException(nameof(id));
}

DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseAsync(
databaseProperties: databaseProperties,
throughput: throughput,
requestOptions: requestOptions,
cancellationToken: cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(diagnostics) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);
return this.CreateDatabaseInternalAsync(
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
diagnosticsContext: diagnostics,
cancellationToken: cancellationToken);
});
}

/// <summary>
Expand Down Expand Up @@ -418,12 +427,19 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
throw new ArgumentNullException(nameof(id));
}

DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseAsync(
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
cancellationToken: cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(diagnostics) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return this.CreateDatabaseInternalAsync(
diagnosticsContext: diagnostics,
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
});
}

/// <summary>
Expand Down Expand Up @@ -468,35 +484,45 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
throw new ArgumentNullException(nameof(id));
}

return TaskHelper.RunInlineIfNeededAsync(async () =>
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
async (diagnostics) =>
{
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
Database database = this.GetDatabase(id);
ResponseMessage readResponse = await database.ReadStreamAsync(
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
diagnosticsContext: diagnostics,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
if (readResponse.StatusCode != HttpStatusCode.NotFound)
cancellationToken: cancellationToken))
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}
ResponseMessage createResponse = await this.CreateDatabaseStreamAsync(databaseProperties, throughputProperties, requestOptions, cancellationToken);
// Merge the diagnostics with the first read request.
createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
throughputProperties,
requestOptions,
cancellationToken))
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}
// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
diagnosticsContext: diagnostics,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
readResponseAfterConflict.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
});
}
Expand Down Expand Up @@ -538,8 +564,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ThroughputProperties throughputProperties = throughput.HasValue
? ThroughputProperties.CreateManualThroughput(throughput.Value) : null;
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);

return this.CreateDatabaseIfNotExistsAsync(
id,
Expand Down Expand Up @@ -586,10 +611,10 @@ public virtual FeedIterator<T> GetDatabaseQueryIterator<T>(
QueryRequestOptions requestOptions = null)
{
return new FeedIteratorInlineCore<T>(
this.GetDatabaseQueryIteratorHelper<T>(
queryDefinition,
continuationToken,
requestOptions));
this.GetDatabaseQueryIteratorHelper<T>(
queryDefinition,
continuationToken,
requestOptions));
}

/// <summary>
Expand Down Expand Up @@ -771,14 +796,19 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
throw new ArgumentNullException(nameof(databaseProperties));
}

this.ClientContext.ValidateResource(databaseProperties.Id);
Stream streamPayload = this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties);

return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughput,
requestOptions,
cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseStreamAsync),
requestOptions,
(diagnostics) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
ThroughputProperties.CreateManualThroughput(throughput),
requestOptions,
cancellationToken);
});
}

internal virtual async Task<ConsistencyLevel> GetAccountConsistencyLevelAsync()
Expand Down Expand Up @@ -835,21 +865,27 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
throw new ArgumentNullException(nameof(databaseProperties));
}

this.ClientContext.ValidateResource(databaseProperties.Id);
Stream streamPayload = this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties);

return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughputProperties,
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
cancellationToken));
(diagnostics) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
throughputProperties,
requestOptions,
cancellationToken);
});
}

internal async Task<DatabaseResponse> CreateDatabaseAsync(
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
CosmosDiagnosticsContext diagnosticsContext,
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
ResponseMessage response = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.DatabaseRootUri,
Expand All @@ -860,63 +896,30 @@ internal async Task<DatabaseResponse> CreateDatabaseAsync(
partitionKey: null,
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
diagnosticsContext: null,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), response);
}

internal async Task<DatabaseResponse> CreateDatabaseAsync(
DatabaseProperties databaseProperties,
int? throughput = null,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ResponseMessage response = await this.CreateDatabaseStreamInternalAsync(
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
throughput: throughput,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), response);
}

private Task<ResponseMessage> CreateDatabaseStreamInternalAsync(
Stream streamPayload,
int? throughput,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
ThroughputProperties throughputProperties = null;
if (throughput.HasValue)
{
throughputProperties = ThroughputProperties.CreateManualThroughput(throughput.Value);
}

return this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughputProperties,
requestOptions,
cancellationToken);

}

private Task<ResponseMessage> CreateDatabaseStreamInternalAsync(
Stream streamPayload,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
CosmosDiagnosticsContext diagnosticsContext,
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
return this.ClientContext.ProcessResourceOperationStreamAsync(
return this.ClientContext.ProcessResourceOperationAsync(
resourceUri: this.DatabaseRootUri,
resourceType: ResourceType.Database,
operationType: OperationType.Create,
requestOptions: requestOptions,
cosmosContainerCore: null,
containerInternal: null,
partitionKey: null,
streamPayload: streamPayload,
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
diagnosticsContext: null,
responseCreator: (response) => response,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public CosmosClientSideRequestStatistics(CosmosDiagnosticsContext diagnosticsCon
this.ContactedReplicas = new List<Uri>();
this.FailedReplicas = new HashSet<Uri>();
this.RegionsContacted = new HashSet<Uri>();
this.DiagnosticsContext = diagnosticsContext ?? new CosmosDiagnosticsContextCore();
this.DiagnosticsContext = diagnosticsContext ?? CosmosDiagnosticsContextCore.Create(requestOptions: null);
this.DiagnosticsContext.AddDiagnosticsInternal(this);
this.clientSideRequestStatisticsCreateTime = Stopwatch.GetTimestamp();
}
Expand Down
33 changes: 24 additions & 9 deletions Microsoft.Azure.Cosmos/src/Diagnostics/CosmosDiagnosticsContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ internal abstract class CosmosDiagnosticsContext : CosmosDiagnosticsInternal, IE
{
public abstract DateTime StartUtc { get; }

public abstract int TotalRequestCount { get; protected set; }
public abstract string UserAgent { get; }

public abstract int FailedRequestCount { get; protected set; }

public abstract string UserAgent { get; protected set; }
public abstract string OperationName { get; }

internal abstract CosmosDiagnostics Diagnostics { get; }

public abstract int GetTotalRequestCount();

public abstract int GetFailedRequestCount();

internal abstract IDisposable GetOverallScope();

internal abstract IDisposable CreateScope(string name);

internal abstract IDisposable CreateRequestHandlerScopeScope(RequestHandler requestHandler);

internal abstract TimeSpan GetClientElapsedTime();
internal abstract TimeSpan GetRunningElapsedTime();

internal abstract bool TryGetTotalElapsedTime(out TimeSpan timeSpan);

internal abstract bool IsComplete();

Expand All @@ -50,18 +54,29 @@ internal abstract class CosmosDiagnosticsContext : CosmosDiagnosticsInternal, IE

internal abstract void AddDiagnosticsInternal(CosmosDiagnosticsContext newContext);

internal abstract void SetSdkUserAgent(string userAgent);

public abstract IEnumerator<CosmosDiagnosticsInternal> GetEnumerator();

IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}

internal static CosmosDiagnosticsContext Create(RequestOptions requestOptions)
internal static CosmosDiagnosticsContext Create(
RequestOptions requestOptions)
{
return requestOptions?.DiagnosticContextFactory?.Invoke() ??
new CosmosDiagnosticsContextCore();
}

internal static CosmosDiagnosticsContext Create(
string operationName,
RequestOptions requestOptions,
string userAgentString)
{
return requestOptions?.DiagnosticContextFactory?.Invoke() ?? new CosmosDiagnosticsContextCore();
return requestOptions?.DiagnosticContextFactory?.Invoke() ??
new CosmosDiagnosticsContextCore(
operationName,
userAgentString);
}
}
}
Loading

0 comments on commit 1785ee0

Please sign in to comment.