# Subsource deprecation & source blue-green schema changes

This design encompasses two existing proposals, also known as
'Subsource Demuxing' and 'Subsources as Tables'.

Associated:
- https://github.com/MaterializeInc/materialize/issues/20208
- https://github.com/MaterializeInc/materialize/issues/24843
- https://github.com/MaterializeInc/materialize/issues/15897
- https://github.com/MaterializeInc/materialize/issues/17021

## The Problems

TL;DR:
- Subsources are confusing and not implemented uniformly across source types.
- It's impossible to do a blue-green workflow in sources for handling
  schema changes in upstream systems.

Users currently have to learn about *sources*, *subsources*, *tables*,
*materialized views*, *indexes*, and *sinks* when onboarding to Materialize.

If they'd like to ingest data from both an upstream Postgres/MySQL database
and a Kafka topic, they also need to learn the fragmented user interface
for dealing with data ingested from upstream data sources. Our existing
model of a *source* does not provide a consistent interface among source types.

Single-output Kafka sources put their data under the source's primary relation,
whereas multi-output Postgres/MySQL sources do not allow querying the source's primary
relation and instead contain *subsources*, an entirely separate concept.

Our existing *source* model also makes it difficult to deal with upstream schema changes:
users need to drop and recreate a *source* or *subsource* and any dependent objects
to begin ingesting data with a new schema. This can cause long periods of downtime if the
new source needs to reingest the entire upstream history, and it can also cause users to
lose their _reclocking_ information, which makes it impossible to implement a correct
end-to-end *source* -> *sink* pipeline.

This change aims to simplify the user experience and create a more unified
model for managing data ingested from an upstream source, while providing
more flexibility in handling upstream schema changes.


## Success Criteria

- The mental overhead & learning time for users to ingest data from an upstream system
  is decreased.
- All source types use a consistent interface to retrieve progress information and
  ingested data. The primary *source* object represents the same concept across
  all source types.
- The same upstream table/topic can be ingested into multiple collections in Materialize,
  each with a potentially different projection of the upstream schema. This enables
  a user to do blue-green schema-change workflows.
- *Sources* can maintain their progress collection while the set of collections being
  ingested is modified, allowing an end-to-end *source* -> *sink* schema-change workflow
  that does not emit redundant data (reusable reclocking).

## Out of Scope

<!--
What does a solution to this problem not need to address in order to be
successful?
It's important to be clear about what parts of a problem we won't be solving
and why. This leads to crisper designs, and it aids in focusing the reviewer.
-->

- Accomplishing 'subsource demuxing' using the concept of *subsources* in use today.
  The goals of https://github.com/MaterializeInc/materialize/issues/24843 will be
  accomplished during the process of replacing *subsources* with *tables* (see
  the solution proposal below).

- Dealing with webhook sources. These are fundamentally different from the other
  storage-owned source types, whose operators implement the `SourceRender` trait.
  While they may also be converted to 'write to tables' in the future, that work
  is not in scope for this proposal.


## Solution Proposal

The concept of a *source* will be unified around a single relation representing
the *progress* of that source's ingestion pipeline. A query of any *source*
object (of any type) will return the data that is currently in the
`<source>_progress` collection.
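
Concretely, under the new model a query of the source object itself would
return progress data (a minimal sketch; the source name is hypothetical):

```sql
-- Querying the source object directly returns the contents of its
-- progress collection (e.g. upstream offsets/LSNs), not ingested rows.
SELECT * FROM my_pg_source;
```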

We have been investing in the user experience around *tables*, such that
replacing *subsources* with *tables* would reduce the overall
mental overhead for new users.

By allowing tables to be marked as *read-only*, we can easily migrate
existing subsources to be tables without needing to figure out how to let
`INSERT`/`UPDATE`/`DELETE` statements on them merge with the data
written by their corresponding source.
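
The expected behavior for such tables, as a hedged sketch (the table name and
error message are illustrative, not final):

```sql
-- Writes to a read-only, source-fed table are rejected outright, so they
-- never need to be merged with source-ingested data.
INSERT INTO orders_v1 VALUES (1, 'shipped');
-- ERROR: cannot write to read-only table "orders_v1"
```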

The default `CREATE SOURCE ..` statement will just create a top-level source object
that represents a single ingestion of data from an upstream system. To actually
ingest data from that upstream system into a Materialize collection, the user will
use something akin to
`CREATE TABLE <table> (<cols>) OF SOURCE <source> WITH (<upstream_ref>)`.

We will allow more than one table to reference the same `<upstream_ref>`, using a
potentially different set of columns. This allows a user to handle an upstream
schema change by creating a new table for the same upstream table, performing
a blue-green swap operation to switch their downstream dependencies to the new
table, and then dropping the old one.
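
A minimal sketch of that blue-green workflow, assuming the proposed syntax
above (the connection, source, and table names are hypothetical):

```sql
-- The source is just the ingestion pipeline; it owns no table data itself.
CREATE SOURCE pg_src FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_pub');

-- Ingest the upstream table into a first collection.
CREATE TABLE orders_v1 OF SOURCE pg_src WITH (EXTERNAL REFERENCE = 'public.orders');

-- After an upstream schema change, ingest the same upstream table into a
-- second collection that reflects the new schema, alongside the first.
CREATE TABLE orders_v2 OF SOURCE pg_src WITH (EXTERNAL REFERENCE = 'public.orders');

-- Repoint downstream views, indexes, and sinks at orders_v2, then drop the
-- old collection to complete the swap.
DROP TABLE orders_v1;
```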

The existing options to `CREATE SOURCE` for Postgres & MySQL sources that specify
which tables to ingest (`FOR ALL TABLES`, `FOR SCHEMAS`, and `FOR TABLES`) will be
removed. Instead, all upstream tables to ingest from this source will need to be
explicitly referenced in a `CREATE TABLE` statement. While this may seem controversial,
more often than not these options cause upstream data that does not need to be
ingested into Materialize to be ingested anyway, and using an explicit statement for
each ingested table makes Materialize configuration much more amenable to
object<->state mappings in tools like dbt and Terraform.
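
To illustrate the difference (a hedged sketch; names are examples and the
final syntax may differ):

```sql
-- Today: the source statement decides which upstream tables get ingested.
CREATE SOURCE pg_src FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_pub')
  FOR TABLES (public.orders, public.customers);

-- Proposed: the source creates only the ingestion, and each ingested table
-- is an explicit object that tools can map one-to-one to their own state.
CREATE SOURCE pg_src FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_pub');
CREATE TABLE orders OF SOURCE pg_src WITH (EXTERNAL REFERENCE = 'public.orders');
CREATE TABLE customers OF SOURCE pg_src WITH (EXTERNAL REFERENCE = 'public.customers');
```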

We can eventually reintroduce syntactic sugar to perform a similar function to
`FOR ALL TABLES` and `FOR SCHEMAS` if necessary.


### Implementation Plan

1. Separate the planning of *sources* and *subsources* such that sources are fully
   planned before any linked subsources. Part of this work has been prototyped in:
   https://github.com/MaterializeInc/materialize/pull/27320

2. Introduce the concept of *read-only tables* that link to a source ingestion
   and an upstream 'reference' specifying which upstream table to ingest into this table.

3. Update the `CREATE TABLE` statement to allow creation of a read-only table
   with a reference to a source and an upstream reference:
   (`CREATE TABLE <new_table> FROM SOURCE <source> (upstream_reference)`)

4. Update planning for `CREATE TABLE` to include a purification step akin to the
   purification for `ALTER SOURCE .. ADD SUBSOURCE`. This will verify the upstream
   permissions, schema, etc. for the newly added source-fed table.

5. Update the underlying *source* (ingestion) and source_export/ingestion_export
   structures to allow more than one *source_export* to refer to an upstream
   reference, and each *source_export* to have its own projection of the upstream data.

6. Update the storage controller to use both *subsources* and *read-only tables*
   as *source_exports* for existing multi-output sources (Postgres & MySQL).

7. Update the source rendering operators to handle the new structures and allow
   outputting the same upstream table to more than one source export.

8. Migrate existing sources to the new source model, making all sources
   'multi-output' sources (see the sketch after this list):
   - For existing sources where the primary relation has the data (e.g. Kafka sources):
     - Create a read-only table for the current primary relation called `<source>`
     - Rename the source itself to `<source>_progress` and drop the existing
       `<source>_progress` object
   - For existing multi-output sources (e.g. Postgres & MySQL sources):
     - Convert subsources to read-only tables with the same names
     - Rename the source itself to `<source>_progress` and drop the existing
       `<source>_progress` object

9. Remove subsource purification logic from `purify_create_source`, subsource statement
   parsing, and related planning code.
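
To make the migration in step 8 concrete, here is a hedged sketch of how an
existing Kafka source's objects would be renamed (the source name is
hypothetical):

```sql
-- Before the migration: the source's primary relation holds the data.
SELECT * FROM kafka_src;          -- ingested message data
SELECT * FROM kafka_src_progress; -- progress information

-- After the migration: the data lives in a read-only table named
-- `kafka_src`, and the source object itself, renamed to
-- `kafka_src_progress`, returns the progress information.
SELECT * FROM kafka_src;          -- same data, now a read-only table
SELECT * FROM kafka_src_progress; -- progress, now the source object itself
```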


### Core Implementation Details

#### SQL Parsing & Planning

Our existing `CREATE SUBSOURCE` statements look like this:
```sql
CREATE SUBSOURCE <name> (<cols>) OF SOURCE <source_name> WITH (EXTERNAL REFERENCE = <upstream table name>)
```
Though these are mostly hidden from users, they are available if a user runs
`SHOW CREATE SOURCE <existing_subsource>`.

Our existing `CREATE TABLE` statement looks like:
```sql
CREATE TABLE <name> (<cols>) WITH (<options>)
```

We would update the `CREATE TABLE` statement to look similar to `CREATE SUBSOURCE`,
optionally referencing an upstream source and reference via `OF SOURCE` and
`EXTERNAL REFERENCE`:
```sql
CREATE TABLE <name> (<cols>) OF SOURCE <source_name> WITH (EXTERNAL REFERENCE = <upstream table name>)
```
`<cols>` can optionally be specified by the user to request a subset of the upstream table's
columns, but will not be permitted to include user-specified column types, since these will
be determined by the upstream source details.
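
For instance, a user might ingest only a projection of the upstream table
(a minimal sketch; the names are illustrative):

```sql
-- Request only two of the upstream table's columns. No column types may be
-- given; they are derived from the upstream source details.
CREATE TABLE orders_slim (id, status) OF SOURCE pg_src
    WITH (EXTERNAL REFERENCE = 'public.orders');
```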

`CreateTableStatement` would be updated to include the new option type
`CreateTableOptionName::ExternalReference` and a `pub of_source: Option<T::ItemName>`
field to reference the optional upstream source.

We would then introduce a new `TableDataSource` enum and add a field to the `Table` objects
used for the in-memory catalog and SQL planning:
```rust
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites,

    /// The table receives its data from the identified ingestion, specifically
    /// the output identified by `external_reference`, using the projection
    /// `external_projection` to map the upstream relation to this relation.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    IngestionExport {
        ingestion_id: GlobalId,
        external_reference: UnresolvedItemName,
        external_projection: Vec<u8>,
    },
}

pub struct Table {
    pub create_sql: Option<String>,
    pub desc: RelationDesc,
    ...
    pub data_source: TableDataSource,
}
```

The planning for `CREATE TABLE` will be adjusted to include a purification step
akin to the purification in `purify_alter_source`. This step will map any
specified columns into a projection of the upstream table's column order,
verify the upstream permissions, schema, etc. of the upstream table,
and generate a new `details` for the top-level source that is merged with the
top-level source during sequencing (as `ALTER SOURCE` works today).


#### Storage Collection Coordination

The coordinator's `bootstrap_storage_collections` will be updated to handle
tables with `TableDataSource::IngestionExport { .. }`, mapping them to
a storage controller `CollectionDescription` with a `DataSource::IngestionExport`
rather than the existing `DataSourceOther::TableWrites` value used for all tables today.

The storage controller's `DataSource::IngestionExport` variant will be updated to
include an optional projection, populated from the table projection mentioned above:
```rust
IngestionExport {
    ingestion_id: GlobalId,
    // This is an `UnresolvedItemName` because the structure works for PG,
    // MySQL, and load generator sources. However, in the future, it should
    // be sufficiently genericized to support all multi-output sources we
    // support.
    external_reference: UnresolvedItemName,
    external_projection: Option<Vec<u8>>,
},
```

The `create_collections` method in the storage client currently maps a
`DataSource::IngestionExport` into a storage `SourceExport` struct and then
inserts this into the appropriate `IngestionDescription` of the top-level
source. This is used in source rendering to figure out which upstream
reference should be mapped to a specific output collection.

The storage `SourceExport` struct will be updated to include an optional
projection field, populated from the `IngestionExport`'s `external_projection`:
```rust
pub struct SourceExport<O: proptest::prelude::Arbitrary, S = ()> {
    /// Which output from the ingestion this source refers to.
    pub ingestion_output: O,
    /// The collection metadata needed to write the exported data.
    pub storage_metadata: S,
    /// How to project the ingestion output to this export's collection.
    pub output_projection: Option<Vec<u8>>,
}
```

#### Source Rendering

Currently, when an `IngestionDescription` is used inside `build_ingestion_dataflow`
(the method that renders the various operators needed to run a source ingestion),
it determines the `output_index` of each collection to be output by the ingestion
by calling the `IngestionDescription::source_exports_with_output_indices` method.

This method maps the `external_reference` (the upstream table name) of each `SourceExport`
to the index of that upstream table in the source's `details` struct (which is created
during purification -- it's a vector of upstream table descriptions).

Then, inside each source implementation, we currently assume that each
`SourceExport` uniquely corresponds to a single `output_index`, and that each `output_index`
corresponds to the same index of the upstream table in the source's upstream `details`.

This assumption is part of what prevents an upstream table from being ingested into
more than one collection.

We would update each source implementation to assume that more than one `SourceExport`
can share an `output_index`, and we would update the `source_exports` map created
by `source_exports_with_output_indices` to include the `details` index explicitly.

The `build_ingestion_dataflow` method currently demuxes the output collection of
each source by `output_index` and pushes each data stream to the appropriate
`persist_sink`. We would update this method to apply the `output_projection` of
each `SourceExport` to its data stream before pushing it to its `persist_sink`.

#### Migration of source names (progress collections) & Kafka source -> table migration

TODO

## Minimal Viable Prototype


## Alternatives

<!--
What other solutions were considered, and why weren't they chosen?
This is your chance to demonstrate that you've fully discovered the problem.
Alternative solutions can come from many places, like: you or your Materialize
team members, our customers, our prospects, academic research, prior art, or
competitive research. One of our company values is to "do the reading" and
to "write things down." This is your opportunity to demonstrate both!
-->

## Open questions

<!--
What is left unaddressed by this design document that needs to be
closed out?
When a design document is authored and shared, there might still be
open questions that need to be explored. Through the design document
process, you are responsible for getting answers to these open
questions. All open questions should be answered by the time a design
document is merged.
-->