
inner join involving hive-partitioned parquet dataset and filters on LHS and RHS causes panic #9797

Open
jwimberl opened this issue Mar 25, 2024 · 10 comments
Labels: bug (Something isn't working)

Describe the bug

I am attempting to optimize an inner join between two hive-partitioned parquet datasets, lhs and rhs, joined on one column, join_col. In the working base query, the left-hand side of the inner join is lhs itself and the right-hand side is rhs filtered on some numeric columns. For this data it happens that equality on join_col implies equality on a less granular partition column, join_col_partitioned. However, a modification to the query, in which the left-hand side lhs is first filtered to records whose join_col_partitioned is among the distinct values present in the filtered right-hand side, results in a repeatable panic.

To Reproduce

I cannot provide the raw data, but I will provide all the details I can, with some names obfuscated.

External tables

The tables lhs and rhs in question are created with

CREATE EXTERNAL TABLE lhs
STORED AS parquet
PARTITIONED BY (join_col_chk_first)
LOCATION '/LHS_ROOT/chunks/*/chunk.parquet';

and

CREATE EXTERNAL TABLE rhs
STORED AS parquet
PARTITIONED BY (join_col_chk_first, col1, col2_chk_first)
LOCATION '/RHS_ROOT/*/*/*/chunk.parquet';

The table lhs also contains columns keep_col and join_col, as well as others not referenced in the query; similarly rhs contains columns col2 and join_col. Both tables have the same number of records, 28914441.
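
Since I can't share the data, here is a purely hypothetical, minimal sketch of the kind of layout and registration involved, with toy paths and values standing in for the real data (only lhs is shown; rhs is analogous but with three partition levels):

import os
import pyarrow as pa
import pyarrow.parquet as pq
import datafusion

# Toy stand-in for LHS_ROOT/chunks/<join_col_chk_first>/chunk.parquet (values made up)
os.makedirs("/tmp/lhs_demo/join_col_chk_first=0", exist_ok=True)
pq.write_table(
    pa.table({"keep_col": ["k1", "k2"], "join_col": ["0-001", "0-002"]}),
    "/tmp/lhs_demo/join_col_chk_first=0/chunk.parquet",
)

ctx = datafusion.SessionContext()
ctx.sql("""
CREATE EXTERNAL TABLE lhs
STORED AS PARQUET
PARTITIONED BY (join_col_chk_first)
LOCATION '/tmp/lhs_demo/*/chunk.parquet';
""")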

Working query

The working base version of the query is

SELECT a.keep_col
FROM lhs AS a
INNER JOIN (
  SELECT * FROM rhs
  WHERE col1=7 AND col2>=0 AND col2<=25000
)
AS b
ON a.join_col = b.join_col;

and returns 375130 records, which is precisely the number of records in SELECT * FROM rhs WHERE col1=7 AND col2>=0 AND col2<=25000.

Modified panic-inducing query

The modification to the query that causes the panic is

SELECT a.keep_col
FROM (
  SELECT * FROM lhs
  WHERE join_col_partitioned IN (
    SELECT DISTINCT join_col_partitioned
    FROM rhs
    WHERE col1=7 AND col2>=0 AND col2<=25000
  )
) AS a
INNER JOIN (
  SELECT * FROM rhs
  WHERE col1=7 AND col2>=0 AND col2<=25000
)
AS b
ON a.join_col = b.join_col;

The nested query SELECT DISTINCT join_col_partitioned FROM rhs WHERE col1=7 AND col2>=0 AND col2<=25000 returns two distinct values of join_col_partitioned.

Running this query reliably produces a panic (reproduced five or so times).

Logical and physical plans and backtraces

I have included the output of EXPLAIN for both of these queries in the Additional context section, as well as the regular (RUST_BACKTRACE=1) and full (RUST_BACKTRACE=full) backtraces for the panic.

Expected behavior

I expected the second query to return exactly the same data as the first query (due to the aforementioned fact that, for these datasets, equality on join_col implies equality on join_col_partitioned), and not to result in a panic.

Additional context

Environment

  • Rocky 8
  • DataFusion 34.0.0
  • Arrow 49.0.0

EXPLAIN output for the working query

Output of EXPLAIN SELECT a.keep_col FROM lhs AS a INNER JOIN (SELECT * FROM rhs WHERE col1=7 AND col2>=0 AND col2<=25000) AS b ON a.join_col = b.join_col;:

plan_type: [["logical_plan","physical_plan"]]
plan: [["Projection: a.keep_col
  Inner Join: a.join_col = b.join_col
    SubqueryAlias: a
      TableScan: lhs projection=[keep_col, join_col]
    SubqueryAlias: b
      Projection: rhs.join_col
        Filter: rhs.col2 >= Int64(0) AND rhs.col2 <= Int64(25000)
          TableScan: rhs projection=[join_col, col2], full_filters=[CAST(rhs.col1 AS Utf8) = Utf8("7")], partial_filters=[rhs.col2 >= Int64(0), rhs.col2 <= Int64(25000)]","ProjectionExec: expr=[keep_col@0 as keep_col]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_col@1, join_col@0)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([join_col@1], 96), input_partitions=96
          ParquetExec: file_groups={96 groups: [[LHS_ROOT/join_col_chk_first=0/chunk.parquet:0..26277405], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:26277405..52554810], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:52554810..78832215], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:78832215..95615950, LHS_ROOT/join_col_chk_first=1000000/chunk.parquet:0..9493670], [LHS_ROOT/join_col_chk_first=1000000/chunk.parquet:9493670..35771075], ...]}, projection=[keep_col, join_col]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([join_col@0], 96), input_partitions=96
          ProjectionExec: expr=[join_col@0 as join_col]
            CoalesceBatchesExec: target_batch_size=8192
              FilterExec: col2@1 >= 0 AND col2@1 <= 25000
                RepartitionExec: partitioning=RoundRobinBatch(96), input_partitions=81
                  ParquetExec: file_groups={81 groups: [[RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=10000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=11000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=12000001/chunk.parquet], ...]}, projection=[join_col, col2], predicate=CAST(col1@7 AS Utf8) = 7 AND col2@3 >= 0 AND col2@3 <= 25000, pruning_predicate=col2_max@0 >= 0 AND col2_min@1 <= 25000

EXPLAIN output for the failing query

Output of EXPLAIN SELECT a.keep_col FROM (SELECT * FROM lhs WHERE join_col_partitioned IN (SELECT DISTINCT join_col_partitioned FROM rhs WHERE col1=7 AND col2>=0 AND col2<=25000)) AS a INNER JOIN (SELECT * FROM rhs WHERE col1=7 AND col2>=0 AND col2<=25000) AS b ON a.join_col = b.join_col;:

  plan_type: [["logical_plan","physical_plan"]]
  plan: [["Projection: a.keep_col
    Inner Join: a.join_col = b.join_col
      SubqueryAlias: a
        Projection: lhs.keep_col, lhs.join_col
          LeftSemi Join: lhs.join_col_chk_first = __correlated_sq_1.join_col_chk_first
            TableScan: lhs projection=[keep_col, join_col, join_col_chk_first]
            SubqueryAlias: __correlated_sq_1
              Aggregate: groupBy=[[rhs.join_col_chk_first]], aggr=[[]]
                Projection: rhs.join_col_chk_first
                  Filter: rhs.col2 >= Int64(0) AND rhs.col2 <= Int64(25000)
                    TableScan: rhs projection=[col2, join_col_chk_first], full_filters=[CAST(rhs.col1 AS Utf8) = Utf8("7")], partial_filters=[rhs.col2 >= Int64(0), rhs.col2 <= Int64(25000)]
      SubqueryAlias: b
        Projection: rhs.join_col
          Filter: rhs.col2 >= Int64(0) AND rhs.col2 <= Int64(25000)
            TableScan: rhs projection=[join_col, col2], full_filters=[CAST(rhs.col1 AS Utf8) = Utf8("7")], partial_filters=[rhs.col2 >= Int64(0), rhs.col2 <= Int64(25000)]","ProjectionExec: expr=[keep_col@0 as keep_col]
    CoalesceBatchesExec: target_batch_size=8192
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_col@1, join_col@0)]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([join_col@1], 96), input_partitions=96
            ProjectionExec: expr=[keep_col@0 as keep_col, join_col@1 as join_col]
              CoalesceBatchesExec: target_batch_size=8192
                HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(join_col_chk_first@2, join_col_chk_first@0)]
                  CoalesceBatchesExec: target_batch_size=8192
                    RepartitionExec: partitioning=Hash([join_col_chk_first@2], 96), input_partitions=96
                      ParquetExec: file_groups={96 groups: [[LHS_ROOT/join_col_chk_first=0/chunk.parquet:0..26277405], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:26277405..52554810], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:52554810..78832215], [LHS_ROOT/join_col_chk_first=0/chunk.parquet:78832215..95615950, LHS_ROOT/join_col_chk_first=1000000/chunk.parquet:0..9493670], [LHS_ROOT/join_col_chk_first=1000000/chunk.parquet:9493670..35771075], ...]}, projection=[keep_col, join_col, join_col_chk_first]
                  AggregateExec: mode=FinalPartitioned, gby=[join_col_chk_first@0 as join_col_chk_first], aggr=[]
                    CoalesceBatchesExec: target_batch_size=8192
                      RepartitionExec: partitioning=Hash([join_col_chk_first@0], 96), input_partitions=96
                        AggregateExec: mode=Partial, gby=[join_col_chk_first@0 as join_col_chk_first], aggr=[]
                          ProjectionExec: expr=[join_col_chk_first@1 as join_col_chk_first]
                            CoalesceBatchesExec: target_batch_size=8192
                              FilterExec: col2@0 >= 0 AND col2@0 <= 25000
                                RepartitionExec: partitioning=RoundRobinBatch(96), input_partitions=81
                                  ParquetExec: file_groups={81 groups: [[RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=10000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=11000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=12000001/chunk.parquet], ...]}, projection=[col2, join_col_chk_first], predicate=CAST(col1@7 AS Utf8) = 7 AND col2@3 >= 0 AND col2@3 <= 25000, pruning_predicate=col2_max@0 >= 0 AND col2_min@1 <= 25000
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([join_col@0], 96), input_partitions=96
            ProjectionExec: expr=[join_col@0 as join_col]
              CoalesceBatchesExec: target_batch_size=8192
                FilterExec: col2@1 >= 0 AND col2@1 <= 25000
                  RepartitionExec: partitioning=RoundRobinBatch(96), input_partitions=81
                    ParquetExec: file_groups={81 groups: [[RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=10000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=1000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=11000001/chunk.parquet], [RHS_ROOT/join_col_chk_first=25000000/col1=7/col2_chk_first=12000001/chunk.parquet], ...]}, projection=[join_col, col2], predicate=CAST(col1@7 AS Utf8) = 7 AND col2@3 >= 0 AND col2@3 <= 25000, pruning_predicate=col2_max@0 >= 0 AND col2_min@1 <= 25000
  "]]

Regular backtrace

Mar 25 14:56:41 <hostname> sh[942819]: thread 'tokio-runtime-worker' panicked at /home/username/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-49.0.0/src/transform/utils.rs:42:56:
Mar 25 14:56:41 <hostname> sh[942819]: offset overflow
Mar 25 14:56:41 <hostname> sh[942819]: stack backtrace:
Mar 25 14:56:41 <hostname> sh[942819]:    0: rust_begin_unwind
Mar 25 14:56:41 <hostname> sh[942819]:              at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:645:5
Mar 25 14:56:41 <hostname> sh[942819]:    1: core::panicking::panic_fmt
Mar 25 14:56:41 <hostname> sh[942819]:              at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:72:14
Mar 25 14:56:41 <hostname> sh[942819]:    2: core::panicking::panic_display
Mar 25 14:56:41 <hostname> sh[942819]:              at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:196:5
Mar 25 14:56:41 <hostname> sh[942819]:    3: core::panicking::panic_str
Mar 25 14:56:41 <hostname> sh[942819]:              at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:171:5
Mar 25 14:56:41 <hostname> sh[942819]:    4: core::option::expect_failed
Mar 25 14:56:41 <hostname> sh[942819]:              at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/option.rs:1980:5
Mar 25 14:56:41 <hostname> sh[942819]:    5: arrow_data::transform::utils::extend_offsets
Mar 25 14:56:41 <hostname> sh[942819]:    6: arrow_data::transform::variable_size::build_extend::{{closure}}
Mar 25 14:56:41 <hostname> sh[942819]:    7: arrow_data::transform::MutableArrayData::extend
Mar 25 14:56:41 <hostname> sh[942819]:    8: arrow_select::concat::concat_fallback
Mar 25 14:56:41 <hostname> sh[942819]:    9: arrow_select::concat::concat
Mar 25 14:56:41 <hostname> sh[942819]:   10: arrow_select::concat::concat_batches
Mar 25 14:56:41 <hostname> sh[942819]:   11: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   12: <futures_util::future::future::shared::Shared<Fut> as core::future::future::Future>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   13: datafusion_physical_plan::joins::utils::OnceFut<T>::get
Mar 25 14:56:41 <hostname> sh[942819]:   14: <datafusion_physical_plan::joins::hash_join::HashJoinStream as futures_core::stream::Stream>::poll_next
Mar 25 14:56:41 <hostname> sh[942819]:   15: <datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next
Mar 25 14:56:41 <hostname> sh[942819]:   16: datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}
Mar 25 14:56:41 <hostname> sh[942819]:   17: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
Mar 25 14:56:41 <hostname> sh[942819]:   18: tokio::runtime::task::core::Core<T,S>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   19: tokio::runtime::task::harness::Harness<T,S>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   20: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
Mar 25 14:56:41 <hostname> sh[942819]:   21: tokio::runtime::scheduler::multi_thread::worker::run
Mar 25 14:56:41 <hostname> sh[942819]:   22: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
Mar 25 14:56:41 <hostname> sh[942819]:   23: tokio::runtime::task::core::Core<T,S>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   24: tokio::runtime::task::harness::Harness<T,S>::poll
Mar 25 14:56:41 <hostname> sh[942819]:   25: tokio::runtime::blocking::pool::Inner::run
Mar 25 14:56:41 <hostname> sh[942819]: note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Full backtrace

Mar 25 15:02:03 <hostname> sh[957703]: thread 'tokio-runtime-worker' panicked at /home/username/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-49.0.0/src/transform/utils.rs:42:56:
Mar 25 15:02:03 <hostname> sh[957703]: offset overflow
Mar 25 15:02:03 <hostname> sh[957703]: stack backtrace:
Mar 25 15:02:03 <hostname> sh[957703]: thread 'tokio-runtime-worker' panicked at /home/username/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-49.0.0/src/transform/utils.rs:42:56:
Mar 25 15:02:03 <hostname> sh[957703]: offset overflow
Mar 25 15:02:03 <hostname> sh[957703]:    0:     0x560a5f972a96 - std::backtrace_rs::backtrace::libunwind::trace::hbee8a7973eeb6c93
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/../../backtrace/src/backtrace/libunwind.rs:104:5
Mar 25 15:02:03 <hostname> sh[957703]:    1:     0x560a5f972a96 - std::backtrace_rs::backtrace::trace_unsynchronized::hc8ac75eea3aa6899
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
Mar 25 15:02:03 <hostname> sh[957703]:    2:     0x560a5f972a96 - std::sys_common::backtrace::_print_fmt::hc7f3e3b5298b1083
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys_common/backtrace.rs:68:5
Mar 25 15:02:03 <hostname> sh[957703]:    3:     0x560a5f972a96 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::hbb235daedd7c6190
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys_common/backtrace.rs:44:22
Mar 25 15:02:03 <hostname> sh[957703]:    4:     0x560a5f99fe00 - core::fmt::rt::Argument::fmt::h76c38a80d925a410
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/fmt/rt.rs:142:9
Mar 25 15:02:03 <hostname> sh[957703]:    5:     0x560a5f99fe00 - core::fmt::write::h3ed6aeaa977c8e45
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/fmt/mod.rs:1120:17
Mar 25 15:02:03 <hostname> sh[957703]:    6:     0x560a5f96f9ff - std::io::Write::write_fmt::h78b18af5775fedb5
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/io/mod.rs:1810:15
Mar 25 15:02:03 <hostname> sh[957703]:    7:     0x560a5f972874 - std::sys_common::backtrace::_print::h5d645a07e0fcfdbb
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys_common/backtrace.rs:47:5
Mar 25 15:02:03 <hostname> sh[957703]:    8:     0x560a5f972874 - std::sys_common::backtrace::print::h85035a511aafe7a8
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys_common/backtrace.rs:34:9
Mar 25 15:02:03 <hostname> sh[957703]:    9:     0x560a5f9740f7 - std::panicking::default_hook::{{closure}}::hcce8cea212785a25
Mar 25 15:02:03 <hostname> sh[957703]:   10:     0x560a5f973e59 - std::panicking::default_hook::hf5fcb0f213fe709a
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:292:9
Mar 25 15:02:03 <hostname> sh[957703]:   11:     0x560a5f974588 - std::panicking::rust_panic_with_hook::h095fccf1dc9379ee
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:779:13
Mar 25 15:02:03 <hostname> sh[957703]:   12:     0x560a5f974462 - std::panicking::begin_panic_handler::{{closure}}::h032ba12139b353db
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:657:13
Mar 25 15:02:03 <hostname> sh[957703]:   13:     0x560a5f972f96 - std::sys_common::backtrace::__rust_end_short_backtrace::h9259bc2ff8fd0f76
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys_common/backtrace.rs:171:18
Mar 25 15:02:03 <hostname> sh[957703]:   14:     0x560a5f9741c0 - rust_begin_unwind
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:645:5
Mar 25 15:02:03 <hostname> sh[957703]:   15:     0x560a5d65e4e5 - core::panicking::panic_fmt::h784f20a50eaab275
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:72:14
Mar 25 15:02:03 <hostname> sh[957703]:   16:     0x560a5d65e4a3 - core::panicking::panic_display::h251010ce5e0560d5
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:196:5
Mar 25 15:02:03 <hostname> sh[957703]:   17:     0x560a5d65e4a3 - core::panicking::panic_str::h6f89534c81f0edc4
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:171:5
Mar 25 15:02:03 <hostname> sh[957703]:   18:     0x560a5d65e4a3 - core::option::expect_failed::hc85eb6037a3050f7
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/option.rs:1980:5
Mar 25 15:02:03 <hostname> sh[957703]:   19:     0x560a5f924a82 - arrow_data::transform::utils::extend_offsets::hdf2f06f3dfb426cd
Mar 25 15:02:03 <hostname> sh[957703]:   20:     0x560a5f9279c6 - arrow_data::transform::variable_size::build_extend::{{closure}}::h31904a480ae969de
Mar 25 15:02:03 <hostname> sh[957703]:   21:     0x560a5f9183b5 - arrow_data::transform::MutableArrayData::extend::hde96a1dcf5382372
Mar 25 15:02:03 <hostname> sh[957703]:   22:     0x560a5f89081f - arrow_select::concat::concat_fallback::h58848d6223cb12c9
Mar 25 15:02:03 <hostname> sh[957703]:   23:     0x560a5f89049e - arrow_select::concat::concat::h2c60d37437746b3e
Mar 25 15:02:03 <hostname> sh[957703]:   24:     0x560a5e101518 - arrow_select::concat::concat_batches::hbc4b4e1ab862a29d
Mar 25 15:02:03 <hostname> sh[957703]:   25:     0x560a5e178464 - <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll::h3e1df0aac2ab9d77
Mar 25 15:02:03 <hostname> sh[957703]:   26:     0x560a5e02f8f2 - <futures_util::future::future::shared::Shared<Fut> as core::future::future::Future>::poll::h7c9852d5bb7e9534
Mar 25 15:02:03 <hostname> sh[957703]:   27:     0x560a5e16e2bf - datafusion_physical_plan::joins::utils::OnceFut<T>::get::hd3b2e25c093ada6d
Mar 25 15:02:03 <hostname> sh[957703]:   28:     0x560a5e195dd9 - <datafusion_physical_plan::joins::hash_join::HashJoinStream as futures_core::stream::Stream>::poll_next::h9c658418141705f6
Mar 25 15:02:03 <hostname> sh[957703]:   29:     0x560a5e18a696 - <datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next::h219998518ccf857d
Mar 25 15:02:03 <hostname> sh[957703]:   30:     0x560a5e00bada - datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}::h1ed64d4cd49cce5e
Mar 25 15:02:03 <hostname> sh[957703]:   31:     0x560a5e000993 - tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut::h38ddf14813d65bab
Mar 25 15:02:03 <hostname> sh[957703]:   32:     0x560a5e03d6ee - tokio::runtime::task::core::Core<T,S>::poll::h3df6db59b96cdc95
Mar 25 15:02:03 <hostname> sh[957703]:   33:     0x560a5e040779 - tokio::runtime::task::harness::Harness<T,S>::poll::h56978e4fa718a3b4
Mar 25 15:02:03 <hostname> sh[957703]:   34:     0x560a5f0e3ad8 - tokio::runtime::scheduler::multi_thread::worker::Context::run_task::hb25f4c4bb31b0772
Mar 25 15:02:03 <hostname> sh[957703]:   35:     0x560a5f0e35b5 - tokio::runtime::scheduler::multi_thread::worker::run::h9904ff19a8f264dd
Mar 25 15:02:03 <hostname> sh[957703]:   36:     0x560a5f0dfcd2 - tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut::h76fc62ff172daba6
Mar 25 15:02:03 <hostname> sh[957703]:   37:     0x560a5f0f2af9 - tokio::runtime::task::core::Core<T,S>::poll::h256060c96f596608
Mar 25 15:02:03 <hostname> sh[957703]:   38:     0x560a5f0ded9a - tokio::runtime::task::harness::Harness<T,S>::poll::h0b57f37a1c3a1ec2
Mar 25 15:02:03 <hostname> sh[957703]:   39:     0x560a5f0ee780 - tokio::runtime::blocking::pool::Inner::run::ha371e4f6189503f2
Mar 25 15:02:03 <hostname> sh[957703]:   40:     0x560a5f0e84fc - std::sys_common::backtrace::__rust_begin_short_backtrace::h25663fb0bf4a4475
Mar 25 15:02:03 <hostname> sh[957703]:   41:     0x560a5f0f0a79 - core::ops::function::FnOnce::call_once{{vtable.shim}}::ha0885a2c26e1b6a6
Mar 25 15:02:03 <hostname> sh[957703]:   42:     0x560a5f9783d5 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once::h12de4fc57affb195
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/alloc/src/boxed.rs:2015:9
Mar 25 15:02:03 <hostname> sh[957703]:   43:     0x560a5f9783d5 - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once::h3c619f45059d5cf1
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/alloc/src/boxed.rs:2015:9
Mar 25 15:02:03 <hostname> sh[957703]:   44:     0x560a5f9783d5 - std::sys::unix::thread::Thread::new::thread_start::hbac657605e4b7389
Mar 25 15:02:03 <hostname> sh[957703]:                                at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/sys/unix/thread.rs:108:17
Mar 25 15:02:03 <hostname> sh[957703]:   45:     0x7f094d03d1ca - start_thread
Mar 25 15:02:03 <hostname> sh[957703]:   46:     0x7f094c723e73 - __clone
Mar 25 15:02:03 <hostname> sh[957703]:   47:                0x0 - <unknown>
jwimberl added the bug label on Mar 25, 2024
alamb (Contributor) commented Apr 3, 2024

Hi @jwimberl -- I wonder if this is related to #7848, which was fixed in #8020 by @korowa.

What was happening there was that the entire join output was created in a single record batch. It looks like 34.0.0 was released prior to #8020.
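
For a rough sense of why building the whole join input or output as one batch can trip this particular panic -- back-of-the-envelope only; which column actually overflows here is a guess, and the average value size below is assumed, not measured:

# Arrow Utf8 arrays index their values with i32 offsets, so a single array is
# capped at i32::MAX bytes of string data; concatenating past that panics with
# "offset overflow" in extend_offsets.
I32_MAX = 2**31 - 1
rows = 28_914_441            # lhs row count from the report above
avg_bytes_per_value = 75     # assumed average string length, not measured
print(rows * avg_bytes_per_value > I32_MAX)   # True -> offsets would overflow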

Any chance you can try this with a newer version of DataFusion?

jwimberl (Author) commented Apr 4, 2024

Yes, is 36.0.0 the first version to include this fix?

alamb (Contributor) commented Apr 4, 2024

> Yes, is 36.0.0 the first version to include this fix?

Yes, that is my understanding of the release notes: https://github.com/apache/arrow-datafusion/blob/main/dev/changelog/36.0.0.md#3600-2024-02-16

jwimberl (Author) commented Apr 4, 2024

I'm not able to run the query with 36.0.0. Perhaps there was some breaking change in 35.0.0 or 36.0.0 (I can't find one that looks relevant in the release notes) that affected the syntax for registering hive-partitioned parquet datasets? After loading one such dataset as an external table, via e.g.

CREATE EXTERNAL TABLE test
STORED AS PARQUET
PARTITIONED BY (part1, part2)
LOCATION '/path/to/dataset/*/*/chunk.parquet';

the columns of the table are only the hive partition columns part1 and part2, and the columns of the parquet files themselves are not present in the session context table. Is the [ (<column_definition>) ] part of the external table syntax now required in cases like this?
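
In case it helps, this is how I'm checking what schema the registered table ends up with (assuming a datafusion-python SessionContext named ctx and the test table from the DDL above):

# DataFrame.schema() returns the resolved Arrow schema of the registered table.
print(ctx.table("test").schema())
# For me this shows only the partition columns (part1, part2), not the columns
# inside the parquet files.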

alamb (Contributor) commented Apr 4, 2024

I am not quite sure what is going on here -- maybe @devinjdangelo or @metesynnada remembers.

37.0.0 also has significant changes in these areas so maybe things will work in 37.0.0 🤔

jwimberl (Author) commented Apr 4, 2024

If it is helpful I can try to produce a self-contained repro

alamb (Contributor) commented Apr 4, 2024

> If it is helpful I can try to produce a self-contained repro

Yes, that would be most helpful. I am not sure how to make this ticket actionable otherwise.

jwimberl (Author) commented Apr 4, 2024

Perhaps the issue was with the syntax for the partitioned parquet file location(s). I did not change the wildcard-based syntax I had been using before, shown at the top of this issue. However, I see in the docs now that the location only needs to be the parent directory of the top-level partition:

https://arrow.apache.org/datafusion/user-guide/sql/ddl.html#cautions-when-using-the-with-order-clause

Using a version of that example NYC taxi data (I actually just downloaded one file), I verified that the wildcard-based location doesn't produce an error during table creation, but it does not work. The symptom is slightly different -- the table is completely empty -- but maybe it's the same issue.

## $ wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
--2024-04-04 18:38:48--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.32.192.124, 13.32.192.116, 13.32.192.190, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.32.192.124|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49961641 (48M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-01.parquet’

yellow_tripdata_2024-01.parquet           100%[=====================================================================================>]  47.65M  79.9MB/s    in 0.6s

2024-04-04 18:38:48 (79.9 MB/s) - ‘yellow_tripdata_2024-01.parquet’ saved [49961641/49961641]

## $ mkdir -p year=2024/month=01
## $ mv yellow_tripdata_2024-01.parquet year\=2024/month\=01/tripdata.parquet
## $ python3
Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:53:32) [GCC 12.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import datafusion as df
>>> df.__version__
'36.0.0'
>>> ctx = df.SessionContext()
>>> ctx.sql("""
... CREATE EXTERNAL TABLE taxi
... STORED AS PARQUET
... PARTITIONED BY (year, month)
... LOCATION '/path/to/nyctaxi';
... """)
DataFrame()
++
++
>>> ctx.sql("SELECT COUNT(*) FROM taxi;")
DataFrame()
+----------+
| COUNT(*) |
+----------+
| 2964624  |
+----------+
>>> ctx.sql("""
... CREATE EXTERNAL TABLE taxi2
... STORED AS PARQUET
... PARTITIONED BY (year, month)
... LOCATION '/path/to/nyctaxi/*/*/tripdata.parquet';
... """)
DataFrame()
++
++
>>> ctx.sql("SELECT COUNT(*) FROM taxi2;")
DataFrame()
+----------+
| COUNT(*) |
+----------+
| 0        |
+----------+
>>>

I don't know whether the wildcard syntax I used is what this DDL documentation page used to show, or whether I got the syntax wrong from the get-go and it just happened to be an unsupported format that worked until now. Naturally I'm attempting the query again with my original dataset, but since it's a rather large amount of data, re-creating the external table will take some time.

jwimberl (Author) commented Apr 4, 2024

OK, with DataFusion 36.0.0 I still get a panic when running this query in its original context -- a Rust program using the datafusion crate and its dependencies. However, I don't get it when using either the DataFusion 34.0.0 or 36.0.0 Python modules (which I understand are wrappers around the same Rust code). Perhaps this points to some interaction specific to the set of dependency versions I'm using, as opposed to the set involved in the Python module? I can provide the cargo tree I am using, build tool versions, or other information that might be helpful.

jwimberl (Author) commented Apr 5, 2024

Oops, apologies for the misleading noise in my previous comment -- I can't exactly reproduce what I did wrong, but I must have run some subtly different query when using the DataFusion Python modules. The panic occurs identically in 34 and 36, whether using the Python module or my project using the equivalent crates.
