
Spark Structured Streaming - Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null #6388

@ottensjors

Description

Apache Iceberg version

1.0.0

Query engine

Spark

Please describe the bug 🐞

Hi team,

I'm currently using Apache Spark 3.3 with Apache Iceberg 1.0.0, AWS S3, and the GlueCatalog integration. Please find my Spark config below:

# add Iceberg dependency
ICEBERG_VERSION=1.0.0
DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:$ICEBERG_VERSION"

DEPENDENCIES+=",org.apache.iceberg:iceberg-hive-runtime:$ICEBERG_VERSION,org.apache.hadoop:hadoop-aws:3.3.2"

# add AWS dependency
AWS_SDK_VERSION=2.17.257
AWS_MAVEN_GROUP=software.amazon.awssdk
AWS_PACKAGES=(
    "bundle"
    "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
    DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done


# start the Spark shell --> works with the AWS Glue catalog

spark-shell --packages $DEPENDENCIES \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://<s3-bucket>/adhoc-otten/iceberg \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.hive.metastore.version=2.3.9 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
    --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory    

The Spark session integrates successfully with the Glue catalog and AWS S3, which is great. I'm able to use all functionality when it comes to regular DataFrames.
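For instance, a plain batch read through the catalog works without issue (a minimal sketch; the table name is just an example, not the real one):

// hypothetical table name, used only to illustrate the working batch path
val batchDf = spark.table("my_catalog.default.dimitem")
batchDf.show(5)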

However, I'm currently evaluating Apache Iceberg with Spark Structured Streaming, and unfortunately I'm running into an issue. Below is the code I use to evaluate it:

val df = spark.readStream
  .format("iceberg")
  .option("stream-from-timestamp", 1670487707799L)
  .option("streaming-skip-overwrite-snapshots", "true")
  .load("<table_name>")

df: org.apache.spark.sql.DataFrame = [ITEM_ID: int, ITEM_KEY: bigint ... 5 more fields]

df.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "default.dimitem_stream")
  .option("checkpointLocation", "s3a://<bucket-name>/adhoc-otten/iceberg/default/dimitem_stream/checkpoint")
  .start()

res4: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@22dac77c
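For what it's worth, I also tried expressing the same write with toTable instead of the path option (equivalent formulation, same checkpoint location; it makes no difference to the error below):

df.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", "s3a://<bucket-name>/adhoc-otten/iceberg/default/dimitem_stream/checkpoint")
  .toTable("default.dimitem_stream")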

It all seems to go well, but after a few seconds the following error manifests itself:

22/12/08 14:00:37 ERROR MicroBatchExecution: Query [id = 4db07e32-5e9e-44d9-8a89-7d1c0f64e716, runId = ce0a9e92-efff-4be8-b9f7-ab8ae9ac179c] terminated with error
org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:657)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:647)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null
	at org.apache.iceberg.spark.source.SparkMicroBatchStream.shouldProcess(SparkMicroBatchStream.java:230)
	at org.apache.iceberg.spark.source.SparkMicroBatchStream.planFiles(SparkMicroBatchStream.java:211)
	at org.apache.iceberg.spark.source.SparkMicroBatchStream.planInputPartitions(SparkMicroBatchStream.java:148)
	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions$lzycompute(MicroBatchScanExec.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions(MicroBatchScanExec.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:142)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:141)
	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:29)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:153)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)

The main takeaway here is the following line:

Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null

It's probably me doing something stupid and being PICNIC (Problem In Chair, Not In Computer).

However, I went through the Apache Iceberg source code and was unable to find any assertion or message that reflects the statement above, nor any additional information on the internet.

I was wondering if someone could provide me with some much-needed insight to get this resolved.
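For what it's worth, my current hunch, and it is purely a guess, is that the stream-from-timestamp (1670487707799) points before the oldest snapshot still retained by the table, so the snapshot lookup inside SparkMicroBatchStream.shouldProcess comes back null. A quick way to check that against the table's retained history via the snapshots metadata table (table name illustrative again):

// compare the retained snapshot commit times against stream-from-timestamp;
// if the timestamp predates the oldest committed_at, the streaming source
// may have nothing to anchor its start offset on
spark.sql(
  "SELECT snapshot_id, committed_at, operation " +
  "FROM my_catalog.default.dimitem.snapshots " +
  "ORDER BY committed_at"
).show(truncate = false)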

Thanks in advance!

Regards,

Sjors Otten
Principal Architect
