[1105] Delta Lake Change Data Feed support - streaming reads #1154
- update DeltaSource to use the above trait
- add option in DeltaOption
- add DeltaCDCStreamSuite

GitOrigin-RevId: a02b7c9f66e1c6659c39855ad26bc4bfd35e33b3
}
}
}

nit: delete
LGTM
override val schema: StructType =
  ColumnWithDefaultExprUtils.removeDefaultExpressions(deltaLog.snapshot.metadata.schema)
override val schema: StructType = {
  val schemaWithoutCDC =
nit: add a comment on why we are removing some fields from the schema
  deltaTable.delete("value = 1") // version 4
},
ProcessAllAvailable(),
CheckAnswer((1, "insert", 3), (2, "insert", 3), (1, "delete", 4))
Is CheckNewAnswer better here to just check what the new values in the output are?
All of our streaming tests use CheckAnswer, not CheckNewAnswer, so I'm perfectly fine sticking with CheckAnswer.
flyby comment: CheckNewAnswer is often easier to read than the ever-growing list in CheckAnswer, so I slightly prefer that. But it's okay, not blocking this PR for that. Go ahead and merge it.
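For readers unfamiliar with the two checks discussed in this thread, the difference can be sketched with a toy sink. This is only an illustrative model, not Spark's `StreamTest` API: `AnswerCheckSketch`, `ToySink`, `append`, `checkAnswer`, and `checkNewAnswer` are hypothetical names, and the assumption is that a `CheckAnswer`-style check compares against everything emitted so far while a `CheckNewAnswer`-style check compares only against rows emitted since the previous such check.

```scala
// Toy model of a streaming test sink. NOT Spark's StreamTest API;
// every name below is hypothetical and exists only for illustration.
object AnswerCheckSketch {
  // (value, change type, commit version), mirroring the tuples in the tests above.
  type Row = (Int, String, Long)

  final class ToySink {
    private var rows: Vector[Row] = Vector.empty
    private var lastChecked: Int = 0

    // Simulates one micro-batch arriving at the sink.
    def append(batch: Seq[Row]): Unit = {
      rows = rows ++ batch
    }

    // CheckAnswer-style: the expected list must cover every row emitted so far,
    // so it grows with each step of the test.
    def checkAnswer(expected: Row*): Boolean =
      rows.sorted == expected.toVector.sorted

    // CheckNewAnswer-style: only rows emitted since the previous call are compared.
    def checkNewAnswer(expected: Row*): Boolean = {
      val fresh = rows.drop(lastChecked)
      lastChecked = rows.length
      fresh.sorted == expected.toVector.sorted
    }
  }
}
```

With this model, after the delete at version 4 the cumulative check has to list all three rows, whereas the incremental check only needs `(1, "delete", 4L)`, which is the readability point raised above.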
  deltaTable.delete("id = 3") // version 2
},
ProcessAllAvailable(),
CheckAnswer((4, "insert", 1), (5, "insert", 1), (3, "delete", 2))
same here.
This PR adds CDF + Streaming functionality, as part of the ongoing CDF project. The bulk of this PR is a) adding `DeltaSourceCDCSupport`. This PR adds on the necessary CDF functionality to DeltaSource. b) updating DeltaSource to use the various `DeltaSourceCDCSupport` APIs when CDF is enabled c) adding a new test suite Closes delta-io#1154 GitOrigin-RevId: e7e8f6d48f99a63e7c5d35a5d0173a9cc26cf274
Description
This PR adds CDF + Streaming functionality, as part of the ongoing CDF project #1105.
The bulk of this PR is
a) adding `DeltaSourceCDCSupport`. This PR adds on the necessary CDF functionality to DeltaSource.
b) updating DeltaSource to use the various `DeltaSourceCDCSupport` APIs when CDF is enabled
c) adding a new test suite
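As a rough illustration of the output shape the new test suite asserts on, here is a toy model of a change feed. It is a sketch under stated assumptions, not the `DeltaSourceCDCSupport` implementation: `ChangeFeedSketch`, `Action`, `Insert`, `Delete`, `ChangeRow`, `changesFrom`, and `startingVersion` are all hypothetical names, and the model simply assumes one change row per row-level action, tagged with the change type and the commit version, as in the `(1, "insert", 3)` tuples used in the tests.

```scala
// Toy model of a change data feed. Names are illustrative only and are
// NOT Delta Lake APIs; the real source works on the Delta transaction log.
object ChangeFeedSketch {
  sealed trait Action
  final case class Insert(value: Int) extends Action
  final case class Delete(value: Int) extends Action

  // One emitted change row: (value, change type, commit version).
  final case class ChangeRow(value: Int, changeType: String, commitVersion: Long)

  // Emits one change row per action for every commit at or after
  // `startingVersion` (a hypothetical parameter), in version order.
  def changesFrom(commits: Map[Long, Seq[Action]], startingVersion: Long): Seq[ChangeRow] =
    commits.toSeq
      .filter { case (version, _) => version >= startingVersion }
      .sortBy { case (version, _) => version }
      .flatMap { case (version, actions) =>
        actions.map {
          case Insert(v) => ChangeRow(v, "insert", version)
          case Delete(v) => ChangeRow(v, "delete", version)
        }
      }
}
```

Feeding it an insert of values 1 and 2 at version 3 followed by a delete of value 1 at version 4 yields the same three rows that the first `CheckAnswer` in the review thread expects.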
How was this patch tested?
New UTs.
Does this PR introduce any user-facing changes?
No.