Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion datafusion/ffi/src/udwf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,70 @@ impl From<&FFI_SortOptions> for SortOptions {
}

#[cfg(test)]
mod tests {}
#[cfg(feature = "integration-tests")]
mod tests {
use crate::tests::create_record_batch;
use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
use arrow::array::{create_array, ArrayRef};
use datafusion::functions_window::lead_lag::{lag_udwf, WindowShift};
use datafusion::logical_expr::expr::Sort;
use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF, WindowUDFImpl};
use datafusion::prelude::SessionContext;
use std::sync::Arc;

/// Test helper: pushes a window UDF implementation through the FFI boundary
/// and hands back the resulting foreign-backed [`WindowUDF`].
///
/// The implementation is wrapped in a `WindowUDF`, converted into an
/// [`FFI_WindowUDF`], re-read as a [`ForeignWindowUDF`], and finally converted
/// into a plain `WindowUDF` again.
///
/// # Errors
///
/// Returns the error produced by the `FFI_WindowUDF` -> `ForeignWindowUDF`
/// conversion, if any.
fn create_test_foreign_udwf(
original_udwf: impl WindowUDFImpl + 'static,
) -> datafusion::common::Result<WindowUDF> {
let native = Arc::new(WindowUDF::from(original_udwf));

// Native -> FFI representation (shares ownership of the native UDF).
let ffi_side: FFI_WindowUDF = Arc::clone(&native).into();

// FFI -> foreign wrapper; this is the fallible step.
let foreign_side: ForeignWindowUDF = (&ffi_side).try_into()?;

let round_tripped: WindowUDF = foreign_side.into();
Ok(round_tripped)
}

/// Checks that the `lag` window UDF keeps its name after being converted to
/// its FFI representation and back into a native [`WindowUDF`].
#[test]
fn test_round_trip_udwf() -> datafusion::common::Result<()> {
let native = lag_udwf();
let expected_name = native.name().to_owned();

// Native -> FFI.
let ffi_side: FFI_WindowUDF = Arc::clone(&native).into();

// FFI -> foreign wrapper -> native `WindowUDF`.
let foreign_side: ForeignWindowUDF = (&ffi_side).try_into()?;
let round_tripped: WindowUDF = foreign_side.into();

assert_eq!(expected_name, round_tripped.name());
Ok(())
}

#[tokio::test]
async fn test_lag_udwf() -> datafusion::common::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified these tests fail without the code change in this PR

    thread 'udwf::tests::test_lag_udwf' panicked at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-schema-55.1.0/src/schema.rs:382:10:
    index out of bounds: the len is 0 but the index is 0
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

    thread 'udwf::tests::test_lag_udwf' panicked at library/core/src/panicking.rs:218:5:
    panic in a function that cannot unwind
    stack backtrace:
       0:        0x108eec990 - std::backtrace_rs::backtrace::libunwind::trace::hd09570c029a6744a
                                   at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/std/src/../../backtrace/src/backtrace/libunwind.rs:117:9
       1:        0x108eec990 - std::backtrace_rs::backtrace::trace_unsynchronized::h8d2fa64833f91cb3

// Build the `lag` window function and route it through the FFI layer so the
// query below exercises the foreign-backed implementation.
let udwf = create_test_foreign_udwf(WindowShift::lag())?;

let ctx = SessionContext::default();
// NOTE(review): presumably yields a batch whose Int32 column "a" holds
// -5..=-1 (consistent with the expected values below) — confirm against
// `crate::tests::create_record_batch`.
let df = ctx.read_batch(create_record_batch(-5, 5))?;

// Project the original column next to lag(a) computed over a window ordered
// by "a" (the two `true` flags are `asc` and `nulls_first`).
let df = df.select(vec![
col("a"),
udwf.call(vec![col("a")])
.order_by(vec![Sort::new(col("a"), true, true)])
.build()
.unwrap()
.alias("lag_a"),
])?;

// Prints the result; the clone keeps `df` available for `collect` below.
df.clone().show().await?;

let result = df.collect().await?;
// Expected "lag_a" values: a null for the first row, then the preceding
// row's value of "a".
let expected =
create_array!(Int32, [None, Some(-5), Some(-4), Some(-3), Some(-2)])
as ArrayRef;

// The test expects a single output batch; column 1 is "lag_a" because it is
// the second expression in the select list.
assert_eq!(result.len(), 1);
assert_eq!(result[0].column(1), &expected);

Ok(())
}
}
27 changes: 17 additions & 10 deletions datafusion/ffi/src/udwf/partition_evaluator_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,24 @@ impl TryFrom<PartitionEvaluatorArgs<'_>> for FFI_PartitionEvaluatorArgs {
})
.collect();

let max_column = required_columns.keys().max().unwrap_or(&0).to_owned();
let fields: Vec<_> = (0..max_column)
.map(|idx| match required_columns.get(&idx) {
Some((name, data_type)) => Field::new(*name, (*data_type).clone(), true),
None => Field::new(
format!("ffi_partition_evaluator_col_{idx}"),
DataType::Null,
true,
),
let max_column = required_columns.keys().max();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is time to simply pass in the real schema to PartitionEvaluatorArgs 🤔

let fields: Vec<_> = max_column
.map(|max_column| {
(0..(max_column + 1))
.map(|idx| match required_columns.get(&idx) {
Some((name, data_type)) => {
Field::new(*name, (*data_type).clone(), true)
}
None => Field::new(
format!("ffi_partition_evaluator_col_{idx}"),
DataType::Null,
true,
),
})
.collect()
})
.collect();
.unwrap_or_default();

let schema = Arc::new(Schema::new(fields));

let codec = DefaultPhysicalExtensionCodec {};
Expand Down