Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial cut of AWS Kinesis streaming #8967

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

dpbevin
Copy link
Contributor

@dpbevin dpbevin commented May 1, 2024

Relates to #8966

An initial attempt at producing a persistent stream implementation for AWS Kinesis streams. This is 100% an early PR to solicit feedback (and gauge interest?).

Tested locally against: https://github.com/etspaceman/kinesis-mock

This approach uses the AmazonKinesisClient from the .NET SDK, not the "KCL Consumer" that requires a running JVM 🤮

Includes

  • Kinesis stream provider
  • A grain-based queue checkpointer (instead of needing to require DynamoDB, EF Core, or some other approach). This checkpointer could be used by EventHub too (instead of requiring Azure Table Storage).

Remaining work

  • A few "TODO"s
  • Tests

Useful references:

Microsoft Reviewers: Open in CodeFlow

@scalalang2
Copy link
Contributor

scalalang2 commented May 15, 2024

this is what I've been waiting for. thanks :)

[PreferLocalPlacement]
public class StreamCheckpointerGrainGrain : Grain, IStreamCheckpointerGrain
{
private readonly IPersistentState<StreamCheckpointerGrainState> _state;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm here to drop a very trivial question : )
Is there any reason not to use the native checkpoint of Kinesis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scalalang2 to my knowledge, Kinesis doesn't have the ability to "keep track" of the current position for a consumer so that it can resume from a specified position if the consumer is stopped.
Kafka has the capability (I think it's called "Commit").

But if you can point me in the direction of the Kinesis API you have in mind, I'm happy to take a look.

Copy link
Contributor

@scalalang2 scalalang2 May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dpbevin
Thanks for reply : )

to my knowledge, Kinesis doesn't have the ability to "keep track" of the current position for a consumer

You're right, I misunderstood Kinesis natively has concept of "checkpoint".

It uses DynamoDB to save checkpoint sample code official docs,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate the info.

I could have added a DynamoDB checkpointer, which would have been similar to how the Azure EventHub streaming uses Azure Table storage for checkpoints but I felt that a grain-based solution would work in more places and... my intended use of Kinesis streaming won't be able to make use of DynamoDB.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree this, thx : )

I felt that a grain-based solution would work in more places

@dpbevin dpbevin force-pushed the dev/bevo/kinesis branch 2 times, most recently from 5b93e76 to 30ea2c9 Compare June 2, 2024 01:25
@dpbevin dpbevin force-pushed the dev/bevo/kinesis branch from 30ea2c9 to be54597 Compare June 2, 2024 01:28
@dpbevin
Copy link
Contributor Author

dpbevin commented Jun 2, 2024

CI build failing due to missing package even though the package can be selected from that feed locally.

Unable to find package AWSSDK.Kinesis. No packages exist with this id in source(s): dotnet-public

@scalalang2
Copy link
Contributor

scalalang2 commented Jun 2, 2024

CI build failing due to missing package even though the package can be selected from that feed locally.

Unable to find package AWSSDK.Kinesis. No packages exist with this id in source(s): dotnet-public

@ReubenBond
Could you please check this out?

@idanbenyehoshua
Copy link

Just wanted to check in and see if there’s been any progress on this. It’s something I’ve been hoping to have for a while.
Currently, I'm running on AWS and using SQS. However, I have another project where Kinesis would be a better fit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants