Skip to content

Commit

Permalink
Merged to latest
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake Willey committed Mar 12, 2020
2 parents e4b7efb + 16f054a commit d21afbf
Show file tree
Hide file tree
Showing 50 changed files with 762 additions and 771 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
PartitionKeyRangeServerBatchRequest serverRequest,
CancellationToken cancellationToken)
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create();
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContextCore();
CosmosDiagnosticScope limiterScope = diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.Limiter");
SemaphoreSlim limiter = this.GetOrAddLimiterForPartitionKeyRange(serverRequest.PartitionKeyRangeId);
using (await limiter.UsingWaitAsync(cancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ internal ResponseMessage ToResponseMessage()
requestMessage: null,
headers: headers,
cosmosException: null,
diagnostics: this.DiagnosticsContext ?? CosmosDiagnosticsContext.Create())
diagnostics: this.DiagnosticsContext ?? new CosmosDiagnosticsContextCore())
{
Content = this.ResourceStream
};
Expand Down
16 changes: 13 additions & 3 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public ClientRetryPolicy(
this.enableEndpointDiscovery = enableEndpointDiscovery;
this.sessionTokenRetryCount = 0;
this.canUseMultipleWriteLocations = false;

this.sharedStatistics = new CosmosClientSideRequestStatistics();
}

/// <summary>
Expand Down Expand Up @@ -126,7 +124,19 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
this.isReadRequest = request.IsReadOnlyRequest;
this.canUseMultipleWriteLocations = this.globalEndpointManager.CanUseMultipleWriteLocations(request);

request.RequestContext.ClientRequestStatistics = this.sharedStatistics;
if (request.RequestContext.ClientRequestStatistics == null)
{
if (this.sharedStatistics == null)
{
this.sharedStatistics = new CosmosClientSideRequestStatistics();
}

request.RequestContext.ClientRequestStatistics = this.sharedStatistics;
}
else
{
this.sharedStatistics = request.RequestContext.ClientRequestStatistics;
}

