Support Spark 3.4 #1720
Conversation
 */
private def needsSchemaAdjustmentByName(query: LogicalPlan, targetAttrs: Seq[Attribute],
    deltaTable: DeltaTableV2): Boolean = {
  // TODO: update this to allow columns with default expressions to not be
This will come in a follow-up PR with tests
What does this mean? Are there any current tests covering this function... or is the entire thing TODO?
The last thing we want is some half-incorrectly-implemented functionality.
We add the code to support the new functionality (not specifying generated columns in INSERT INTO by name statements), but we don't allow it in this PR. In Spark 3.3, insert into by name only worked when all the columns were specified and otherwise threw an error. We maintain that behavior here (lines 809-812), so there is no change in behavior and existing tests should suffice.
Once we remove the check on lines 809-812 we will have full support for insert into with generated columns. I'll remove it and add the tests in a follow-up PR.
Since we now have to do schema validation / evolution for by-name queries, it's easier to add all of this code now but artificially block the new functionality, so the tests can come in a follow-up PR and the amount of code in this PR stays smaller.
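A hedged illustration of the behavior being kept versus the functionality being blocked for now (hypothetical table names `events` and `staged_events`):

```scala
// Hypothetical Delta table with a generated column.
spark.sql("""
  CREATE TABLE events (
    id INT,
    ts TIMESTAMP,
    event_date DATE GENERATED ALWAYS AS (CAST(ts AS DATE))
  ) USING DELTA
""")

// Kept from Spark 3.3: a by-name INSERT must list every column, including the generated one.
spark.sql("""
  INSERT INTO events (event_date, id, ts)
  SELECT CAST(ts AS DATE), id, ts FROM staged_events
""")

// Blocked for now by the check on lines 809-812: omitting the generated column so that
// Delta computes it. Enabling this, with tests, is the follow-up PR described above.
// spark.sql("INSERT INTO events (id, ts) SELECT id, ts FROM staged_events")
```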
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.trees.Origin

class DeltaAnalysisException(
We add these parameters so we don't lose information in our exceptions thrown in deltaMerge.scala that used to use a.failAnalysis(...).
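A minimal hedged sketch of the idea (hypothetical class name, not the actual Delta implementation): keeping the error class, its parameters, and the origin as fields means callers and tests can inspect structured information instead of only a pre-formatted message string.

```scala
import org.apache.spark.sql.catalyst.trees.Origin

// Hypothetical sketch: structured error details preserved alongside the rendered message.
class ExampleDeltaAnalysisException(
    val errorClass: String,
    val messageParameters: Array[String],
    val origin: Option[Origin] = None)
  extends Exception(s"[$errorClass] ${messageParameters.mkString(", ")}")
```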
// ParseException(errorClass, ...)
// Instead of passing just a message here, we could enforce creating an errorClass for each
// invocation and make this DeltaParseException(errorClass, ctx)
class DeltaParseException(
In Spark 3.3 there is a constructor ParseException(message, ctx), but in 3.4 it is ParseException(errorClass, ctx) (and that error class would need to be a Spark error class). Here we enable creating a ParseException from a message and context.
@@ -198,8 +198,7 @@ case class WriteIntoDelta(
      }
    }
    val rearrangeOnly = options.rearrangeOnly
-   // TODO: use `SQLConf.READ_SIDE_CHAR_PADDING` after Spark 3.4 is released.
-   val charPadding = sparkSession.conf.get("spark.sql.readSideCharPadding", "false") == "true"
+   val charPadding = sparkSession.conf.get(SQLConf.READ_SIDE_CHAR_PADDING.key, "false") == "true"
The default in Spark is "true" so we cannot use sparkSession.conf.get(SQLConf.READ_SIDE_CHAR_PADDING)
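A short hedged sketch of the distinction (assuming Spark 3.4's SQLConf.READ_SIDE_CHAR_PADDING entry, whose default is true): reading through the string key with an explicit "false" fallback keeps Delta's previous opt-in behavior, whereas relying on Spark's own default would silently enable padding.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Sketch of the conf read used in WriteIntoDelta after this change: users must opt in.
def readSideCharPaddingEnabled(spark: SparkSession): Boolean =
  spark.conf.get(SQLConf.READ_SIDE_CHAR_PADDING.key, "false") == "true"
```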
@@ -424,7 +424,12 @@ object DeltaMergeInto {
        // Note: This will throw error only on unresolved attribute issues,
        // not other resolution errors like mismatched data types.
        val cols = "columns " + plan.children.flatMap(_.output).map(_.sql).mkString(", ")
-       a.failAnalysis(msg = s"cannot resolve ${a.sql} in $mergeClauseType given $cols")
+       // todo: added a new Delta error for this to avoid rewriting tests, but existing
Will there be a follow-up PR for this? If so, please clarify what needs to be done. This is a little vague for a TODO.
Suggested change:
- // todo: added a new Delta error for this to avoid rewriting tests, but existing
+ // TODO: added a new Delta error for this to avoid rewriting tests, but existing
Ah, just a note to myself / reviewers on the other option. I think I prefer using the new Delta error; I will remove this comment.
It's like rewriting 50+ tests otherwise...
@@ -85,6 +86,18 @@ class DeltaAnalysis(session: SparkSession)
    }

    // INSERT INTO by name
Is this new functionality? If not, how did the corresponding command work before with this case statement? What changed with Spark 3.4?
In Spark 3.3, for insert into by name queries:
- if all columns are specified --> rearrange the order and convert to an insert by ordinal query
- if not all columns are specified --> exception (missing columns)
In Spark 3.4, insert into by name queries for DSV2 are no longer converted to by ordinal (apache/spark#39334).
So previously, insert by name queries would be converted to insert by ordinal and would match the AppendDelta pattern above.
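A hedged illustration of the difference (hypothetical table and column names):

```scala
// Assume a Delta table with columns (id INT, name STRING, part STRING).
spark.sql("CREATE TABLE t (id INT, name STRING, part STRING) USING delta")

// An INSERT INTO by name with the columns listed out of order:
spark.sql("INSERT INTO t (part, id, name) VALUES ('p0', 1, 'a')")
// Spark 3.3: the analyzer reordered the projection and rewrote this as an insert by ordinal,
// so it matched the existing by-ordinal AppendDelta case in DeltaAnalysis.
// Spark 3.4: the plan stays an insert-by-name for DSV2 (apache/spark#39334), so DeltaAnalysis
// needs this new case to do schema validation / evolution for by-name queries itself.
```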
This is mentioned in the PR description albeit a little less thoroughly :)
Understood, thank you for explaining in detail.
@@ -48,3 +58,17 @@ class DeltaUnsupportedOperationException(
  override def getErrorClass: String = errorClass
  def getMessageParametersArray: Array[String] = messageParameters
}

// todo: we had to add this since in Spark 3.4 ParseException(message, ...) was replaced by
nit: TODOs are generally capitalized. Please fix them all over this PR.
Sorry, I also didn't plan to merge this TODO; it's mostly a note to myself or reviewers. I think it's easiest not to add an error class for every invocation in DeltaSqlParser, but I'm not super clear on how much we want to stick to enforcing the Spark error framework. I'm happy to do it if we want to stick to the framework.
@@ -759,15 +830,16 @@ class DeltaAnalysis(session: SparkSession)
          Cast(input, dt, Option(timeZone), ansiEnabled = false)
      case SQLConf.StoreAssignmentPolicy.ANSI =>
        (input: Expression, dt: DataType, name: String) => {
-         AnsiCast(input, dt, Option(timeZone))
+         val cast = Cast(input, dt, Option(timeZone), ansiEnabled = true)
What is this change for?
Compile error; AnsiCast was removed in 3.4 and consolidated into one Cast class.
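A brief hedged sketch of the consolidation (the Cast call with ansiEnabled is the same overload used in the diff above; the helper name here is hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.DataType

// Spark 3.3: ANSI store assignment used a separate AnsiCast expression, e.g.
//   AnsiCast(input, dt, Option(timeZone))
// Spark 3.4: AnsiCast no longer exists; ANSI behavior is requested on Cast itself.
def ansiStoreAssignmentCast(input: Expression, dt: DataType, timeZone: String): Expression =
  Cast(input, dt, Option(timeZone), ansiEnabled = true)
```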
 * - A generated column references itself
 * - A generated column references another generated column
 */
def validateColumnReferences(
is there test coverage for this?
Yes, in GeneratedColumnSuite; tests fail without this change.
@@ -73,10 +73,12 @@ def tearDown(self) -> None:
        shutil.rmtree(self.tempPath)

    def test_maven_jar_loaded(self) -> None:
-       packages: List[str] = self.spark.conf.get("spark.jars.packages").split(",")
+       packagesConf: Optional[str] = self.spark.conf.get("spark.jars.packages")
What is this change? This seems like a weird piece of code; it could use more inline docs.
Spark 3.4 changed the return type of conf.get from str to Optional[str]. mypy checks fail without this specific assert statement.
python/delta/tests/test_pip_utils.py
-       packages: List[str] = self.spark.conf.get("spark.jars.packages").split(",")
+       packagesConf: Optional[str] = self.spark.conf.get("spark.jars.packages")
+       assert packagesConf is not None  # mypi needs this to assign type str
Suggested change:
- assert packagesConf is not None  # mypi needs this to assign type str
+ assert packagesConf is not None  # mypi needs this to assign type str from Optional[str]
@tdas does this help? Since the types are on the variable definition, I don't know if docs mentioning the return type from conf.get will help. I found all this Python typing a little confusing... i.e. type Optional[str] is either type str OR None.
Maybe there is a better way to do this; I had to google this.
    val matchedRowsDf = targetDf
-     .withColumn(FILE_NAME_COL, col(s"${METADATA_NAME}.${FILE_PATH}"))
+     .withColumn(FILE_NAME_COL, uriEncode(col(s"${METADATA_NAME}.${FILE_PATH}")))
what is this change for?
what does this fix? does it have test coverage?
Tests fail without the fix. Adding a TODO in the code here to provide a more robust solution for 3.4.0 and 3.4.1
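A hedged illustration (hypothetical file names, not the actual uriEncode implementation) of why the encoding matters: paths recorded in the Delta log are URI-encoded, while _metadata.file_path may surface the raw path, so the two only match once they are encoded the same way.

```scala
// Hypothetical example: a data file whose name contains a space.
val pathInDeltaLog   = "part-00000-abc%20def.snappy.parquet" // URI-encoded, as stored in the log
val pathFromMetadata = "part-00000-abc def.snappy.parquet"   // as _metadata.file_path may return it

assert(pathInDeltaLog != pathFromMetadata) // a raw comparison misses the match
val encoded = java.net.URLEncoder.encode(pathFromMetadata, "UTF-8").replace("+", "%20")
assert(encoded == pathInDeltaLog)          // encoding first lines the two paths up again
```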
Closed by 5c3f4d3
Description
Makes changes to support Spark 3.4. These include necessary compile changes, and test and code changes due to changes in Spark behavior. Some of the bigger changes include:
- Spark 3.4 makes `class ErrorInfo` private. This means the current approach in `DeltaThrowableHelper` can no longer work. We now use `ErrorClassJsonReader` (these are the changes to `DeltaThrowableHelper` and `DeltaThrowableSuite`).
- Spark 3.4 changes `message: String` to `errorClass: String` in several APIs, which does not cause a compile error, but instead causes a "SparkException - error not found" when called. Some things affected include `ParseException(...)` and `a.failAnalysis(..)`.
- `DeltaAnalysis.scala`: for INSERT INTO by name queries we need to perform schema validation checks and schema evolution; right now we only match when `!isByName`.
- `GeneratedColumn.scala`
- `DelegatingCatalogExtension` deprecates `createTable(..., schema: StructType, ...)` in favor of `createTable(..., columns: Array[Column], ...)`.
- `_metadata.file_path` is not always encoded. We update `DeleteWithDeletionVectorsHelper.scala` to accommodate for this.
- Support for `REPLACE WHERE` in SQL. [TESTS IN FOLLOW-UP PR]

Resolves #1696
How was this patch tested?
Existing tests should suffice since there are no major Delta behavior changes besides support for `REPLACE WHERE`, for which we have added tests.
Does this PR introduce any user-facing changes?
Yes. Spark 3.4 will be supported. `REPLACE WHERE` is supported in SQL.
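A hedged sketch (hypothetical table names) of the newly supported SQL form described above:

```scala
// Replace only the rows matching the predicate with the result of the source query.
spark.sql("""
  INSERT INTO target_table
  REPLACE WHERE event_date >= '2023-01-01'
  SELECT * FROM source_updates
""")
```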