Apache Iceberg version
1.3.1
Query engine
Spark
Please describe the bug 🐞
@singhpk234 I think you might know how to fix this.
The implementation of streaming-skip-overwrite-snapshots is not what I expected. At this location it does skip over rewrite snapshots, but only a single file at a time.
Suppose you trigger every minute and encounter a rewrite snapshot with 300 files. It will then take 300 x 1 minute (300 minutes) to finally skip past the snapshot and start progressing again.
I think that once a rewrite snapshot is detected, we should exhaust all the positions (all the files) in that commit so we are positioned for the next commit.
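To illustrate the idea, here is a minimal Python sketch of the proposed offset handling. The names (`Snapshot`, `StreamOffset`, `advance_offset`) are hypothetical and do not reproduce Iceberg's actual Java `SparkMicroBatchStream` logic; the point is only that a skippable snapshot should have all of its file positions exhausted in one step rather than one per trigger.
```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    snapshot_id: int
    operation: str          # e.g. "append", "overwrite"
    added_files_count: int  # number of data files added by this snapshot


@dataclass
class StreamOffset:
    snapshot_id: int
    position: int           # file position within the snapshot


def advance_offset(offset: StreamOffset, snapshot: Snapshot, skip_overwrites: bool) -> StreamOffset:
    skippable = skip_overwrites and snapshot.operation == "overwrite"
    if skippable:
        # Proposed: jump past every file position in the skipped snapshot so the
        # next micro-batch starts at the following snapshot.
        return StreamOffset(snapshot.snapshot_id, snapshot.added_files_count)
    # Behaviour observed today: only one file position is consumed per trigger.
    return StreamOffset(snapshot.snapshot_id, offset.position + 1)


# With a 300-file rewrite snapshot and a 1-minute trigger, advancing one file
# per micro-batch needs ~300 triggers (~300 minutes) to clear the snapshot;
# the proposed behaviour clears it in a single trigger.
rewrite = Snapshot(snapshot_id=42, operation="overwrite", added_files_count=300)
print(advance_offset(StreamOffset(42, 0), rewrite, skip_overwrites=True))
```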
This is how my readStream is configured.
```python
import time

# connect to source table
df = spark.readStream.format("iceberg")

if reset_checkpoint:
    # current time in milliseconds
    ts = int(time.time() * 1000)
    print(f"Reading {source_table} from ts {ts}")
    df = df.option("stream-from-timestamp", ts)

df = (
    df
    .option("split-size", 16 * 1024 * 1024)
    .option("streaming-skip-delete-snapshots", True)
    .option("streaming-skip-overwrite-snapshots", True)
    .option("streaming-max-files-per-micro-batch", 200)
    .option("streaming-max-rows-per-micro-batch", 2000000)
    .load(source_table)
    .withWatermark(
        "timestamp", "10 minutes"
    )  # enable watermark so that Spark keeps track of min/max/avg/watermark eventTime.
    # Note: we do not use the watermark to evict rows from an aggregation window, only to track eventTime metrics.
)
```
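For context, here is a minimal sketch of the write side that would produce the one-trigger-per-minute cadence described above. The table name and checkpoint location are placeholders, not taken from the original job.
```python
# Placeholders (not from the original report)
target_table = "db.target_table"
checkpoint_path = "s3://bucket/checkpoints/target_table"

query = (
    df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="1 minute")   # the 1-minute trigger in the repro
    .option("checkpointLocation", checkpoint_path)
    .toTable(target_table)
)
query.awaitTermination()
```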