Preview - Change Feed Pull model #1210

Merged 62 commits on Feb 25, 2020

@ealsur (Member) commented Feb 13, 2020

Change Feed pull model

For updated samples see #1355

Description

This PR adds support to consume the Change Feed through a pull model, under the Preview flag.

FeedToken and parallelization

The PR also introduces the FeedToken concept as the unit of consumption and parallelization. A token represents a range inside a Container (either the full Container or a part of it) and also serves as the continuation for resuming reads.
A FeedToken can be used to consume the Change Feed, and a user can obtain a list of these tokens by calling:

Task<IReadOnlyList<FeedToken>> Container.GetFeedTokensAsync(CancellationToken cancellationToken = default(CancellationToken));

This method returns a list of tokens that can be used to parallelize Change Feed consumption. A FeedIterator is used to consume the Change Feed for a particular FeedToken. In practice, this allows the tokens to be distributed across multiple threads or even machines:

IReadOnlyList<FeedToken> tokens = await container.GetFeedTokensAsync();

// Machine or Thread 1
FeedIterator iteratorForToken = container.GetChangeFeedStreamIterator(tokens[0]);
while (iteratorForToken.HasMoreResults)
{
    // Stream iterator returns a response with status code
    using(ResponseMessage response = await iteratorForToken.ReadNextAsync())
    {
        if(response.IsSuccessStatusCode)
        {
            // Consume response.Content stream
        }
    }
}

// Machine or Thread 2
FeedIterator iteratorForToken = container.GetChangeFeedStreamIterator(tokens[1]);
while (iteratorForToken.HasMoreResults)
{
    // Stream iterator returns a response with status code
    using(ResponseMessage response = await iteratorForToken.ReadNextAsync())
    {
        if(response.IsSuccessStatusCode)
        {
            // Consume response.Content stream
        }
    }
}
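
How the tokens get assigned to workers is left to the caller. As a minimal sketch, assuming a fixed number of workers is known up front, a round-robin split could look like the following. TokenDistribution and AssignRoundRobin are hypothetical names, not part of the SDK, and the helper is generic so it does not depend on any SDK type; in practice T would be FeedToken.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the SDK.
public static class TokenDistribution
{
    // Splits the items into workerCount buckets in round-robin order.
    // Some buckets are empty when there are fewer items than workers.
    public static List<List<T>> AssignRoundRobin<T>(IReadOnlyList<T> items, int workerCount)
    {
        if (items == null) throw new ArgumentNullException(nameof(items));
        if (workerCount <= 0) throw new ArgumentOutOfRangeException(nameof(workerCount));

        var buckets = new List<List<T>>(workerCount);
        for (int i = 0; i < workerCount; i++)
        {
            buckets.Add(new List<T>());
        }

        // Item i goes to worker (i mod workerCount).
        for (int i = 0; i < items.Count; i++)
        {
            buckets[i % workerCount].Add(items[i]);
        }

        return buckets;
    }
}
```

Each worker would then create one iterator per token in its bucket, as in the loops above.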

The FeedToken can be captured from an existing iterator (FeedIterator.FeedToken) and saved/stored for later use:

FeedIterator iteratorForToken = container.GetChangeFeedStreamIterator(tokens[0]);
while (iteratorForToken.HasMoreResults)
{
    // Stream iterator returns a response with status code
    using(ResponseMessage response = await iteratorForToken.ReadNextAsync())
    {
        if(response.IsSuccessStatusCode)
        {
            // Consume response.Content stream
        }
    }
}

FeedToken lastProcessedToken = iteratorForToken.FeedToken;

// Some time later
FeedIterator iteratorThatResumesFromLastPoint = container.GetChangeFeedStreamIterator(lastProcessedToken);

A FeedToken can also be serialized to a string when the storage mechanism requires it:

FeedToken lastProcessedToken = iteratorForToken.FeedToken;
string serialization = lastProcessedToken.ToString();

FeedToken restoredToken = FeedToken.FromString(serialization);
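
Where the serialized string is stored is up to the application. As a minimal sketch, assuming a local file is an acceptable store, a checkpoint helper could look like the following. FeedTokenCheckpoint, Save and TryLoad are hypothetical names, not part of the SDK, and the helper only handles the string form of the token.

```csharp
using System.IO;

// Hypothetical helper, not part of the SDK.
public static class FeedTokenCheckpoint
{
    // Overwrites the checkpoint file with the latest serialized token.
    public static void Save(string path, string serializedToken)
    {
        File.WriteAllText(path, serializedToken);
    }

    // Returns the stored token string, or null when no checkpoint exists yet.
    public static string TryLoad(string path)
    {
        return File.Exists(path) ? File.ReadAllText(path) : null;
    }
}
```

The string passed to Save would be the result of lastProcessedToken.ToString(), and a non-null result from TryLoad would be passed to FeedToken.FromString to resume.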

Consume the entire Container

While the GetFeedTokensAsync approach allows parallelization, in some cases the user might want a single consumer of the Change Feed, with no parallelization.

In that case, it is as simple as creating a FeedIterator without any FeedToken as input:

FeedIterator iteratorForTheEntireContainer = container.GetChangeFeedStreamIterator();
while (iteratorForTheEntireContainer.HasMoreResults)
{
    // Stream iterator returns a response with status code
    using(ResponseMessage response = await iteratorForTheEntireContainer.ReadNextAsync())
    {
        if(response.IsSuccessStatusCode)
        {
            // Consume response.Content stream
        }
    }
}

This iterator also provides the FeedIterator.FeedToken to store and resume at a later point in time.

Change Feed for a Partition Key

This PR also enables the same construct to read the Change Feed for a particular Partition Key. Again, it is an overload of GetChangeFeedStreamIterator / GetChangeFeedIterator:

FeedIterator iteratorForPartitionKey = container.GetChangeFeedStreamIterator(new PartitionKey("myPartitionKeyValueToRead"));

Support for POCOs

The Change Feed iterator comes in two flavors: one with Stream support (ReadNextAsync returns a ResponseMessage) and one for POCO types (ReadNextAsync returns a FeedResponse<T>):

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator();

FeedIterator<MyPOCO> iteratorWithPOCOS = container.GetChangeFeedIterator<MyPOCO>();

Scale support

If a FeedToken represents multiple units of Change Feed consumption, the IEnumerable<FeedToken> FeedToken.Scale() method can be used to attempt to scale out.

This is useful if we initially start reading the Change Feed for the entire Container and later want to try to parallelize the consumption:

FeedIterator iteratorForTheEntireContainer = container.GetChangeFeedStreamIterator();
while (iteratorForTheEntireContainer.HasMoreResults)
{
    // Read and process changes with ReadNextAsync, as in the previous examples
}

// Scale out
FeedToken lastUpdatedFeedToken = iteratorForTheEntireContainer.FeedToken;
// ToList() (System.Linq) matches the IEnumerable<FeedToken> return type stated above
List<FeedToken> childTokens = lastUpdatedFeedToken.Scale().ToList();
if (childTokens.Count > 0)
{
    // Start new iterators with each token
}

Partition Key Ranges

For monitoring purposes, it is now also possible to obtain the Partition Key Ranges that a particular FeedToken represents through the Container.GetPartitionKeyRangesAsync method, which takes a FeedToken as a parameter:

FeedIterator iterator = container.GetChangeFeedStreamIterator(someInitialFeedToken);
while (iterator.HasMoreResults)
{
    // Stream iterator returns a response with status code
    using(ResponseMessage response = await iterator.ReadNextAsync())
    {
        if(response.IsSuccessStatusCode)
        {
            // Consume response.Content stream
            IEnumerable<string> partitionKeyRanges = await container.GetPartitionKeyRangesAsync(iterator.FeedToken);
        }
    }
}

Type of change

  • New feature (non-breaking change which adds functionality)

Closing issues

Closes #972
Closes #831

@WimVergouwe

@ealsur, thanks for all the work on this. I saw you released 3.8.0 yesterday, but this was not included. When can we expect this? This is really blocking us from upgrading from v2...

@ealsur (Member, Author) commented Apr 20, 2020

@WimVergouwe We are working on adding more features related to this and the API changed, see the latest linked PR.

@WimVergouwe

@ealsur, any update on this? We're still stuck on v2...

@ealsur (Member, Author) commented Jul 1, 2020

@WimVergouwe sadly we could not GA just yet. There are new features on the service side which are coming soon (see //Build change feed announcements), so we are making sure our current API model is compatible with them.

@mdbill commented Aug 26, 2020

Also waiting for this. A quick look at 3.12.0 indicates it is still in preview.

@zbynek001

In v2, we have our own custom logic to process the change feed across all partitions, and we also handle partition splits ourselves.
We iterate DocumentClient.ReadPartitionKeyRangeFeedAsync and check the partition IDs in PartitionKeyRange.Parents to find the split partitions. But in v3 everything needed is marked as internal.

This is currently blocking us from moving to v3.
Any chance this will change? Or is there another way to handle partition splits?

@ealsur (Member, Author) commented Sep 14, 2020

@zbynek001 The APIs in V3 are split-proof, so when you get the FeedRanges, you can start the iterators and you won't face splits. Reference https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed-pull-model

@zbynek001

Yes, but the FeedRange does not scale well. If we start with a collection that has 1 partition, we get 1 FeedRange. If the collection grows over time to thousands of partitions, we still have that one FeedRange, so we cannot spread the processing over several machines.
Another issue is that with FeedRange we are not in control of which partition is processed next, so processing of one partition might be far ahead of another, which is not desired in our case.

@ealsur (Member, Author) commented Sep 15, 2020

For scaling as the partitions grow, we are having that discussion in #1680

As for the second point you mention, it's the same behavior as with the V2 PartitionKeyRangeId: each one can be consumed independently and the continuation is independent. If you have 2 FeedRanges, each one is independent of the other. What you are looking for is probably what we are discussing in the scaling out support.

@benjifarmer

I have run into a problem when using the pull model.

I initiate the change feed iterator with a specific partition key, but I get changes from other, very similar, partitions.

For example, my partitions begin with a 14-character code like B3FAZLSZTZHYJL and are followed by another identifier such as "current". If I ask for only the partition "B3FAZLSZTZHYJLcurrent", I will get other changes that begin with "B3FAZLSZTZHYJL". For example, I get the changes for partition B3FAZLSZTZHYJL0006490603.

My guess is that this is due to how the FeedRange interprets the partition key and derives a range. Is there a workaround you could suggest for this?

@ealsur (Member, Author) commented Oct 7, 2020

@benjifarmer Can you create an Issue for this? This is a PR comment 😄

@benjifarmer

Sure! But I think I may have been mistaken.

When I call GetFeedRangesAsync I see that I only have one physical partition, so this might be the expected behavior? Or should defining a partition key on GetChangeFeedIterator only return changes from the specific partition key regardless of how many physical partitions there are?

@ealsur (Member, Author) commented Oct 7, 2020

It should be returning changes just for that Partition Key value. Similar issue: #1796

@benjifarmer

I can confirm #1796 is a duplicate of my issue. I have reverted to 3.11.0-preview and everything works fine. I'll be patiently awaiting the next preview release. Thank you.

Labels: ChangeFeed, feature-request

Linked issues closed by this pull request:
Allow consumption of Change Feed as a in-memory push model
Allow consumption of Change Feed as a pull model

10 participants