Skip to content

Commit

Permalink
[Internal] Release: Adds 3.15.1 release branch (#2074)
Browse files Browse the repository at this point in the history
Cherry-picking commits for the release

#2069
#2047
#2071
#2042
Version bump from #2072
  • Loading branch information
ealsur authored Dec 17, 2020
1 parent 8eb44ac commit d0bee5d
Show file tree
Hide file tree
Showing 17 changed files with 1,734 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ClientOfficialVersion>3.15.0</ClientOfficialVersion>
<ClientOfficialVersion>3.15.1</ClientOfficialVersion>
<ClientPreviewVersion>3.15.2</ClientPreviewVersion>
<DirectVersion>3.15.2</DirectVersion>
<EncryptionVersion>1.0.0-preview6</EncryptionVersion>
Expand Down
1,219 changes: 1,219 additions & 0 deletions Microsoft.Azure.Cosmos/contracts/API_3.15.1.txt

Large diffs are not rendered by default.

18 changes: 8 additions & 10 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);

Expand All @@ -52,11 +51,6 @@ public BatchAsyncContainerExecutor(
int maxServerRequestOperationCount,
int maxServerRequestBodyLength)
{
if (cosmosContainer == null)
{
throw new ArgumentNullException(nameof(cosmosContainer));
}

if (maxServerRequestOperationCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount));
Expand All @@ -67,7 +61,7 @@ public BatchAsyncContainerExecutor(
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}

this.cosmosContainer = cosmosContainer;
this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer));
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
Expand All @@ -89,7 +83,7 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
Expand Down Expand Up @@ -136,10 +130,13 @@ internal virtual async Task ValidateOperationAsync(
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
containerInternal,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}
Expand Down Expand Up @@ -185,6 +182,7 @@ private async Task ReBatchAsync(
CancellationToken cancellationToken)
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ItemBatchOperationContext : IDisposable
{
public string PartitionKeyRangeId { get; }
public string PartitionKeyRangeId { get; private set; }

public BatchAsyncBatcher CurrentBatcher { get; set; }

Expand Down Expand Up @@ -74,6 +74,11 @@ public void Fail(
this.Dispose();
}

public void ReRouteOperation(string newPartitionKeyRangeId)
{
this.PartitionKeyRangeId = newPartitionKeyRangeId;
}

public void Dispose()
{
this.CurrentBatcher = null;
Expand Down
81 changes: 47 additions & 34 deletions Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -17,77 +18,89 @@ namespace Microsoft.Azure.Cosmos
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;
private readonly ContainerInternal container;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
public BulkPartitionKeyRangeGoneRetryPolicy(
ContainerInternal container,
IDocumentClientRetryPolicy nextRetryPolicy)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.nextRetryPolicy = nextRetryPolicy;
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
DocumentClientException clientException = exception as DocumentClientException;

ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
clientException?.StatusCode,
clientException?.GetSubStatus(),
clientException?.ResourceAddress);

if (shouldRetryResult != null)
if (exception is CosmosException clientException)
{
return Task.FromResult(shouldRetryResult);
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
clientException.StatusCode,
(SubStatusCodes)clientException.SubStatusCode,
cancellationToken);

if (shouldRetryResult != null)
{
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return ShouldRetryResult.NoRetry();
}
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.GetResourceAddress());
cancellationToken);
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode,
string resourceIdOrFullName)
CancellationToken cancellationToken)
{
if (statusCode == HttpStatusCode.Gone
&& (subStatusCode == SubStatusCodes.PartitionKeyRangeGone || subStatusCode == SubStatusCodes.NameCacheIsStale)
&& this.retriesAttempted < MaxRetries)
if (statusCode == HttpStatusCode.Gone)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
{
PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
string containerRid = await this.container.GetRIDAsync(cancellationToken: cancellationToken);
await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

if (subStatusCode == SubStatusCodes.NameCacheIsStale)
{
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public override void Visit(ChangeFeedStartFromTime startFromTime)
{
this.requestMessage.Headers.Add(
HttpConstants.HttpHeaders.IfModifiedSince,
startFromTime.StartTime.ToString("o", CultureInfo.InvariantCulture));
startFromTime.StartTime.ToString("r", CultureInfo.InvariantCulture));
}

startFromTime.FeedRange.Accept(FeedRangeRequestMessagePopulatorVisitor.Singleton, this.requestMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public override void Visit(CosmosDiagnosticsContext cosmosDiagnosticsContext)
this.jsonWriter.WritePropertyName("FailedRequestCount");
this.jsonWriter.WriteValue(cosmosDiagnosticsContext.GetFailedResponseCount());

this.jsonWriter.WritePropertyName("Operation");
this.jsonWriter.WriteValue(cosmosDiagnosticsContext.OperationName);

this.jsonWriter.WriteEndObject();

this.jsonWriter.WritePropertyName("Context");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
CancellationToken cancellationToken)
{
HttpRequestMessage httpRequestMessage = this.getHttpReqestMessage();

this.diagnosticsContext.AddDiagnosticsInternal(
new PointOperationStatistics(
activityId: Trace.CorrelationManager.ActivityId.ToString(),
statusCode: HttpStatusCode.InternalServerError,
statusCode: HttpStatusCode.RequestTimeout,
subStatusCode: SubStatusCodes.Unknown,
responseTimeUtc: DateTime.UtcNow,
requestCharge: 0,
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Util/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ internal static ResponseMessage ToCosmosResponseMessage(this DocumentClientExcep
PointOperationStatistics pointOperationStatistics = new PointOperationStatistics(
activityId: cosmosException.Headers.ActivityId,
statusCode: cosmosException.StatusCode,
subStatusCode: (int)SubStatusCodes.Unknown,
subStatusCode: cosmosException.Headers.SubStatusCode,
responseTimeUtc: DateTime.UtcNow,
requestCharge: cosmosException.Headers.RequestCharge,
errorMessage: documentClientException.ToString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,59 @@ public async Task TestWithStartTime_Beginning()
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("doc0.doc1.doc2.doc3.doc4.", accumulator);
}

[TestMethod]
public async Task TestWithStartTime_CustomTime()
{
int partitionKey = 0;

foreach (int id in Enumerable.Range(0, 5))
{
await this.Container.CreateItemAsync<dynamic>(new { id = $"doc{id}", pk = partitionKey });
}

await Task.Delay(1000);

DateTime now = DateTime.UtcNow;

await Task.Delay(1000);

foreach (int id in Enumerable.Range(5, 5))
{
await this.Container.CreateItemAsync<dynamic>(new { id = $"doc{id}", pk = partitionKey });
}

ManualResetEvent allDocsProcessed = new ManualResetEvent(false);

int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
Assert.IsTrue(docs.Count > 0);
processedDocCount += docs.Count;
foreach (dynamic doc in docs)
{
accumulator += doc.id.ToString() + ".";
}
if (processedDocCount == 5)
{
allDocsProcessed.Set();
}
return Task.CompletedTask;
})
.WithStartTime(now)
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize and pickup changes
bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
await processor.StopAsync();
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("doc5.doc6.doc7.doc8.doc9.", accumulator);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ internal static void ValidateCosmosDiagnosticsContext(
Assert.IsNotNull(jObject["DiagnosticVersion"].ToString());
JToken summary = jObject["Summary"];
Assert.IsNotNull(summary["UserAgent"].ToString());
Assert.IsNotNull(summary["Operation"].ToString());
Assert.IsTrue(summary["UserAgent"].ToString().Contains("cosmos-netstandard-sdk"));
Assert.IsNotNull(summary["StartUtc"].ToString());
Assert.IsNotNull(summary["TotalElapsedTimeInMs"].ToString());
Expand Down
Loading

0 comments on commit d0bee5d

Please sign in to comment.