Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non-additive schema evolution for Delta streaming source #1690

Closed

Conversation

jackierwzhang
Copy link
Contributor

Description

Added standard streaming support behind a feature flag using schema tracking log.

TODO:

  • CDC streaming support
  • Safe SQL conf flag support

See more in the issue description: #1630

How was this patch tested?

New unit tests.

}
}
}
iter = stopIndexedFileIteratorAtSchemaChangeBarrier(iter)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just return the result of stopIndexedFileIteratorAtSchemaChangeBarrier(iter)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good eye!


/** Returns matching files that were added on or after startVersion among delta logs. */
def filterAndIndexDeltaLogs(startVersion: Long): ClosableIterator[IndexedFile] = {
// TODO: handle the case when failOnDataLoss = false and we are missing change log files
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you crease an issue for this TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have it noted down myself, it's a minor follow up so won't take another issue I think.

override def commit(end: Offset): Unit = {
super.commit(end)
// IMPORTANT: for future developers, please place any work you would like to do in commit()
// before `updateSchemaTrackingLogAndFailTheStreamIfNeeded(end)` as it may throw an exception.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also put this warning above the updateSchemaTrackingLogAndFailTheStreamIfNeeded method? And explain why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep this in the commit() function because the goal is for future developers who want to put extra stuff in commit()

object DeltaSourceOffset {
object DeltaSourceOffset extends Logging {

private[DeltaSourceOffset] val VERSION_1 = 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ints instead of an enum or sealed trait? wouldn't one of the latter be type-safer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to maintain backward compatibility - we have been using ints for Delta source version in the past. Can have this as a follow up though.

Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with minor comments!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants