
Cannot read/write Delta into ADLS Gen2 #2832

Open
2 of 8 tasks
ArunPesari2 opened this issue Apr 1, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@ArunPesari2

Cannot read/write Delta into ADLS Gen2

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

Delta-format data cannot be read from or written to ADLS Gen2.

Steps to reproduce

I have attached a file with the steps to reproduce: list_of_steps.txt
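
As a purely hypothetical illustration (the exact steps are in the attached file; the container, storage account, and path below are placeholders), the failing pattern is a plain Delta write to an abfss:// path:

```python
# Hypothetical illustration only; the actual steps are in list_of_steps.txt.
# <container> and <storage-account> are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# The Py4JJavaError below is raised from this save() call.
df.write.format("delta").mode("overwrite").save(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/some/path"
)
```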

Observed results

Py4JJavaError: An error occurred while calling o408.save. : java.lang.ClassCastException: class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$VersionedFileStatus cannot be cast to class org.apache.spark.sql.execution.datasources.FileStatusWithMetadata (org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$VersionedFileStatus and org.apache.spark.sql.execution.datasources.FileStatusWithMetadata are in unnamed module of loader 'app') at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2(DataSourceScanExec.scala:466) at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2$adapted(DataSourceScanExec.scala:466) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.execution.FileSourceScanLike.setFilesNumAndSizeMetric(DataSourceScanExec.scala:466) at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:257) at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251) at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506) at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506) at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286) at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267) at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506) at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506) at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553) at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537) at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:575) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191) at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527) at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455) at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498) at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51) at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:242) at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332) at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3573) at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320) at org.apache.spark.sql.Dataset.collect(Dataset.scala:3573) at org.apache.spark.sql.delta.Snapshot.protocolAndMetadataReconstruction(Snapshot.scala:215) at org.apache.spark.sql.delta.Snapshot.x$1$lzycompute(Snapshot.scala:134) at org.apache.spark.sql.delta.Snapshot.x$1(Snapshot.scala:129) at org.apache.spark.sql.delta.Snapshot._metadata$lzycompute(Snapshot.scala:129) at org.apache.spark.sql.delta.Snapshot._metadata(Snapshot.scala:129) at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:197) at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:390) at java.base/java.lang.String.valueOf(Unknown Source) at java.base/java.lang.StringBuilder.append(Unknown Source) at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:393) at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:370) at org.apache.spark.internal.Logging.logInfo(Logging.scala:60) at org.apache.spark.internal.Logging.logInfo$(Logging.scala:59) at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:370) at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:393) at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$4(SnapshotManagement.scala:356) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:480) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:468) at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:349) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:343) at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74) at 
org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$1(SnapshotManagement.scala:304) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:301) at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:298) at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:74) at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:293) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138) at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74) at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:288) at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:287) at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74) at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:57) at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80) at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:790) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323) at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:785) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138) at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:595) at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133) at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128) at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117) at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:595) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112) at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:595) at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:784) at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:802) at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257) at com.google.common.cache.LocalCache.get(LocalCache.java:4000) at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789) at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801) at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811) at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:643) at 
org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:172) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Unknown Source)

Expected results

Delta-format data should be written to ADLS Gen2 successfully.

Further details

Please let us know if we should raise this concern on a different portal instead.

Environment information

  • Delta Lake version: 0.4.2
  • delta-spark: 3.1.0
  • Spark version: 3.5.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
ArunPesari2 added the bug label on Apr 1, 2024
@sherlockbeard
Contributor

@ArunPesari2 try this: replace the line

.config('spark.jars.packages','io.delta:delta-core_2.12:2.2.0')

with

.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-spark_2.12:3.1.0')

Spark 3.5.* requires Delta Lake 3.1.*.
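
A minimal sketch of what the corrected session might look like, assuming storage-account-key authentication; the account, key, container, and path below are placeholders, not values from this issue:

```python
# Sketch of a corrected setup for Spark 3.5.x; <storage-account>, <access-key>,
# and <container> are placeholders, and key-based auth is an assumption.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("delta-adls")
    # hadoop-azure supplies the abfss:// filesystem; delta-spark 3.1.0 matches Spark 3.5.x
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-spark_2.12:3.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Assumed auth: storage account key; replace with your own auth mechanism if different
    .config("fs.azure.account.key.<storage-account>.dfs.core.windows.net", "<access-key>")
    .getOrCreate()
)

spark.range(5).write.format("delta").mode("overwrite").save(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/tmp/delta_smoke_test"
)
spark.read.format("delta").load(
    "abfss://<container>@<storage-account>.dfs.core.windows.net/tmp/delta_smoke_test"
).show()
```

Note that the Maven coordinates changed from io.delta:delta-core to io.delta:delta-spark with Delta Lake 3.x, and the Delta version must line up with the Spark version.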

@fei819

fei819 commented Nov 16, 2024

We hit the same issue with Spark 3.5 and Delta 3.0. Does any Delta 3.* release fix this, or should we roll back to Spark 3.4? Thanks.

@fei819

fei819 commented Nov 26, 2024

It turns out we had mixed up the dependency JARs between Delta and Hadoop for Spark 3.5 in our environment. We tracked down the root cause by adding -verbose:class to spark.driver.extraJavaOptions and printing every class-loading path. The fix was to remove the incompatible JARs from our Docker image and add the correct ones back under /opt/spark/jars.
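
For reference, a minimal sketch of that class-loading trace; -verbose:class is a standard JVM flag and spark.driver.extraJavaOptions is a standard Spark property, while the session builder around them is only illustrative:

```python
# Sketch: log every class the driver JVM loads, to see which JAR each class comes from.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("classpath-debug")
    .config("spark.driver.extraJavaOptions", "-verbose:class")
    # .config("spark.executor.extraJavaOptions", "-verbose:class")  # executors too, if needed
    .getOrCreate()
)

# The driver log then contains entries such as
#   [info][class,load] org.apache.spark.sql.delta.DeltaLog source: file:/opt/spark/jars/...
# which show exactly which JAR a conflicting class was loaded from.
```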
