Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed: Fixes pull model request count for NotModified scenario #2405

Merged
merged 17 commits into from
Apr 23, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -69,54 +69,59 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
ChangeFeedPage backendPage = crossFeedRangePage.Page;
if (backendPage is ChangeFeedNotModifiedPage)
{
using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info))
// Keep draining the cross partition enumerator until
// We get a non 304 page or we loop back to the same range or run into an exception
FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange;
FeedRangeInternal nextRange = this.crossPartitionEnumerator.NextRange;
// No point on draining when the state has 1 range
if (!originalRange.Equals(nextRange))
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
// Keep draining the cross partition enumerator until
// We get a non 304 page or we loop back to the same range or run into an exception
FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange;
double totalRequestCharge = backendPage.RequestCharge;
do
using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info))
{
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
double totalRequestCharge = backendPage.RequestCharge;
do
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}

monadicCrossPartitionPage = this.crossPartitionEnumerator.Current;
if (monadicCrossPartitionPage.Failed)
{
// Buffer the exception, since we need to return the request charge so far.
this.bufferedException = TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>.FromException(monadicCrossPartitionPage.Exception);
}
else
{
crossFeedRangePage = monadicCrossPartitionPage.Result;
backendPage = crossFeedRangePage.Page;
totalRequestCharge += backendPage.RequestCharge;
}
}
while (!(backendPage is ChangeFeedSuccessPage
|| this.crossPartitionEnumerator.NextRange.Equals(originalRange)
|| this.bufferedException.HasValue));

monadicCrossPartitionPage = this.crossPartitionEnumerator.Current;
if (monadicCrossPartitionPage.Failed)
// Create a page with the aggregated request charge
if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage)
{
// Buffer the exception, since we need to return the request charge so far.
this.bufferedException = TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>.FromException(monadicCrossPartitionPage.Exception);
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
crossFeedRangePage = monadicCrossPartitionPage.Result;
backendPage = crossFeedRangePage.Page;
totalRequestCharge += backendPage.RequestCharge;
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
}
}
while (!(backendPage is ChangeFeedSuccessPage
|| this.crossPartitionEnumerator.CurrentRange.Equals(originalRange)
|| this.bufferedException.HasValue));

// Create a page with the aggregated request charge
if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage)
{
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public CrossPartitionRangePageAsyncEnumerator(

public FeedRangeInternal CurrentRange { get; private set; }

public FeedRangeInternal NextRange { get; private set; }
ealsur marked this conversation as resolved.
Show resolved Hide resolved

public ValueTask<bool> MoveNextAsync()
{
return this.MoveNextAsync(NoOpTrace.Singleton);
Expand All @@ -120,6 +122,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
{
this.Current = default;
this.CurrentRange = default;
this.NextRange = default;
return false;
}

Expand Down Expand Up @@ -185,6 +188,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)

this.Current = TryCatch<CrossFeedRangePage<TPage, TState>>.FromException(currentPaginator.Current.Exception);
this.CurrentRange = currentPaginator.FeedRangeState.FeedRange;
this.NextRange = CrossPartitionRangePageAsyncEnumerator<TPage, TState>.GetNextRange(enumerators);
return true;
}

Expand Down Expand Up @@ -214,6 +218,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
this.Current = TryCatch<CrossFeedRangePage<TPage, TState>>.FromResult(
new CrossFeedRangePage<TPage, TState>(currentPaginator.Current.Result, crossPartitionState));
this.CurrentRange = currentPaginator.FeedRangeState.FeedRange;
this.NextRange = CrossPartitionRangePageAsyncEnumerator<TPage, TState>.GetNextRange(enumerators);
return true;
}
}
Expand All @@ -236,6 +241,17 @@ private static bool IsSplitException(Exception exeception)
&& (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone);
}

private static FeedRangeInternal GetNextRange(IQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators)
{
if (enumerators == null
|| enumerators.Count == 0)
{
return default;
}

return enumerators.Peek()?.FeedRangeState.FeedRange;
}

private interface IQueue<T> : IEnumerable<T>
{
T Peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,23 +185,8 @@
│ │ [Client Side Request Stats]
│ │ Redacted To Not Change The Baselines From Run To Run
│ │ )
│ ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── [05C1E7FFFFFFFA,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ ├── Get Partition Key Range Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ ├── Get Collection Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ ├── Try Get Overlapping Ranges(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Cosmos.Handlers.RetryHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Cosmos.Handlers.RouterHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Cosmos.Handlers.TransportHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ └── Microsoft.Azure.Documents.ServerStoreModel Transport Request(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ │ (
│ │ [Client Side Request Stats]
│ │ Redacted To Not Change The Baselines From Run To Run
│ │ )
│ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ └── [,05C1CFFFFFFFF8) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ └── [05C1E7FFFFFFFA,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ └── Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ ├── Get Partition Key Range Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
│ ├── Get Collection Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds
Expand Down Expand Up @@ -1548,119 +1533,6 @@
]
}
]
},
{
"name": "MoveNextAsync",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "[,05C1CFFFFFFFF8) move next",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler",
"id": "00000000-0000-0000-0000-000000000000",
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Get Partition Key Range Cache",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0
},
{
"name": "Get Collection Cache",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0
},
{
"name": "Try Get Overlapping Ranges",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0
},
{
"name": "Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler",
"id": "00000000-0000-0000-0000-000000000000",
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Microsoft.Azure.Cosmos.Handlers.RetryHandler",
"id": "00000000-0000-0000-0000-000000000000",
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Microsoft.Azure.Cosmos.Handlers.RouterHandler",
"id": "00000000-0000-0000-0000-000000000000",
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Microsoft.Azure.Cosmos.Handlers.TransportHandler",
"id": "00000000-0000-0000-0000-000000000000",
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"children": [
{
"name": "Microsoft.Azure.Documents.ServerStoreModel Transport Request",
"id": "00000000-0000-0000-0000-000000000000",
"caller info": {
"member": "MemberName",
"file": "FilePath",
"line": 42
},
"start time": "12:00:00:000",
"duration in milliseconds": 0,
"data": {
"Client Side Request Stats": "Redacted To Not Change The Baselines From Run To Run"
}
}
]
}
]
}
]
}
]
}
]
}
]
}
]
}
]
}
Expand Down
Loading