connectors: Add Kinesis Source and Sink #234

Merged: 13 commits, Aug 14, 2023

jacksonrnewhouse
Contributor

This adds support for Kinesis via a source and a sink. Both are configured with the name of the stream.
The source also takes a "source.offset" of either "Earliest" or "Latest", similar to Kafka.

Kinesis Source

Execution Flow

The source keeps one active future for every open shard the operator owns. These futures are all advanced through a FuturesUnordered, and each one is just a labeled unit of compute that holds no data the overall control loop should own. This lets us take accurate snapshots without waiting for the futures to complete. Unfortunately, a future produced by calling something like async fn my_async_function(&self) always borrows self for as long as it is alive. The BoxedFuture gets around this, at the cost of some complexity.

To find the associated shard state, each future returns a name that corresponds to the shard_id.

This approach is similar to what I implemented for the Filesystem Sink. Might be worth figuring out how to better structure it or if there's a replacement we prefer.
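
As a rough illustration of the pattern described above, here is a minimal sketch using only the standard library. It is not the code from this PR: a plain Vec polled with a no-op waker stands in for FuturesUnordered, and ShardState, read_shard, and poll_once are hypothetical names. The point it shows is that each boxed future owns its inputs and returns its shard_id, so the control loop keeps sole ownership of the shard state.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A boxed future that borrows nothing and reports which shard it belongs to.
pub type NamedFuture = Pin<Box<dyn Future<Output = (String, u64)> + Send + 'static>>;

/// Hypothetical per-shard state, owned by the control loop and never by a future.
#[derive(Debug, Default, PartialEq)]
pub struct ShardState {
    pub records_read: u64,
}

/// Hypothetical stand-in for a shard read; a real one would call the Kinesis API.
pub fn read_shard(shard_id: String, records: u64) -> NamedFuture {
    // `async move` takes ownership of `shard_id`, so the future is 'static.
    Box::pin(async move { (shard_id, records) })
}

/// Waker that does nothing; enough for a single manual poll pass.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls every in-flight future once, applies completed results to the shard
/// state by name, and returns the futures that are still pending.
pub fn poll_once(
    in_flight: Vec<NamedFuture>,
    state: &mut HashMap<String, ShardState>,
) -> Vec<NamedFuture> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut pending = Vec::new();
    for mut fut in in_flight {
        match fut.as_mut().poll(&mut cx) {
            // The returned name is the only link back to the shard state.
            Poll::Ready((shard_id, records)) => {
                state.entry(shard_id).or_default().records_read += records;
            }
            Poll::Pending => pending.push(fut),
        }
    }
    pending
}
```

Because the state map is never captured by a future, it can be snapshotted between calls to poll_once even while reads are still in flight.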

Semantics

The operator consumes each shard of data in order, similar to the Kafka source. The set of shards that a subtask is responsible for is determined solely by the hash of the shard_id. Flink defaults to this behavior, but also has an option to evenly divide the hash space.
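
A minimal sketch of hash-based shard assignment, under assumptions: the hash function used by the operator is not stated here, so std's DefaultHasher is a placeholder, and owning_subtask and owned_shards are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Returns the index of the subtask that owns `shard_id`.
/// Assumption: ownership is hash(shard_id) modulo the operator parallelism.
pub fn owning_subtask(shard_id: &str, parallelism: u64) -> u64 {
    assert!(parallelism > 0, "parallelism must be positive");
    let mut hasher = DefaultHasher::new();
    shard_id.hash(&mut hasher);
    hasher.finish() % parallelism
}

/// Filters the full shard list down to the shards one subtask should read.
pub fn owned_shards<'a>(
    shards: &[&'a str],
    subtask_index: u64,
    parallelism: u64,
) -> Vec<&'a str> {
    shards
        .iter()
        .copied()
        .filter(|shard_id| owning_subtask(shard_id, parallelism) == subtask_index)
        .collect()
}
```

Every subtask can run this independently against the same shard list and reach a consistent split with no coordination, though nothing guarantees the split is even.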

There are no ordering guarantees for how data is read across different shards. While we eagerly fetch from all live shards, one shard may still fall behind. Since watermarks are tracked at the subtask level, a shard that falls behind may have its data dropped as late. Finally, Kinesis has the notion of "parent" and "child" shards, where a child shard holds data that comes after its parents' data. Enforcing that order while letting child shards live on different subtasks is not currently possible, and no effort was made to enforce it.
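
To make the lagging-shard hazard concrete, here is a toy model of a subtask-level watermark. It assumes the watermark is the maximum event time seen minus a fixed allowed lateness, which may not match the engine's actual watermark logic. SubtaskWatermark is a hypothetical name.

```rust
/// One watermark for the whole subtask, regardless of which shard an event came from.
pub struct SubtaskWatermark {
    max_event_time_ms: u64,
    allowed_lateness_ms: u64,
}

impl SubtaskWatermark {
    pub fn new(allowed_lateness_ms: u64) -> Self {
        Self { max_event_time_ms: 0, allowed_lateness_ms }
    }

    /// Current watermark: the newest event time seen, minus the allowed lateness.
    pub fn watermark(&self) -> u64 {
        self.max_event_time_ms.saturating_sub(self.allowed_lateness_ms)
    }

    /// Records an event and returns true if it arrived behind the watermark.
    pub fn observe(&mut self, event_time_ms: u64) -> bool {
        let late = event_time_ms < self.watermark();
        self.max_event_time_ms = self.max_event_time_ms.max(event_time_ms);
        late
    }
}
```

If a fast shard pushes the watermark to 9,000 ms, an event at 5,000 ms from a slow shard on the same subtask is already late when it arrives, even though it was in order within its own shard.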

Kinesis Sink

Execution Flow

The Kinesis sink operates in batches. Right now it buffers until one of the following conditions is met: 500 messages are buffered, 4.5MB of data is buffered, or 1s has passed. The first two are dictated by the constraints on the PutRecords method, while the timeout ensures messages don't linger in the sink.
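
The three flush conditions could be tracked roughly as follows. This is a sketch, not the sink's actual code: Batch and its methods are hypothetical, and reading 4.5MB as 4,500,000 bytes is an assumption.

```rust
use std::time::{Duration, Instant};

pub const MAX_RECORDS: usize = 500;
/// Assumption: "4.5MB" is read as 4,500,000 bytes.
pub const MAX_BYTES: usize = 4_500_000;
pub const MAX_AGE: Duration = Duration::from_secs(1);

/// Hypothetical in-memory batch of serialized records awaiting a flush.
#[derive(Default)]
pub struct Batch {
    records: Vec<Vec<u8>>,
    bytes: usize,
    /// Arrival time of the oldest buffered record, if any.
    opened_at: Option<Instant>,
}

impl Batch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Buffers one record and starts the age timer if the batch was empty.
    pub fn push(&mut self, record: Vec<u8>, now: Instant) {
        if self.opened_at.is_none() {
            self.opened_at = Some(now);
        }
        self.bytes += record.len();
        self.records.push(record);
    }

    /// True once any one of the count, size, or age limits is reached.
    pub fn should_flush(&self, now: Instant) -> bool {
        self.records.len() >= MAX_RECORDS
            || self.bytes >= MAX_BYTES
            || self
                .opened_at
                .map_or(false, |t| now.saturating_duration_since(t) >= MAX_AGE)
    }

    /// Hands the buffered records to the caller and resets the batch.
    pub fn take(&mut self) -> Vec<Vec<u8>> {
        self.bytes = 0;
        self.opened_at = None;
        std::mem::take(&mut self.records)
    }
}
```

Passing the current time in as an argument keeps the logic easy to test. A stricter version would check the size limit before accepting a record, so a single large record cannot push the batch past the request limit.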

The PutRecords method allows writes to partially succeed, often because of capacity limits on some but not all of the shards in the stream. Once we decide to flush, we currently retry until the whole batch has been written. While this is happening no new messages are consumed, which leads to back-pressure.
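
A sketch of the retry loop, with the Kinesis call abstracted away: put_records is a hypothetical closure that returns one success flag per record, standing in for the per-record results of a real PutRecords response. The function returns records in the order they were accepted, which also shows how a partial failure reorders output.

```rust
/// Resends the failed records of a batch until every record has been accepted.
/// Returns the records in the order the (hypothetical) service accepted them.
pub fn flush_with_retries<F>(mut batch: Vec<Vec<u8>>, mut put_records: F) -> Vec<Vec<u8>>
where
    F: FnMut(&[Vec<u8>]) -> Vec<bool>,
{
    let mut accepted = Vec::with_capacity(batch.len());
    while !batch.is_empty() {
        // One flag per record; a missing flag is treated as a failure.
        let results = put_records(&batch);
        let mut failed = Vec::new();
        for (i, record) in batch.into_iter().enumerate() {
            if results.get(i).copied().unwrap_or(false) {
                accepted.push(record);
            } else {
                failed.push(record);
            }
        }
        // Only the failed records are sent again on the next pass.
        batch = failed;
    }
    accepted
}
```

The loop does not return until the batch is empty, so the caller is blocked for the duration, which is where the back-pressure comes from. The sketch omits any backoff between attempts.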

Semantics

Because a batch put can succeed for only some of its records, the output order is not guaranteed. Additionally, because Kinesis requires a partition key, we add a UUID4 if there isn't a key in the Pipeline DAG.

Right now, if there's a key over 256 bytes, we won't be able to write it. I think our SQL solution will, in general, use the UUID4, but when we add partitioning support, we should have a plan for this.
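
One possible shape for the key handling, purely illustrative: partition_key and KeyError are hypothetical, the fallback closure stands in for UUID4 generation so the sketch needs no external crate, and the 256-byte limit is taken from the description above.

```rust
/// Limit taken from the description above.
pub const MAX_KEY_BYTES: usize = 256;

#[derive(Debug, PartialEq)]
pub enum KeyError {
    /// The key exceeds `MAX_KEY_BYTES`; `len` is its length in bytes.
    TooLong { len: usize },
}

/// Picks the partition key for a record: the pipeline's key if present,
/// otherwise whatever `fallback` generates (a UUID4 in the real sink).
pub fn partition_key(
    key: Option<String>,
    fallback: impl FnOnce() -> String,
) -> Result<String, KeyError> {
    let key = key.unwrap_or_else(fallback);
    // `String::len` counts bytes, which matches a byte-based limit.
    if key.len() > MAX_KEY_BYTES {
        Err(KeyError::TooLong { len: key.len() })
    } else {
        Ok(key)
    }
}
```

Returning an error leaves the policy to the caller. Hashing or truncating an oversized key would be alternatives, but both change which shard a record lands on.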

@jacksonrnewhouse jacksonrnewhouse force-pushed the kinesis branch 3 times, most recently from 68d4789 to a091165 Compare August 9, 2023 00:17
Member

@mwylde mwylde left a comment

Initial review of just the source operator. I'll spend some more time later today trying to fully understand the control flow there and digging into the sink.

Resolved review threads on:
arroyo-connectors/resources/kinesis.svg
arroyo-connectors/src/kinesis.rs
arroyo-worker/src/connectors/kinesis/source/mod.rs
arroyo-worker/src/connectors/kinesis/sink/mod.rs
@jacksonrnewhouse jacksonrnewhouse enabled auto-merge (squash) August 14, 2023 22:20
@jacksonrnewhouse jacksonrnewhouse merged commit 539ec5b into master Aug 14, 2023