
Spark: readStream from Iceberg doesn't progress anymore after running Maintenance (rewrite_data_files and rewrite_manifests) #10117

@tenstriker

Description

Apache Iceberg version

1.4.3

Query engine

Spark

Please describe the bug 🐞

Spark version: 3.5 (Scala 2.12)

We have an Iceberg Spark streaming read job that reads an Iceberg table for new append-only changes with the following config:

    from functools import partial

    stream = (
        spark.readStream.format("iceberg")
        # start reading from this timestamp (ms since epoch)
        .option("stream-from-timestamp", str(config.stream_from_timestamp))
        # skip overwrite/delete snapshots; we only want appends
        .option("streaming-skip-overwrite-snapshots", "true")
        .option("streaming-skip-delete-snapshots", "true")
        .option("streaming-max-rows-per-micro-batch", str(MAX_ROWS_PER_BATCH))
        .load(table)
        .writeStream.queryName(config.streaming_query_name)
        .foreachBatch(partial(my_func, config))
        .option("checkpointLocation", config.checkpoint_location)
        .trigger(processingTime=f"{config.trigger_interval_seconds} seconds")
        .start()
    )

This job ran fine until, over the weekend, we ran a maintenance job on the Iceberg tables for the very first time. The maintenance job calls four stored procedures in sequence (sketched below):

  1. expire_snapshots (retains last 100)
  2. rewrite_data_files (creates replace snapshot internally)
  3. rewrite_manifests (creates replace snapshot internally)
  4. remove_orphan_files
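
For reference, this is roughly how the maintenance job invokes the procedures; the catalog and table names are placeholders and the options shown are illustrative, not the exact production settings:

    # Placeholder catalog ("my_catalog") and table ("db.my_table") names.
    spark.sql("CALL my_catalog.system.expire_snapshots(table => 'db.my_table', retain_last => 100)")
    spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db.my_table')")
    spark.sql("CALL my_catalog.system.rewrite_manifests(table => 'db.my_table')")
    spark.sql("CALL my_catalog.system.remove_orphan_files(table => 'db.my_table')")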

The maintenance job runs fine and the compaction output is as expected. Steps 2 and 3 create the two replace snapshots mentioned above, and since upstream is continuously writing to this table, a lot of append snapshots start following them.
All in all, when I query the table (which uses the latest metadata.json), all data is returned as expected, i.e. the table state is up to date and consistent.
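
For anyone reproducing this, the snapshot history can be checked from the snapshots metadata table (table name is a placeholder); after maintenance it should show the two replace snapshots followed by the new append snapshots:

    # Inspect the table's snapshot history (placeholder table name).
    spark.sql("""
        SELECT committed_at, snapshot_id, operation
        FROM my_catalog.db.my_table.snapshots
        ORDER BY committed_at
    """).show(truncate=False)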

However, the Spark streaming read job, which should only process append snapshots, keeps encountering a replace snapshot at every trigger, and somehow it can't get past the replace snapshot and continue scanning the append snapshots that follow it.

The code in SparkMicroBatchStream seems to skip replace snapshots, but then it doesn't make any progress, or it's stuck on something else; I can't pinpoint the exact location where this happens.

Here are some useful logs:

MicroBatchExecution:64 - Starting Trigger Calculation
BaseMetastoreTableOperations:199 - Refreshing table metadata from new version: gs://...my_table/metadata/30605-26bfa08d-ee35-4af5-9fe0-.metadata.json (every trigger gets correct metadata json file)

/*
this is where the following things should happen, but they don't:
CheckpointFileManager:60 Writing atomically to ...offset
MicroBatchExecution:60 Committed offsets for batch ...
MicroBatchExecution:64 - walCommit took 789 ms
MicroBatchExecution:64 - queryPlanning took 215 ms

Instead, as you can see below, ExecutionStats is empty because nothing was written.
*/
MicroBatchExecution:64 - latestOffset took 451 ms
MicroBatchExecution:64 - triggerExecution took 452 ms
MicroBatchExecution:64 - Execution stats: ExecutionStats(Map(),List(),Map())
MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms.
 

In the Spark offset directory, the last offset that was committed is the one from the first replace snapshot (the result of rewrite_data_files), so it seems like Iceberg couldn't compute any further StreamingOffset after that point, and so nothing runs.
Even after restarting the streaming read job it stays stuck; it can't seem to get an offset from any of the append snapshots that came after the replace snapshot.
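
For reference, the last committed offset can be inspected directly from the checkpoint; the path and batch id below are placeholders, and I'm only assuming the offset file's last line carries Iceberg's StreamingOffset JSON:

    # Placeholder checkpoint path and batch id; adjust to the real checkpoint location.
    offset_path = "gs://.../checkpoints/my_query/offsets/1234"
    # The checkpoint lives on GCS, so read it through Spark instead of the local filesystem.
    for row in spark.read.text(offset_path).collect():
        # The last line should be the Iceberg StreamingOffset JSON (snapshot_id, position, ...).
        print(row.value)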
