Parquet Predicate Pushdown Does Not Handle Type Coercion #7925

Closed
tustvold opened this issue Oct 25, 2023 · 11 comments · Fixed by #10716
Labels
bug Something isn't working

Comments

@tustvold
Contributor

tustvold commented Oct 25, 2023

Describe the bug

Following #6458, SchemaAdapter (as used by ParquetExec) automatically coerces a parquet file's schema to that of the table.

This logic does not currently extend to either build_row_filter or PruningPredicate.

To Reproduce

Push down a predicate on a column that relies on type coercion; the query fails with type errors.

Expected behavior

The pruning logic should perform the same type coercion.

Additional context

No response

@tustvold added the bug label Oct 25, 2023
@alamb
Contributor

alamb commented Oct 25, 2023

@tustvold do you have a reproducer for this one?

I tried a query like you described and it does not error:

❯ explain select * from traces where trace_id > 1;

| plan_type     | plan |

| logical_plan  | Filter: CAST(traces.trace_id AS Utf8) > Utf|
|               |   TableScan: traces projection=[attributes, duration_nano, end_time_unix_nano, service.name, span.kind, span.name, span_id, time, trace_id, otel.status_code, parent_span_id], partial_filters=[CAST(traces.trace_id AS Utf8) > Utf|
| physical_plan | CoalesceBatchesExec: target_batch_size|
|               |   FilterExec: CAST(trace_id@8 AS Utf|
|               |     ParquetExec: file_groups={16 groups: [[Users/alamb/Downloads/traces/b6a5c8cd-4253-402b-bdc4-8cb829fad874.parquet, Users/alamb/Downloads/traces/beb1feda-848a-49c7-bdeb-d4bae33c2e47.parquet, Users/alamb/Downloads/traces/3b15a726-8939-41c2-a4fc-354ef7efedd1.parquet, Users/alamb/Downloads/traces/6e545a30-8e19-4fd6-ad26-40c002fd0d9c.parquet, Users/alamb/Downloads/traces/a12e1e3c-ea6d-4c47-993d-f33d74b53815.parquet, ...], [Users/alamb/Downloads/traces/b005d801-466f-46ba-818c-76811ae180bb.parquet, Users/alamb/Downloads/traces/fdea6ba3-2709-4ce4-b42f-7176651fbbc4.parquet, Users/alamb/Downloads/traces/5c22e1f4-cfed-4ded-9818-afb3cf4ac7e7.parquet, Users/alamb/Downloads/traces/ea01e29d-3856-4812-8668-6237336f4a01.parquet, Users/alamb/Downloads/traces/6cb810df-4594-4782-bb93-838fb3a02203.parquet, ...], [Users/alamb/Downloads/traces/4519ebee-2485-4641-a289-cc4394b24329.parquet, Users/alamb/Downloads/traces/9af5a0ac-9ce7-4efb-80da-aa315c5c3c0f.parquet, Users/alamb/Downloads/traces/eabf18cc-46c3-488d-a796-60f1ac2e3c0d.parquet, Users/alamb/Downloads/traces/289fa0a0-0525-49d2-b866-0fe575e5f401.parquet, Users/alamb/Downloads/traces/b25a7663-4f91-42b5-8779-6983946653a0.parquet, ...], [Users/alamb/Downloads/traces/24b16332-297d-476e-a3cd-aeb3ba7f4233.parquet, Users/alamb/Downloads/traces/2b60e5cf-cfa2-473c-8f2d-c52c69f31706.parquet, Users/alamb/Downloads/traces/fc5daa2e-3007-472c-ab9d-a64ba834cd0e.parquet, Users/alamb/Downloads/traces/19699a26-f0fe-40e8-ad8a-2c9ee41132ed.parquet, Users/alamb/Downloads/traces/37ec0a6a-0d1c-4762-8d38-20167dbd0939.parquet, ...], [Users/alamb/Downloads/traces/fe77b53d-8a03-4bf0-ac71-378eec7ceccb.parquet, Users/alamb/Downloads/traces/f37b233f-fc41-4a6d-889a-b794b2adc9f5.parquet, Users/alamb/Downloads/traces/ad11d4d2-a913-495c-b2fc-eaaa175670a9.parquet, Users/alamb/Downloads/traces/2437cb4f-adb4-4d91-94c1-388393e15bb3.parquet, Users/alamb/Downloads/traces/9cc8e39e-9aa2-4efa-8348-2ae48f4bb626.parquet, ...], ...]}, 
projection=[attributes, duration_nano, end_time_unix_nano, service.name, span.kind, span.name, span_id, time, trace_id, otel.status_code, parent_span_id], predicate=CAST(trace_id@8 AS Utf8) > 1 |
|               ||

2 rows in set. Query took 0.043 seconds.

❯ select * from traces where trace_id > 1;
0 rows in set. Query took 0.842 seconds.

@tustvold
Contributor Author

tustvold commented Oct 25, 2023

You have an explicit cast in there. The issue arises when the schema provided to the ParquetExec and used for planning doesn't match that of the underlying parquet file. The casts are internal to SchemaAdapter and will not show up in the plan.
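
To make the distinction concrete, here is a toy model in plain Python. It is not DataFusion code: `adapt_batch`, `FILE_ROWS`, `TABLE_SCHEMA`, and the text-versus-integer mismatch are all invented for illustration. The adapter casts file values to the table type on read, so a predicate planned against the table schema works on adapted rows but hits a type error when it is evaluated directly on raw file values, which is roughly the position a pushed-down filter or pruning predicate is in.

```python
# Toy model only: the file stores trace_id as text, the table schema declares it as int.
FILE_ROWS = [{"trace_id": "0"}, {"trace_id": "7"}, {"trace_id": "42"}]
TABLE_SCHEMA = {"trace_id": int}


def adapt_batch(rows, table_schema):
    """Cast every column to the table's declared type (the adapter's job in this model)."""
    return [
        {name: table_schema[name](value) for name, value in row.items()}
        for row in rows
    ]


def predicate(row):
    """Planned against the table schema, so it assumes trace_id is an int."""
    return row["trace_id"] > 1


# Filtering above the adapter: types agree, so the predicate evaluates normally.
filtered = [row for row in adapt_batch(FILE_ROWS, TABLE_SCHEMA) if predicate(row)]

# Pushing the same predicate below the adapter: it sees raw, uncoerced file values.
try:
    pushed_down = [row for row in FILE_ROWS if predicate(row)]
    pushdown_error = None
except TypeError as exc:
    pushdown_error = exc

print(filtered)
print(pushdown_error)
```

In this model the fix would be to apply the same cast before the pushed-down predicate runs, which mirrors the expected behavior stated in the issue.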

@alamb
Contributor

alamb commented Oct 25, 2023

"You have an explicit cast in there, the issue originates when the schema provided to the ParquetExec and used for planning doesn't match that of the underlying parquet file"

I see -- thank you.

@alamb
Contributor

alamb commented Feb 6, 2024

@mhilton reminded me that we saw this in IOx when we tried to change from Timestamp(Nanos, None) to Timestamp(Nanos, UTC). I am writing this mostly as a note to myself.

@alamb
Contributor

alamb commented Feb 8, 2024

I have been trying to make a reproducer for this issue, but so far I have not managed to.

-- Create a parquet file to write two distinct row groups
copy (
 values
  (arrow_cast('2024-12-19', 'Timestamp(Nanosecond, Some("UTC"))')),
  (arrow_cast('2024-12-20', 'Timestamp(Nanosecond, Some("UTC"))'))
)
to '/tmp/example.parquet'
(
  'MAX_ROW_GROUP_SIZE' 1,
  DATA_PAGESIZE_LIMIT 1
);

+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.016 seconds.

This shows there are two row groups, each holding one UTC timestamp:

❯ describe '/tmp/example.parquet';
+-------------+------------------------------------+-------------+
| column_name | data_type                          | is_nullable |
+-------------+------------------------------------+-------------+
| column1     | Timestamp(Nanosecond, Some("UTC")) | YES         |
+-------------+------------------------------------+-------------+
1 row in set. Query took 0.002 seconds.

❯ select * from parquet_metadata('/tmp/example.parquet');
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| filename             | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type  | stats_min           | stats_max           | stats_null_count | stats_distinct_count | stats_min_value     | stats_max_value     | compression        | encodings                    | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
| /tmp/example.parquet | 0            | 1                  | 1                     | 71              | 0         | 93          | 1          | "column1"      | INT64 | 1734566400000000000 | 1734566400000000000 | 0                |                      | 1734566400000000000 | 1734566400000000000 | ZSTD(ZstdLevel(1)) | [PLAIN, RLE, RLE_DICTIONARY] |                   | 4                      | 35               | 89                    | 71                      |
| /tmp/example.parquet | 1            | 1                  | 1                     | 71              | 0         | 258         | 1          | "column1"      | INT64 | 1734652800000000000 | 1734652800000000000 | 0                |                      | 1734652800000000000 | 1734652800000000000 | ZSTD(ZstdLevel(1)) | [PLAIN, RLE, RLE_DICTIONARY] |                   | 169                    | 200              | 89                    | 71                      |
+----------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+---------------------+---------------------+------------------+----------------------+---------------------+---------------------+--------------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+
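
The `stats_min` and `stats_max` values above are INT64 nanoseconds since the Unix epoch. As a quick check, the following sketch decodes them with the Python standard library; the helper name `nanos_to_utc` is made up for this example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def nanos_to_utc(nanos: int) -> datetime:
    """Convert epoch nanoseconds to an aware UTC datetime (microsecond precision)."""
    # Integer division avoids the rounding error a float conversion could introduce.
    micros = nanos // 1_000
    return EPOCH + timedelta(microseconds=micros)


# Row group id -> stats_min, copied from the parquet_metadata output above.
stats = {0: 1734566400000000000, 1: 1734652800000000000}
decoded = {row_group: nanos_to_utc(value).isoformat() for row_group, value in stats.items()}

for row_group, text in decoded.items():
    print(row_group, text)
```

Row group 0 decodes to 2024-12-19 and row group 1 to 2024-12-20, matching the two values written by the `copy` statement.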

Now, when I create a table that requires type coercion (in this case defining column1 as Timestamp(Nanos, None)), any queries I run from the SQL layer show row groups are pruned as expected (row_groups_pruned_statistics=1 in the output of explain analyze)

❯ create external table utc_table(column1 timestamp)  stored as parquet location '/tmp/example.parquet';
0 rows in set. Query took 0.001 seconds.

Note that the predicate column1='2024-12-19'::timestamp compares column1 to a Timestamp(Nanos, None), so it requires coercion:

❯ explain analyze select * from utc_table where column1='2024-12-19'::timestamp;

| plan_type         | plan|

| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=3.302µs|
|                   |   FilterExec: column1@0 = 1734566400000000000, metrics=[output_rows=1, elapsed_compute=46.39µs|
|                   |     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[fetch_time=1.081666ms, repart_time=1ns, send_time=2.792µs|
|                   |       ParquetExec: file_groups={1 group: [[tmp/example.parquet]]}, projection=[column1], predicate=column1@0 = 1734566400000000000, pruning_predicate=column1_min@0 <= 1734566400000000000 AND 1734566400000000000 <= column1_max@1, required_guarantees=[column1 in (1734566400000000000)], metrics=[output_rows=1, elapsed_compute=1ns, file_open_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, row_groups_pruned_statistics=1, num_predicate_creation_errors=0, bytes_scanned=172, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_scan_errors=0, time_elapsed_scanning_until_data=369.125µs, pushdown_eval_time=2ns, time_elapsed_opening=678.666µs, time_elapsed_scanning_total=389.041µs, time_elapsed_processing=585.499µs, page_index_eval_time=19.251µs] |
|                   ||
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.012 seconds.
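
The `pruning_predicate` in the plan above, `column1_min <= literal AND literal <= column1_max`, can be evaluated by hand against the row group statistics shown earlier. This is a minimal sketch of that check in plain Python, not DataFusion's implementation; `may_contain` and `ROW_GROUP_STATS` are names chosen for the example.

```python
# (min, max) per row group, copied from the parquet_metadata output.
ROW_GROUP_STATS = [
    (1734566400000000000, 1734566400000000000),  # row group 0: 2024-12-19
    (1734652800000000000, 1734652800000000000),  # row group 1: 2024-12-20
]

# The literal from the plan: '2024-12-19' as epoch nanoseconds.
LITERAL = 1734566400000000000


def may_contain(col_min: int, col_max: int, literal: int) -> bool:
    """Return True if a row group could hold a row equal to the literal."""
    return col_min <= literal and literal <= col_max


kept = [
    index
    for index, (col_min, col_max) in enumerate(ROW_GROUP_STATS)
    if may_contain(col_min, col_max, LITERAL)
]
pruned = len(ROW_GROUP_STATS) - len(kept)

print("kept row groups:", kept)
print("pruned row groups:", pruned)
```

One row group is kept and one is pruned, which agrees with `row_groups_pruned_statistics=1` in the metrics.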

The same result comes from querying the file directly:

❯ explain analyze select * from '/tmp/example.parquet' where column1='2024-12-19'::timestamp;

| plan_type         | plan|

| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=4.042µs|
|                   |   FilterExec: column1@0 = 1734566400000000000, metrics=[output_rows=1, elapsed_compute=69µs|
|                   |     ParquetExec: file_groups={1 group: [[tmp/example.parquet]]}, projection=[column1], predicate=column1@0 = 1734566400000000000, pruning_predicate=column1_min@0 <= 1734566400000000000 AND 1734566400000000000 <= column1_max@1, required_guarantees=[column1 in (1734566400000000000)], metrics=[output_rows=1, elapsed_compute=1ns, file_open_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, row_groups_pruned_statistics=1, num_predicate_creation_errors=0, bytes_scanned=172, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_scan_errors=0, time_elapsed_scanning_until_data=510.375µs, pushdown_eval_time=2ns, time_elapsed_opening=736µs, time_elapsed_scanning_total=596.125µs, time_elapsed_processing=554.584µs, page_index_eval_time=19.126µs] |
|                   ||

1 row in set. Query took 0.015 seconds.

Maybe the problem has something to do with constructing ParquetExec directly, as we do in InfluxDB.

@matthewmturner
Contributor

I can pick this up, as I believe we are impacted by this as well. I am going to work on creating a reproducer first and will provide an update then.

@matthewmturner
Contributor

As an update, this wasn't on our critical path, so it's been on the back burner. It is unclear whether I'll be looking into this in the short term.

@alamb
Contributor

alamb commented Mar 22, 2024

@matthewmturner did you ever find a reproducer that shows the problem?

@matthewmturner
Contributor

@alamb I didn't, but here is the Python script I was working on, in case it helps anyone.

import datafusion
import pyarrow

ctx = datafusion.SessionContext()


# Ref https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/to_timestamp.rs
ctx.sql(
    "COPY (SELECT * FROM (VALUES (to_timestamp('2023-07-01 00:00:00-04:00', '%Y-%m-%d %H:%M:%S%#z'), to_timestamp('2023-07-01 00:00:00'))) AS data (tz_timestamp, timestamp)) TO 'timestamp.parquet'"
).collect()

ctx.register_parquet(
    "data",
    "timestamp.parquet",
    schema=pyarrow.schema(
        [
            ("tz_timestamp", pyarrow.timestamp("ns", "America/New_York")),
            ("timestamp", pyarrow.timestamp("ns")),
        ]
    ),
)

# Works without pushdown filters on timestamp without timezone
ctx.sql("SELECT * FROM data WHERE timestamp > '2023-01-01 00:00:00'").show()
ctx.sql("SELECT * FROM data WHERE timestamp > '2023-01-01 00:00:00-04:00'").show()

# Works with pushdown filter on timestamp without timezone
ctx.sql("SET datafusion.execution.parquet.pushdown_filters=true").collect()
ctx.sql("SELECT * FROM data WHERE timestamp > '2023-01-01 00:00:00'").show()
ctx.sql("SELECT * FROM data WHERE timestamp > '2023-01-01 00:00:00-04:00'").show()

# Works without pushdown filter on timestamp with timezone
ctx.sql("SET datafusion.execution.parquet.pushdown_filters=false").collect()
ctx.sql("SELECT * FROM data WHERE tz_timestamp > '2023-01-01 00:00:00'").show()
ctx.sql("SELECT * FROM data WHERE tz_timestamp > '2023-01-01 00:00:00-04:00'").show()

# Does NOT work with pushdown filter on timestamp with timezone
ctx.sql("SET datafusion.execution.parquet.pushdown_filters=true").collect()
# ctx.sql("SELECT * FROM data WHERE tz_timestamp > '2023-01-01 00:00:00'").show()
ctx.sql("SELECT * FROM data WHERE tz_timestamp > '2023-01-01 00:00:00-04:00'").show()

@jeffreyssmith2nd
Contributor

jeffreyssmith2nd commented May 24, 2024

The case we're running into in InfluxDB when enabling timezones is slightly different. The parquet file stores a Timestamp without a timezone, and we query it with a predicate that either carries a timezone or is in UTC. The odd part is that we have manually set the schema to be 'UTC', even though the backing parquet file is not.

"the issue originates when the schema provided to the ParquetExec and used for planning doesn't match that of the underlying parquet file."

I agree with the suggestion that this is the cause of the issue we're encountering.
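
Python's own datetime types give a loose analogy for this kind of mismatch. This is an illustration only and says nothing about how DataFusion or Arrow implement comparisons: a value stored without a timezone cannot be ordered against one declared with a timezone until one side is explicitly coerced. Treating the stored value as UTC is an assumption made for the example.

```python
from datetime import datetime, timezone

# As stored in the file: no timezone attached.
file_value = datetime(2024, 1, 1, 0, 0, 2)
# As the declared schema and predicate see it: UTC.
predicate_value = datetime(2024, 1, 1, 0, 0, 2, tzinfo=timezone.utc)

# Comparing the two directly fails because the types do not agree.
try:
    file_value >= predicate_value
    mismatch_error = None
except TypeError as exc:
    mismatch_error = exc

# Coercing the stored value to the declared type makes the comparison valid.
# Assumption for this sketch: the naive value is interpreted as UTC.
coerced = file_value.replace(tzinfo=timezone.utc)
matches = coerced >= predicate_value

print(mismatch_error)
print(matches)
```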

I have not been able to reproduce the issue natively in DataFusion using the datafusion-cli, as there is no way (that I can see) to change the schema. It does behave as I would expect when the schema matches the files.

This is the script I'm using to generate some parquet files.

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    'time': [
        pd.Timestamp(year=2024,month=1,day=1,second=0),
        pd.Timestamp(year=2024,month=1,day=1,second=1),
        pd.Timestamp(year=2024,month=1,day=1,second=2),
        pd.Timestamp(year=2024,month=1,day=1,second=3),
    ],
})

table = pa.Table.from_pandas(df)
pq.write_table(table, 'example_no_tz.parquet')


df = pd.DataFrame({
    'time': [
        pd.Timestamp(year=2024,month=1,day=1,second=0,tz='UTC'),
        pd.Timestamp(year=2024,month=1,day=1,second=1,tz='UTC'),
        pd.Timestamp(year=2024,month=1,day=1,second=2,tz='UTC'),
        pd.Timestamp(year=2024,month=1,day=1,second=3,tz='UTC'),
    ],
})


table = pa.Table.from_pandas(df)
pq.write_table(table, 'example_with_tz_utc.parquet')

Some sample queries:

> SET datafusion.execution.parquet.pushdown_filters=true;
0 row(s) fetched.
Elapsed 0.003 seconds.

> select * from './example_no_tz.parquet';
+---------------------+
| time                |
+---------------------+
| 2024-01-01T00:00:00 |
| 2024-01-01T00:00:01 |
| 2024-01-01T00:00:02 |
| 2024-01-01T00:00:03 |
+---------------------+
4 row(s) fetched.
Elapsed 0.003 seconds.

> select * from './example_no_tz.parquet' where time >= '2024-01-01T00:00:02.000';
+---------------------+
| time                |
+---------------------+
| 2024-01-01T00:00:02 |
| 2024-01-01T00:00:03 |
+---------------------+
2 row(s) fetched.
Elapsed 0.009 seconds.

> select * from './example_no_tz.parquet' where time >= ('2024-01-01T00:00:02.000' at time zone 'Europe/Brussels');
+---------------------+
| time                |
+---------------------+
| 2024-01-01T00:00:00 |
| 2024-01-01T00:00:01 |
| 2024-01-01T00:00:02 |
| 2024-01-01T00:00:03 |
+---------------------+
4 row(s) fetched.
Elapsed 0.006 seconds.
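
One reading that is consistent with all four rows being returned (an assumption, not something confirmed in this thread) is that the literal is taken as Brussels local time and compared as an instant. Midnight plus two seconds in Brussels on 1 January is 23:00:02 UTC the previous day, which is earlier than every row. The sketch below uses a fixed UTC+1 offset as a stand-in for Europe/Brussels in winter and assumes the stored values are read as UTC.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+1 offset stands in for Europe/Brussels in January (CET).
BRUSSELS_WINTER = timezone(timedelta(hours=1), "CET")

# The predicate literal, interpreted as Brussels local time.
literal = datetime(2024, 1, 1, 0, 0, 2, tzinfo=BRUSSELS_WINTER)
literal_utc = literal.astimezone(timezone.utc)

# The four rows from example_no_tz.parquet, assumed here to be read as UTC.
rows = [datetime(2024, 1, 1, 0, 0, second, tzinfo=timezone.utc) for second in range(4)]
matching = [row for row in rows if row >= literal_utc]

print(literal_utc.isoformat())
print(len(matching))
```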

If you query the example with a timezone using a predicate that is not in the same timezone, you get a type coercion error, which I believe makes sense.

> select * from './example_with_tz_utc.parquet';
+----------------------+
| time                 |
+----------------------+
| 2024-01-01T00:00:00Z |
| 2024-01-01T00:00:01Z |
| 2024-01-01T00:00:02Z |
| 2024-01-01T00:00:03Z |
+----------------------+
4 row(s) fetched.
Elapsed 0.004 seconds.

> select * from './example_with_tz_utc.parquet' where time >= '2024-01-01T00:00:02.000';
+----------------------+
| time                 |
+----------------------+
| 2024-01-01T00:00:02Z |
| 2024-01-01T00:00:03Z |
+----------------------+
2 row(s) fetched.
Elapsed 0.007 seconds.

> select * from './example_with_tz_utc.parquet' where time >= ('2024-01-01T00:00:02.000' at time zone 'Europe/Brussels');
Error during planning: Cannot infer common argument type for comparison operation Timestamp(Nanosecond, Some("UTC")) >= Timestamp(Nanosecond, Some("Europe/Brussels"))

@jeffreyssmith2nd
Contributor

take
