refactor: deprecate StreamChunkWithState for source state #14384

Closed
tabVersion opened this issue Jan 5, 2024 · 1 comment · Fixed by #14524

Comments

@tabVersion
Contributor

Background and Motivation

The current implementation uses StreamChunkWithState, which carries the payload as a StreamChunk plus an offset mapping (from split_id to the latest offset of that partition). Offsets are truncated per batch and only the last one is kept, so a chunk cannot be truncated at an arbitrary point after it enters the source executor without breaking exactly-once semantics.

Based on additional columns (#14215), I propose deriving the partition (the file, for fs sources) and the offset for all sources, and reading both from the stream chunk itself rather than from a separate struct. The chunk then no longer has to be treated as a whole, and both throttling (#13800) and reusable source (risingwavelabs/rfcs#72) can benefit from that.

After the refactor, we load split_id and offset from these additional columns and no longer need StreamChunkWithState.

Compatibility

I don't want to make breaking changes to the table catalog, but the additional columns are required to be part of the schema.
@BugenZhao and I found a way to do this: when building the source schema, manually add the two columns to it, then prune them before yielding.
Note that if users specify include partition or include offset in SQL, those columns are already in the table schema, so we don't have to add or prune them manually.

This approach does not require changing any data in meta: all changes happen at runtime, and the new "hidden columns" do not have to be materialized, i.e. in a table with a connector.
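
To illustrate the idea, here is a minimal sketch of adding the two columns at schema-build time. It is not the actual RisingWave code: `ColumnKind`, `ColumnDesc`, `with_state_columns`, and the column names `hidden_partition` / `hidden_offset` are all hypothetical, made up for this example. Columns the user already requested are left alone, and only the ones we add ourselves are flagged as hidden.

```rust
/// Hypothetical column kinds; not RisingWave's real types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnKind {
    Normal,
    Partition,
    Offset,
}

/// Hypothetical, simplified column descriptor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnDesc {
    pub name: String,
    pub kind: ColumnKind,
    /// True when the column was added at runtime and must be pruned before yielding.
    pub hidden: bool,
}

/// Returns the source columns, appending a hidden partition and/or offset
/// column only when the user-defined schema does not already contain one.
pub fn with_state_columns(user_columns: &[ColumnDesc]) -> Vec<ColumnDesc> {
    let mut columns = user_columns.to_vec();
    let required = [
        (ColumnKind::Partition, "hidden_partition"),
        (ColumnKind::Offset, "hidden_offset"),
    ];
    for (kind, name) in required {
        if !columns.iter().any(|c| c.kind == kind) {
            columns.push(ColumnDesc {
                name: name.to_string(),
                kind,
                hidden: true,
            });
        }
    }
    columns
}
```

Because the extra columns exist only in the runtime source schema, nothing in the catalog has to change in this sketch.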

Implementation

  • manually add the two columns (SourceDesc::column_catalogs_to_source_column_descs)
    • cover both SourceDesc and FsSourceDesc
  • for both trad_source and fs_source, change the code to load the state from the chunk
    let state: HashMap<_, _> = mapping
        .iter()
        .flat_map(|(split_id, offset)| {
            let origin_split_impl = self
                .stream_source_core
                .as_mut()
                .unwrap()
                .stream_source_splits
                .get_mut(split_id);
            origin_split_impl.map(|split_impl| {
                split_impl.update_in_place(offset.clone())?;
                Ok::<_, anyhow::Error>((
                    split_id.clone(),
                    split_impl.clone(),
                ))
            })
        })
        .try_collect()?;
  • prune the added columns before yielding (don't forget to check whether a column was requested by the user; if so, it must not be pruned)
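
A minimal sketch of the last two steps, assuming a chunk can be modeled as rows of strings. `Row`, `latest_offsets`, and `prune_columns` are hypothetical names for illustration, not existing RisingWave APIs. The point is that every row carries its own partition and offset, so the state can be derived from any prefix of the chunk.

```rust
use std::collections::HashMap;

/// Hypothetical row representation: every value is a string, for brevity.
pub type Row = Vec<String>;

/// Scans the rows in order and returns the latest offset seen per split.
/// Later rows overwrite earlier ones, so this also works on a truncated chunk.
pub fn latest_offsets(
    rows: &[Row],
    partition_idx: usize,
    offset_idx: usize,
) -> HashMap<String, String> {
    let mut state = HashMap::new();
    for row in rows {
        state.insert(row[partition_idx].clone(), row[offset_idx].clone());
    }
    state
}

/// Drops the columns at `hidden_indices` from every row before yielding.
/// The caller passes only the indices of columns that were added manually,
/// never those the user requested.
pub fn prune_columns(rows: Vec<Row>, hidden_indices: &[usize]) -> Vec<Row> {
    rows.into_iter()
        .map(|row| {
            row.into_iter()
                .enumerate()
                .filter(|(i, _)| !hidden_indices.contains(i))
                .map(|(_, value)| value)
                .collect()
        })
        .collect()
}
```

With rows laid out as (payload, partition, offset), calling `latest_offsets(&rows[..n], 1, 2)` on any prefix gives the offsets that match exactly the rows kept, which is what a per-batch mapping cannot provide.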
@github-actions github-actions bot added this to the release-1.7 milestone Jan 5, 2024
@xxchan xxchan changed the title refractor: deprecate StreamChunkWithState for source state refactor: deprecate StreamChunkWithState for source state Jan 11, 2024
@xxchan
Member

xxchan commented Jan 11, 2024

If I understand it correctly, the motivation is: Since we can (and will always) include partition and offset in the chunk payload, the additional offset state in StreamChunkWithState and thus the whole struct isn't necessary any more.

I think it's a good-to-have refactoring, but not urgent. It's also easy to change.

The compatibility issue is also mainly about whether always including partition and offset in the chunk is problematic, and has less to do with StreamChunkWithState itself.
