Pose some potential complications regarding kafka sources
rjobanp committed Aug 21, 2024
1 parent 3362661 commit 6bc6d06
Showing 1 changed file with 46 additions and 0 deletions.
@@ -132,6 +132,15 @@
the web console to expose a guided 'create source' workflow for users where
they can select from a list of upstream tables and the console can generate
the appropriate `CREATE TABLE .. FROM SOURCE` statements for those selected.
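
As a sketch (source, schema, and table names are hypothetical, and the exact
reference syntax is illustrative), the console might emit one statement per
selected upstream table:

```sql
-- The user selects two upstream tables from an existing Postgres source
-- and the console generates one statement per selection.
CREATE TABLE customers FROM SOURCE pg_source (REFERENCE public.customers);
CREATE TABLE orders FROM SOURCE pg_source (REFERENCE public.orders);
```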

Each `CREATE TABLE .. FROM SOURCE` statement will refer to a `SourceExport` for
a given source, and each newly-added `SourceExport` for a Source will attempt to ingest
a full snapshot of its relevant upstream data upon creation. This allows the
new `SourceExport` to correctly backfill any newly added columns or defaults set
in the upstream system, rather than returning `NULL` values for any 'old' rows.
The aim is to maintain correctness and consistency with the upstream when a
query selects from new columns on rows that have not been updated since the
new `SourceExport` was added.
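
For example (all names hypothetical), if a column is added upstream after a
table was created, creating a second table over the same upstream table takes
a fresh snapshot that backfills the new column:

```sql
-- Created before the upstream table gained a `discount` column.
CREATE TABLE orders FROM SOURCE pg_source (REFERENCE public.orders);

-- Upstream: ALTER TABLE public.orders ADD COLUMN discount numeric;

-- The new SourceExport re-snapshots the upstream table, so `discount` is
-- populated even for rows that have not changed since it was added.
CREATE TABLE orders_v2 FROM SOURCE pg_source (REFERENCE public.orders);

SELECT discount FROM orders_v2; -- backfilled, not NULL, for 'old' rows
```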

### Implementation Plan

1. Separate the planning of _sources_ and _subsources_ such that sources are fully
@@ -344,6 +353,43 @@
the `details` it contains. It will be up to each source implementation to map the
relevant upstream table to the correct `SourceExport`s using the `SourceExportDetails`
and output to the correct `output_index`.

#### Kafka Sources

Kafka Sources will require more refactoring to move to a model where a single Kafka Source
can output to more than one `SourceExport`.

Since a Kafka Source only ever refers to a single Kafka topic, the most likely
reason a user would add a new 'Table' to a Kafka Source is to handle a schema
change in the messages published to that topic.
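
As a sketch of that workflow (names are hypothetical, and whether format
options attach to the table or remain on the source is an open design point),
the user would add a second table over the same topic once messages with the
new schema start arriving:

```sql
-- Decodes messages using the schema that was current at creation time.
CREATE TABLE events_v1 FROM SOURCE kafka_src (REFERENCE "events")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;

-- Added after the topic's schema evolves; picks up the latest schema from
-- the registry and must re-read the topic to hydrate (see below).
CREATE TABLE events_v2 FROM SOURCE kafka_src (REFERENCE "events")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;
```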

When a new `SourceExport` is added to a Kafka Source, the new export needs to be hydrated
from the upstream topic to ensure it has a complete view of the topic contents.

This creates several potential issues regarding the semantics of Kafka Sources:

- If a new `SourceExport` is added, its resume upper will be the minimum value. If this
resume upper is merged with the existing resume upper for the source (which is what
happens in Postgres and MySQL sources), it will cause the consumer to begin reading
from each partition at the lowest offset available (or the 'start offset' defined
on the Kafka Source). This means we will potentially do a long rehydration to
catch up to the previous resume upper, such that existing `SourceExport`s will
not see any new data until the rehydration completes.
- There is a single Consumer Group ID used by each Kafka Source, and the source
commits the consumer offset to the Kafka broker each time the progress collection
advances through reclocking. Since the consumer may need to read from earlier
offsets to complete the snapshot of the new `SourceExport`, it's unclear how this
might affect external progress tracking and broker behavior.
- Metadata columns are attached to the source data rows in the source render
implementation, so splitting them out on a per-`SourceExport` basis would
require more work. Fortunately, decoding and envelopes are applied per-output,
so they are already set up to be easily modified per `SourceExport`. It is
unclear whether users would want to adjust the metadata columns
(partition, offset, headers, timestamp) included on a per-`SourceExport`
basis; a sketch of what that could look like follows this list.
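
Purely to illustrate that last point (hypothetical syntax; the existing
`INCLUDE` options are specified on the source as a whole), per-`SourceExport`
metadata selection might look like:

```sql
-- One export keeps the full set of Kafka metadata columns...
CREATE TABLE events_debug FROM SOURCE kafka_src (REFERENCE "events")
  INCLUDE PARTITION, OFFSET, TIMESTAMP, HEADERS;

-- ...while another export over the same topic omits them entirely.
CREATE TABLE events FROM SOURCE kafka_src (REFERENCE "events");
```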

Open Question:

- Do we need to do anything special to handle any of the above?

#### Migration of source statements and collections

We would write a catalog migration to generate new `CREATE TABLE` statements where
