Description
Query engine
Spark
Question
I did a migration by adding a new column to the table:

```scala
val upd: UpdateSchema = table.updateSchema().addColumn("_last_modified_at", Types.TimestampType.withZone())
upd.commit()
```

At that point, I naively expected that I could query `_last_modified_at` efficiently: the planner would know that pre-existing data files contain only NULLs for the new column, so a filter like `_last_modified_at IS NOT NULL` should implicitly skip the "legacy" files that existed before the migration. Instead, the metrics are empty and Spark does a full scan.
Do I expect "too much"?
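To make the expectation concrete, here is a minimal, self-contained sketch of the per-file decision a metrics-based planner has to make for an `IS NOT NULL` filter. This is my own illustration, not Iceberg's actual evaluator; `FileMetrics` and `canSkipForIsNotNull` are hypothetical names:

```scala
// A minimal sketch (NOT Iceberg's real evaluator) of the per-file decision
// a metrics-based planner makes for a `col IS NOT NULL` filter.

// Per-file column metrics: total record count and, if recorded, the null count.
case class FileMetrics(recordCount: Long, nullValueCount: Option[Long])

// A file may be skipped for `IS NOT NULL` only when metrics PROVE all rows are null.
// Files written before the schema evolution have no entry for the new column at all
// (nullValueCount = None), so the conservative answer is "cannot skip".
def canSkipForIsNotNull(m: FileMetrics): Boolean =
  m.nullValueCount.contains(m.recordCount)
```

Under this reading, skipping the legacy files would require the metrics to explicitly record the new column as all-null.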
To be specific, here are the steps I took.
Writing the column to (new) snapshots:

```scala
val withLastModified = inputDataFrame.withColumn(LastModifiedAt, current_timestamp())
withLastModified.writeTo(tableId).append()
table.refresh()
```

Example with multiple writes before the schema update:

```scala
val snapshot1 = write(rows, tableId)
val snapshot2 = write(rows, tableId)
val snapshot3 = write(rows, tableId)
val snapshot4 = write(rows, tableId)
migrate(tableId) // .updateSchema().addColumn("_last_modified_at", Types.TimestampType.withZone())
val snapshot5 = write(rows, tableId)
```

Reading the table / dataframe:
The table now looks like this:
+---+--------------------------+
|id |_last_modified_at |
+---+--------------------------+
|130|2024-07-24 11:34:31.640384|
|936|2024-07-24 11:34:31.640384|
|ddc|null |
|2ed|null |
|ddc|null |
|2ed|null |
|ddc|null |
|2ed|null |
|3db|null |
|14d|null |
+---+--------------------------+
I expect that I can apply a filter to `_last_modified_at` and skip all files that were written before the schema evolution (because all the values there are NULL).
```scala
val wholeDf = read(tableId)
val result = wholeDf.where(col("_last_modified_at").isNotNull).explain(true)
```

Studying the plan: the filter is pushed down, which looks good.
```
== Physical Plan ==
*(1) Project [id#978, _last_modified_at#980]
+- *(1) Filter isnotnull(_last_modified_at#980)
   +- *(1) ColumnarToRow
      +- BatchScan ds[id#978, _last_modified_at#980] ds (branch=null) [filters=_last_modified_at IS NOT NULL, groupedBy=] RuntimeFilters: []
```
Studying the metrics (`select * from ds.files`):
+------------------------+------------------------+----------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------+
|value_counts |null_value_counts |lower_bounds |upper_bounds |readable_metrics |
+------------------------+------------------------+----------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------+
|{1 -> 2, 2 -> 2, 3 -> 2}|{1 -> 0, 2 -> 0, 3 -> 0}|{1 -> 13d2f66e-ecb8-41, 2 -> �, 3 -> @}D�����}|{1 -> 930da19b-ee0a-4:, 2 -> �, 3 -> @}D�����}|{{91, 2, 0, null, 2024-07-24 11:34:31.640384, 2024-07-24 11:34:31.640384}...|
|{1 -> 2, 2 -> 2} |{1 -> 0, 2 -> 0} |{1 -> 2ead51c7-ab1f-44, 2 -> �} |{1 -> ddc79ca7-18e1-47, 2 -> �} |{{null, null, null, null, null, null}, {44, 2, 0, null, false, false}...|
|{1 -> 2, 2 -> 2} |{1 -> 0, 2 -> 0} |{1 -> 2ead51c7-ab1f-44, 2 -> �} |{1 -> ddc79ca7-18e1-47, 2 -> �} |{{null, null, null, null, null, null}, {44, 2, 0, null, false, false}...|
|{1 -> 2, 2 -> 2} |{1 -> 0, 2 -> 0} |{1 -> 2ead51c7-ab1f-44, 2 -> �} |{1 -> ddc79ca7-18e1-47, 2 -> �} |{{null, null, null, null, null, null}, {44, 2, 0, null, false, false}...|
|{1 -> 2, 2 -> 2} |{1 -> 0, 2 -> 0} |{1 -> 14a5b698-5335-4f, 2 -> �} |{1 -> 3d5f23e7-9365-49, 2 -> �} |{{null, null, null, null, null, null}, {44, 2, 0, null, false, false}...|
+------------------------+------------------------+----------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------+
The `3 -> 0` entry in `null_value_counts` shows that non-nullability for the latest insertion is counted (the newest file has zero nulls in field 3). I expected a `3 -> N` entry (with N equal to the record count) for the rest of the files, which were written before the column existed.
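The gap can be stated as a sketch in plain Scala (variable names are mine; the field ids and counts come from the `ds.files` output above):

```scala
// Sketch with made-up variable names; field ids and counts from the ds.files output.
val recordCount = 2L

// null_value_counts actually stored for a legacy data file: no entry for field 3.
val actualNullCounts = Map(1 -> 0L, 2 -> 0L)

// What I expected for a legacy file: the added field counted as all-null (3 -> N).
val expectedNullCounts = Map(1 -> 0L, 2 -> 0L, 3 -> recordCount)

// Without a `3 -> N` entry, metrics cannot prove the file is all-null for field 3.
val canProveAllNull = actualNullCounts.get(3).contains(recordCount)
```

Because the legacy manifests carry no entry for the new field at all, the scan has no evidence on which to skip those files.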
ScanReport
1.3.1
Version 1.3.1 logs a ScanReport with resultDataFiles=5 and skippedDataFiles=0.
o.a.i.metrics.LoggingMetricsReporter - ScanReport{tableName=ds, snapshotId=3871043058194055238, filter=not_null(ref(name="_last_modified_at")), ... resultDataFiles=CounterResult{unit=COUNT, value=5} ... skippedDataFiles=CounterResult{unit=COUNT, value=0}, iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4), app-id=local-1721813645616, engine-name=spark}}
So, all files are scanned.
1.6.0
I also reproduced the same case with up-to-date Iceberg/Spark versions. The ScanReport is not logged by default, but I caught it with a debugger. Same result.
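For anyone reproducing this on 1.6.0 without a debugger: the ScanReport can be surfaced by configuring a metrics reporter on the catalog. A sketch, assuming a Spark catalog named `ds_catalog` (adjust to your setup); `metrics-reporter-impl` is Iceberg's catalog property, and `LoggingMetricsReporter` is the reporter visible in the 1.3.1 log line above:

```scala
// Hypothetical catalog name "ds_catalog"; set before the catalog is first loaded.
spark.conf.set(
  "spark.sql.catalog.ds_catalog.metrics-reporter-impl",
  "org.apache.iceberg.metrics.LoggingMetricsReporter")
```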
Thank you!