
Checkpoint stats maxValues is incorrect #2571

Open
echai58 opened this issue Jun 4, 2024 · 18 comments
Labels
binding/rust (Issues for the Rust crate), bug (Something isn't working)

Comments

@echai58

echai58 commented Jun 4, 2024

Environment

Delta-rs version: 0.15.3

Binding: python


Bug

What happened:
I have a Delta table where calling to_pyarrow_table with a filter of <= datetime.datetime(2023, 3, 30, 0, 0, 0, 0) on a timestamp column leaves in a row whose value is 2023-03-30 00:00:00.000902. Inspecting the fragments of the pyarrow dataset, I see an expression saying (timestamp <= 2023-03-30 00:00:00.000000), which is an incorrect max_value for this file. The scan uses that expression for the filtering, assuming all values in the file satisfy the predicate, so the offending row is never filtered out.
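A minimal sketch of how the per-fragment expressions can be inspected (assuming a local table at "test"; the expression delta-rs attaches to each fragment is what the scan relies on here):

from deltalake import DeltaTable

dt = DeltaTable("test")
ds = dt.to_pyarrow_dataset()

# Each data file becomes one fragment; the attached expression is built from
# the file's partition values and min/max statistics, so an understated max
# makes the scan believe every row already satisfies a <= predicate.
for fragment in ds.get_fragments():
    print(fragment.path, fragment.partition_expression)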

Looking through the delta-rs code, it seems like this is parsed from the maxValues field of the checkpoint file. I looked at this checkpoint file, and I do indeed see the incorrect maxValues value:

datetime.datetime(2023, 3, 30, 0, 0)

It seems not to include the microseconds. For another timestamp column in this table, the maxValues field does have microsecond precision.

Can someone help look into why this could happen?

@echai58 added the bug (Something isn't working) label Jun 4, 2024
@ion-elgreco
Collaborator

Please add a reproducible example :)

@echai58
Author

echai58 commented Jun 4, 2024

@ion-elgreco

import datetime

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 29, 23, 59, 59, 807126)],
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us"))]))


write_deltalake(
    "test",
    t,
    mode="error",
    partition_by=["p"],
)

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 30, 0,0,0,902)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us"))]))


write_deltalake(
    "test",
    t,
    mode="append",
    partition_by=["p"],
)

# this works 
print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0)),
        ]
    )
)

# combination of compact + create checkpoint breaks it
DeltaTable("test").optimize.compact()
DeltaTable("test").create_checkpoint()

print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0)),
        ]
    )
)

It seems like it takes the combination of compact + checkpoint to break it. I tested this against a newer version of delta-rs (0.17.3), and it seems to be fixed. Do you know which PR fixed this? (I'm unable to upgrade yet due to some breaking changes I haven't handled, but I could cut a local release including the fix for now.)

@ion-elgreco
Collaborator

I am not entirely sure; throughout the releases there have been multiple occasions where I've touched timestamp types. I suggest you just pip install each release and check when it fails. If you can tell me which release fixed it, I might be able to tell faster.

@echai58
Author

echai58 commented Jun 4, 2024

@ion-elgreco

Going through the versions, it seems the fix was implicit with the addition of timestampNtz, and it is still an issue for timestamps with a timezone.

Adding UTC timezones to my previous example and running on version 0.17.3:

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 29, 23,59,59,807126)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="utc"))]))


write_deltalake(
    "test",
    t,
    mode="error",
    partition_by=["p"],
)

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 30, 0,0,0,902)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="utc"))]))


write_deltalake(
    "test",
    t,
    mode="append",
    partition_by=["p"],
)

# this works 
print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)),
        ]
    )
)

# combination of compact + create checkpoint breaks it
DeltaTable("test").optimize.compact()
DeltaTable("test").create_checkpoint()

print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)),
        ]
    )
)

I still see the bad output from the second print:

pyarrow.Table
p: int64
a: timestamp[us, tz=UTC]
----
p: [[1,1]]
a: [[2023-03-30 00:00:00.000902,2023-03-29 23:59:59.807126]]

@ion-elgreco
Collaborator

ion-elgreco commented Jun 4, 2024

Hmm, maybe the precision is lost after checkpointing.

@echai58 can you manually read the contents of the checkpoint with pyarrow and grab the stats for that add action? I'm curious what's written there now.
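For example, something along these lines should do it (a rough sketch; assumes the repro table at "test" and a single checkpoint file):

import glob

import pyarrow.parquet as pq

# Checkpoint files sit next to the JSON commits in _delta_log and are named
# <version>.checkpoint.parquet (version zero-padded to 20 digits).
[checkpoint_path] = glob.glob("test/_delta_log/*.checkpoint.parquet")
cp = pq.read_table(checkpoint_path)

# Each row is one action; the "add" column is a struct that is non-null only
# for add actions and carries the raw JSON "stats" string.
for add in cp.column("add").to_pylist():
    if add is not None:
        print(add["path"], add["stats"])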

@echai58
Author

echai58 commented Jun 4, 2024

I previously stated that the issue was only when you compacted, then checkpointed, but that turns out not to be true.

@ion-elgreco
This is the contents of the checkpoint file, without compacting:

{'path': 'p=1/0-a8fd92bc-ce65-4e49-8785-bca8809f6a1b-0.parquet',
 'size': 589.0,
 'modificationTime': 1717517089571.0,
 'dataChange': False,
 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 1.0,
  'minValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

This is the file if I compact before I checkpoint:

{'path': 'p=1/part-00001-cf75aa9a-cee7-48f2-b305-9274eac6a056-c000.zstd.parquet',
 'size': 600.0,
 'modificationTime': 1717517182549.0,
 'dataChange': False,
 'stats': '{"numRecords":2,"minValues":{"a":"2023-03-29T23:59:59.807126Z"},"maxValues":{"a":"2023-03-30T00:00:00.000902Z"},"nullCount":{"a":0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 2.0,
  'minValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

And this is the metadata from the compact log, which has the correct maxValues

{"add":{"path":"p=1/part-00001-cf75aa9a-cee7-48f2-b305-9274eac6a056-c000.zstd.parquet","partitionValues":{"p":"1"},"size":600,"modificationTime":1717517182549,"dataChange":false,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":\"2023-03-29T23:59:59.807126Z\"},\"maxValues\":{\"a\":\"2023-03-30T00:00:00.000902Z\"},\"nullCount\":{\"a\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}

@ion-elgreco
Collaborator

ion-elgreco commented Jun 4, 2024

Wait, it's not being maintained through checkpointing, right? The maxValues looks incorrect in the checkpoint file.

Maybe I'm lost on the order here, why is the checkpoint not referencing the same path?

@echai58
Author

echai58 commented Jun 4, 2024

Wait, it's not being maintained through checkpointing, right? The maxValues looks incorrect in the checkpoint file.

Maybe I'm lost on the order here, why is the checkpoint not referencing the same path?

I reran the script twice, once with the compact and once without. The path in the compact log matches the path in the second checkpoint parquet I pasted. Sorry for the confusion.

@ion-elgreco
Collaborator

That "stats_parsed" col has lost some precision, I am trying to follow through the code where this happens. But that might be the main culpriit

@echai58
Author

echai58 commented Jun 4, 2024

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed.

But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

@ion-elgreco
Collaborator

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed.

But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

The last one seems fine because it's only 1 record, so 1 timestamp

@echai58
Author

echai58 commented Jun 4, 2024

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed.
But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

The last one seems fine because it's only 1 record, so 1 timestamp

Ohh right yeah, that's the add for the other file, sorry.

{'path': 'p=1/1-4f0e9148-4290-4cb9-9555-cfa6691eee5e-0.parquet',
 'size': 589.0,
 'modificationTime': 1717520084596.0,
 'dataChange': False,
 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-30T00:00:00.000902+00:00"}, "maxValues": {"a": "2023-03-30T00:00:00.000902+00:00"}, "nullCount": {"a": 0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 1.0,
  'minValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

The add for the other file shows the same problem: the parsing seems to lose precision.
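One guess (an assumption on my part, not verified against the Rust code): if the stats_parsed column in the checkpoint is typed with millisecond precision, then writing the microsecond value into it would silently drop the sub-millisecond digits. The equivalent lossy cast in pyarrow:

import datetime

import pyarrow as pa

us = pa.array([datetime.datetime(2023, 3, 30, 0, 0, 0, 902)], type=pa.timestamp("us"))
# A lossy cast down to millisecond precision truncates the 902 microseconds away.
ms = us.cast(pa.timestamp("ms"), safe=False)
print(ms[0])  # 2023-03-30 00:00:00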

@roeap
Collaborator

roeap commented Jun 4, 2024

@echai58 @ion-elgreco - this is literally a footnote, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?

@echai58
Author

echai58 commented Jun 4, 2024

@echai58 @ion-elgreco - this is literally a footnote, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?

Ah, yeah that would explain it.
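To make the failure mode concrete, here is a small standalone sketch (plain Python, no delta-rs involved) of what millisecond truncation does to a max statistic:

import datetime

real_max = datetime.datetime(2023, 3, 30, 0, 0, 0, 902)

# Truncating to millisecond precision drops the 902 microseconds entirely.
stat_max = real_max.replace(microsecond=(real_max.microsecond // 1000) * 1000)
print(stat_max)  # 2023-03-30 00:00:00

# A reader that trusts stat_max as an upper bound concludes that every row
# satisfies `a <= 2023-03-30 00:00:00`, even though the real value does not.
cutoff = datetime.datetime(2023, 3, 30, 0, 0, 0, 0)
print(stat_max <= cutoff)  # True  -> file treated as fully matching, never re-filtered
print(real_max <= cutoff)  # False -> the row should have been excluded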

@ion-elgreco
Collaborator

ion-elgreco commented Jun 4, 2024

@echai58 @ion-elgreco - this is literally a footnote, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?

Uhm, so the protocol states it should be milliseconds :s. That's quite wrong, because we are then passing rounded stats to the pyarrow.dataset fragments, which in turn will return wrong results.

@echai58
Author

echai58 commented Jun 4, 2024

Instead of truncating, could we round up to the next millisecond when computing the stats? That would prevent this edge case from returning wrong results.

@ion-elgreco
Collaborator

ion-elgreco commented Jun 4, 2024

Rounding up or down can result in retrieving fewer or more records than expected, so neither works.

@echai58
Author

echai58 commented Jun 4, 2024

@ion-elgreco isn't rounding up always safe? Because if the predicate is <= the rounded-up value, the file is loaded and the filtering is still performed on the actual data?
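For illustration, a hypothetical helper showing what "round the max up to the next millisecond" would mean (not delta-rs code, just a sketch of the idea):

import datetime

def ceil_to_millisecond(ts: datetime.datetime) -> datetime.datetime:
    # Round a max statistic up to the next whole millisecond.
    truncated = ts.replace(microsecond=(ts.microsecond // 1000) * 1000)
    if truncated == ts:
        return ts  # already on a millisecond boundary
    return truncated + datetime.timedelta(milliseconds=1)

real_max = datetime.datetime(2023, 3, 30, 0, 0, 0, 902)
stat_max = ceil_to_millisecond(real_max)
print(stat_max)              # 2023-03-30 00:00:00.001000
# The stored max never understates the data, so a file is never wrongly treated
# as fully satisfying an `a <= cutoff` predicate; at worst it gets scanned and
# the exact filter is re-applied to the rows.
print(stat_max >= real_max)  # True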

@rtyler added the binding/rust (Issues for the Rust crate) label Jun 5, 2024