// clear previous location-based routing directive
request.RequestContext.ClearRouteToLocation();
Expand Down
24 changes: 17 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,31 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
Database database = this.GetDatabase(id);
ResponseMessage response = await database.ReadStreamAsync(requestOptions: requestOptions, cancellationToken: cancellationToken);
if (response.StatusCode != HttpStatusCode.NotFound)
ResponseMessage readResponse = await database.ReadStreamAsync(
requestOptions: requestOptions,
cancellationToken: cancellationToken);
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(database, Task.FromResult(response));
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(database, Task.FromResult(readResponse));
}
response = await this.CreateDatabaseStreamAsync(databaseProperties, throughput, requestOptions, cancellationToken);
if (response.StatusCode != HttpStatusCode.Conflict)
ResponseMessage createResponse = await this.CreateDatabaseStreamAsync(databaseProperties, throughput, requestOptions, cancellationToken);
// Merge the diagnostics with the first read request.
createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(response));
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(createResponse));
}
// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
return await database.ReadAsync(cancellationToken: cancellationToken);
ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
cancellationToken: cancellationToken);
readResponseAfterConflict.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(readResponseAfterConflict));
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Diagnostics
{
using System;

internal sealed class AddressResolutionStatistics : CosmosDiagnosticsInternal
{
public AddressResolutionStatistics(
DateTime startTime,
DateTime endTime,
string targetEndpoint)
{
this.StartTime = startTime;
this.EndTime = endTime;
this.TargetEndpoint = targetEndpoint ?? throw new ArgumentNullException(nameof(startTime));
}

public DateTime StartTime { get; }
public DateTime? EndTime { get; set; }
public string TargetEndpoint { get; }

public override void Accept(CosmosDiagnosticsInternalVisitor visitor)
{
visitor.Visit(this);
}

public override TResult Accept<TResult>(CosmosDiagnosticsInternalVisitor<TResult> visitor)
{
return visitor.Visit(this);
}
}
}
20 changes: 18 additions & 2 deletions Microsoft.Azure.Cosmos/src/Diagnostics/BackendMetricsExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Diagnostics
internal sealed class BackendMetricsExtractor : CosmosDiagnosticsInternalVisitor<(ParseFailureReason, BackendMetrics)>
{
public static readonly BackendMetricsExtractor Singleton = new BackendMetricsExtractor();
private static readonly (ParseFailureReason, BackendMetrics) MetricsNotFound = (ParseFailureReason.MetricsNotFound, default);

private BackendMetricsExtractor()
{
Expand All @@ -21,7 +22,7 @@ private BackendMetricsExtractor()

public override (ParseFailureReason, BackendMetrics) Visit(PointOperationStatistics pointOperationStatistics)
{
return (ParseFailureReason.MetricsNotFound, default);
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticsContext cosmosDiagnosticsContext)
Expand Down Expand Up @@ -61,7 +62,7 @@ public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticsCont

public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticScope cosmosDiagnosticScope)
{
return (ParseFailureReason.MetricsNotFound, default);
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(QueryPageDiagnostics queryPageDiagnostics)
Expand All @@ -74,6 +75,21 @@ public override (ParseFailureReason, BackendMetrics) Visit(QueryPageDiagnostics
return (ParseFailureReason.None, backendMetrics);
}

public override (ParseFailureReason, BackendMetrics) Visit(AddressResolutionStatistics addressResolutionStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(StoreResponseStatistics storeResponseStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(CosmosClientSideRequestStatistics clientSideRequestStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public enum ParseFailureReason
{
None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Text;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;

internal sealed class CosmosClientSideRequestStatistics : CosmosDiagnosticsInternal, IClientSideRequestStatistics
{
private readonly object lockObject = new object();

public CosmosClientSideRequestStatistics(CosmosDiagnosticsContext diagnosticsContext = null)
{
this.RequestStartTimeUtc = DateTime.UtcNow;
this.RequestEndTimeUtc = null;
this.EndpointToAddressResolutionStatistics = new Dictionary<string, AddressResolutionStatistics>();
this.ContactedReplicas = new List<Uri>();
this.FailedReplicas = new HashSet<Uri>();
this.RegionsContacted = new HashSet<Uri>();
this.DiagnosticsContext = diagnosticsContext ?? new CosmosDiagnosticsContextCore();
this.DiagnosticsContext.AddDiagnosticsInternal(this);
}

private DateTime RequestStartTimeUtc { get; }

private DateTime? RequestEndTimeUtc { get; set; }

private Dictionary<string, AddressResolutionStatistics> EndpointToAddressResolutionStatistics { get; }

public List<Uri> ContactedReplicas { get; set; }

public HashSet<Uri> FailedReplicas { get; }

public HashSet<Uri> RegionsContacted { get; }

public TimeSpan RequestLatency
{
get
{
if (this.RequestEndTimeUtc.HasValue)
{
return this.RequestEndTimeUtc.Value - this.RequestStartTimeUtc;
}

return TimeSpan.MaxValue;
}
}

public bool IsCpuOverloaded { get; private set; } = false;

public CosmosDiagnosticsContext DiagnosticsContext { get; }

public void RecordRequest(DocumentServiceRequest request)
{
}

public void RecordResponse(DocumentServiceRequest request, StoreResult storeResult)
{
DateTime responseTime = DateTime.UtcNow;
Uri locationEndpoint = request.RequestContext.LocationEndpointToRoute;
StoreResponseStatistics responseStatistics = new StoreResponseStatistics(
responseTime,
storeResult,
request.ResourceType,
request.OperationType,
locationEndpoint);

if (storeResult?.IsClientCpuOverloaded ?? false)
{
this.IsCpuOverloaded = true;
}

lock (this.lockObject)
{
if (!this.RequestEndTimeUtc.HasValue || responseTime > this.RequestEndTimeUtc)
{
this.RequestEndTimeUtc = responseTime;
}

if (locationEndpoint != null)
{
this.RegionsContacted.Add(locationEndpoint);
}

this.DiagnosticsContext.AddDiagnosticsInternal(responseStatistics);
}
}

public string RecordAddressResolutionStart(Uri targetEndpoint)
{
string identifier = Guid.NewGuid().ToString();
AddressResolutionStatistics resolutionStats = new AddressResolutionStatistics(
startTime: DateTime.UtcNow,
endTime: DateTime.MaxValue,
targetEndpoint: targetEndpoint == null ? "<NULL>" : targetEndpoint.ToString());

lock (this.lockObject)
{
this.EndpointToAddressResolutionStatistics.Add(identifier, resolutionStats);
this.DiagnosticsContext.AddDiagnosticsInternal(resolutionStats);
}

return identifier;
}

public void RecordAddressResolutionEnd(string identifier)
{
if (string.IsNullOrEmpty(identifier))
{
return;
}

DateTime responseTime = DateTime.UtcNow;
lock (this.lockObject)
{
if (!this.EndpointToAddressResolutionStatistics.ContainsKey(identifier))
{
throw new ArgumentException("Identifier {0} does not exist. Please call start before calling end.", identifier);
}

if (!this.RequestEndTimeUtc.HasValue || responseTime > this.RequestEndTimeUtc)
{
this.RequestEndTimeUtc = responseTime;
}

this.EndpointToAddressResolutionStatistics[identifier].EndTime = responseTime;
}
}

public override string ToString()
{
// This is required for the older IClientSideRequestStatistics
// Capture the entire diagnostic context in the toString to avoid losing any information
// for any APIs using the older interface.
return this.DiagnosticsContext.ToString();
}

public void AppendToBuilder(StringBuilder stringBuilder)
{
// This is required for the older IClientSideRequestStatistics
// Capture the entire diagnostic context in the toString to avoid losing any information
// for any APIs using the older interface.
stringBuilder.Append(this.DiagnosticsContext.ToString());
}

public override void Accept(CosmosDiagnosticsInternalVisitor visitor)
{
visitor.Visit(this);
}

public override TResult Accept<TResult>(CosmosDiagnosticsInternalVisitor<TResult> visitor)
{
return visitor.Visit(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
namespace Microsoft.Azure.Cosmos.Diagnostics
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Newtonsoft.Json;

/// <summary>
/// This represents a single scope in the diagnostics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Microsoft.Azure.Cosmos
using System.Collections;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Documents;
using static Microsoft.Azure.Cosmos.CosmosClientSideRequestStatistics;

/// <summary>
/// This represents the diagnostics interface used in the SDK.
Expand All @@ -31,6 +33,12 @@ internal abstract class CosmosDiagnosticsContext : CosmosDiagnosticsInternal, IE

internal abstract void AddDiagnosticsInternal(QueryPageDiagnostics queryPageDiagnostics);

internal abstract void AddDiagnosticsInternal(StoreResponseStatistics storeResponseStatistics);

internal abstract void AddDiagnosticsInternal(AddressResolutionStatistics addressResolutionStatistics);

internal abstract void AddDiagnosticsInternal(CosmosClientSideRequestStatistics clientSideRequestStatistics);

internal abstract void AddDiagnosticsInternal(CosmosDiagnosticsContext newContext);

internal abstract void SetSdkUserAgent(string userAgent);
Expand All @@ -44,12 +52,7 @@ IEnumerator IEnumerable.GetEnumerator()

internal static CosmosDiagnosticsContext Create(RequestOptions requestOptions)
{
return requestOptions?.DiagnosticContext ?? CosmosDiagnosticsContext.Create();
}

internal static CosmosDiagnosticsContext Create()
{
return new CosmosDiagnosticsContextCore();
return requestOptions?.DiagnosticContext ?? new CosmosDiagnosticsContextCore();
}
}
}
Loading

0 comments on commit d21afbf

Please sign in to comment.