@JiaqiWang18 (Contributor) commented Sep 30, 2025

What changes were proposed in this pull request?

Prototype PR for an explicit checkpoint location that follows the format below:

checkpoints-root/
      ├── myst/                            # Table "myst"
      │    ├── flow1/                      # Flow to myst
      │    │    ├── 0/                    # Versioned checkpoint (0)
      │    │    │    ├── commits/
      │    │    │    ├── offsets/
      │    │    │    └── sources/
      │    │    └── 1/                    # Versioned checkpoint (1)
      │    │
      │    └── flow2/                     # Another flow to myst
      │         ├── 0/                    # Versioned checkpoint (0)
      │         │    ├── commits/
      │         │    ├── offsets/
      │         │    └── sources/
      │         └── 1/                    # Versioned checkpoint (1)
      │
      └── mysink/                         # Sink "mysink"
            └── flowA/                     # Flow to mysink
                 ├── 0/                    # Versioned checkpoint (0)
                 │    ├── commits/
                 │    ├── offsets/
                 │    └── sources/
                 └── 1/                    # Versioned checkpoint (1)
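
To make the layout concrete, here is a minimal Scala sketch of how a versioned checkpoint path could be derived from it. The helper name and its parameters are hypothetical illustrations of the layout, not this PR's actual API:

// Hypothetical helper mirroring the layout above:
// <root>/<table-or-sink>/<flow>/<version>/{commits,offsets,sources}
def checkpointPath(
    storageRoot: String,  // e.g. "checkpoints-root"
    datasetName: String,  // table ("myst") or sink ("mysink") name
    flowName: String,     // e.g. "flow1"
    version: Int          // versioned checkpoint: 0, 1, ...
): String = s"$storageRoot/$datasetName/$flowName/$version"

Because the path is a pure function of names plus a version, a flow can presumably get a fresh checkpoint directory by bumping the version, without disturbing earlier checkpoints.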

Backend changes should be mostly finalized, and all tests should pass.

For the user-facing API, use a spark.sql.pipelines.storageRoot conf for now, as it is easier to revert.
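
Presumably this is set like any other Spark SQL conf; the key is taken from the description above, while the path value is purely illustrative:

// Assumed usage of the conf named in this PR; its exact semantics
// (default value, validation) are defined by the PR itself.
spark.conf.set("spark.sql.pipelines.storageRoot", "/mnt/pipelines/checkpoints-root")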

Why are the changes needed?

- To unblock development of sinks
- Path toward supporting multi-flow

Does this PR introduce any user-facing change?

Yes: it adds a new spark.sql.pipelines.storageRoot configuration for specifying the checkpoint storage root.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

val resolvedGraph = resolveGraph()
if (context.fullRefreshTables.nonEmpty) {
  State.reset(resolvedGraph, context)
}
@JiaqiWang18 (author) commented:

With an explicit storage location for checkpoints, we shouldn't need to create the tables and obtain their paths beforehand; resolvedGraph should suffice.
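
A rough sketch of why resolvedGraph can suffice: each flow's checkpoint directory is now a pure function of the storage root and the names in the resolved graph, so a reset can operate on paths without materializing tables first. Everything below (the method, flows as (dataset, flow) name pairs, the Hadoop FileSystem calls) is illustrative rather than this PR's actual State.reset implementation:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical reset: derive each flow's checkpoint dir from names alone
// and delete it recursively; no table has to exist yet.
def resetCheckpoints(
    storageRoot: String,
    flows: Seq[(String, String)],  // (dataset name, flow name) pairs
    hadoopConf: Configuration): Unit = {
  flows.foreach { case (datasetName, flowName) =>
    val dir = new Path(s"$storageRoot/$datasetName/$flowName")
    val fs = dir.getFileSystem(hadoopConf)
    fs.delete(dir, true)  // removes all versioned checkpoints under the flow
  }
}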

case (true, _) => // Already performed reset for full-refresh MV/ST - no-op
case (false, true) => // Incremental refresh of an ST - no-op
case (false, false) => // Incremental refresh of an MV - truncate
  context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
@JiaqiWang18 (author) commented:

This match exists to avoid calling TRUNCATE twice for full-refresh MVs.
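
For context, the surrounding logic reads as a tuple match roughly like the reconstruction below; the scrutinee names isFullRefresh and isStreamingTable are assumed from the case comments, not taken from the PR:

// Hypothetical reconstruction of the surrounding match.
(isFullRefresh, isStreamingTable) match {
  case (true, _) =>
    // Full refresh of an MV/ST: State.reset already handled it - no-op.
  case (false, true) =>
    // Incremental refresh of an ST: streaming appends - no-op.
  case (false, false) =>
    // Incremental refresh of an MV: recompute, so truncate first.
    context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
}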
