Skip to content

Commit

Permalink
Informative Error Message for LAG and LEAD functions (#3963)
Browse files Browse the repository at this point in the history
* panic is changed to error for lag and lead functions

* remove unnecessary changes

* remove unnecessary changes
  • Loading branch information
mustafasrepo authored Oct 27, 2022
1 parent c505739 commit 4b5d720
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
24 changes: 15 additions & 9 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,19 @@ pub fn create_window_expr(
fn get_scalar_value_from_args(
args: &[Arc<dyn PhysicalExpr>],
index: usize,
) -> Option<ScalarValue> {
args.get(index).map(|v| {
v.as_any()
) -> Result<Option<ScalarValue>> {
Ok(if let Some(field) = args.get(index) {
let tmp = field
.as_any()
.downcast_ref::<Literal>()
.unwrap()
.ok_or_else(|| DataFusionError::NotImplemented(
format!("There is only support Literal types for field at idx: {} in Window Function", index),
))?
.value()
.clone()
.clone();
Some(tmp)
} else {
None
})
}

Expand All @@ -98,20 +104,20 @@ fn create_built_in_window_expr(
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
let data_type = args[0].data_type(input_schema)?;
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)?
.map(|v| v.try_into())
.and_then(|v| v.ok());
let default_value = get_scalar_value_from_args(&coerced_args, 2);
let default_value = get_scalar_value_from_args(&coerced_args, 2)?;
Arc::new(lag(name, data_type, arg, shift_offset, default_value))
}
BuiltInWindowFunction::Lead => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
let data_type = args[0].data_type(input_schema)?;
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
let shift_offset = get_scalar_value_from_args(&coerced_args, 1)?
.map(|v| v.try_into())
.and_then(|v| v.ok());
let default_value = get_scalar_value_from_args(&coerced_args, 2);
let default_value = get_scalar_value_from_args(&coerced_args, 2)?;
Arc::new(lead(name, data_type, arg, shift_offset, default_value))
}
BuiltInWindowFunction::NthValue => {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,26 @@ async fn window_frame_groups_query() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn window_frame_lag() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
// execute the query
let df = ctx
.sql(
"SELECT c2,
lag(c2, c2, c2) OVER () as lag1
FROM aggregate_test_100;",
)
.await?;
let err = df.collect().await.unwrap_err();
assert_eq!(
err.to_string(),
"This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function".to_owned()
);
Ok(())
}

#[tokio::test]
async fn window_frame_creation() -> Result<()> {
let ctx = SessionContext::new();
Expand Down

0 comments on commit 4b5d720

Please sign in to comment.