Skip to content

Commit

Permalink
add partition by
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 15, 2021
1 parent 8a090de commit c3c0ef5
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }

arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }
arrow-flight = { version = "4.0" }

datafusion = { path = "../../../datafusion" }

Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ tokio-stream = "0.1"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }

arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }
arrow = { version = "4.0" }
arrow-flight = { version = "4.0" }

datafusion = { path = "../../../datafusion" }

Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion" }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }
arrow = { version = "4.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ publish = false


[dev-dependencies]
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }
arrow-flight = { version = "4.0" }
datafusion = { path = "../datafusion" }
prost = "0.7"
tonic = "0.4"
Expand Down
6 changes: 2 additions & 4 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
# arrow = { version = "4.0", features = ["prettyprint"] }
# parquet = { version = "4.0", features = ["arrow"] }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc", features = ["prettyprint"] }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc", features = ["arrow"] }
arrow = { version = "4.0", features = ["prettyprint"] }
parquet = { version = "4.0", features = ["arrow"] }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
22 changes: 9 additions & 13 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,10 +1358,6 @@ mod tests {
"SELECT \
c1, \
c2, \
ROW_NUMBER() OVER (PARTITION BY c2), \
FIRST_VALUE(c2) OVER (PARTITION BY c2), \
LAST_VALUE(c2) OVER (PARTITION BY c2), \
NTH_VALUE(c2, 2) OVER (PARTITION BY c2), \
SUM(c2) OVER (PARTITION BY c2), \
COUNT(c2) OVER (PARTITION BY c2), \
MAX(c2) OVER (PARTITION BY c2), \
Expand All @@ -1380,15 +1376,15 @@ mod tests {
assert_eq!(results.len(), 1);

let expected = vec![
"+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
"| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(c2) | LAST_VALUE(c2) | NTH_VALUE(c2,Int64(2)) | SUM(c2) | COUNT(c2) | MAX(c2) | MIN(c2) | AVG(c2) |",
"+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
"| 0 | 1 | 4 | 1 | 1 | 1 | 4 | 4 | 1 | 1 | 1 |",
"| 0 | 2 | 4 | 2 | 2 | 2 | 8 | 4 | 2 | 2 | 2 |",
"| 0 | 3 | 4 | 3 | 3 | 3 | 12 | 4 | 3 | 3 | 3 |",
"| 0 | 4 | 4 | 4 | 4 | 4 | 16 | 4 | 4 | 4 | 4 |",
"| 0 | 5 | 4 | 5 | 5 | 5 | 20 | 4 | 5 | 5 | 5 |",
"+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
"+----+----+---------+-----------+---------+---------+---------+",
"| c1 | c2 | SUM(c2) | COUNT(c2) | MAX(c2) | MIN(c2) | AVG(c2) |",
"+----+----+---------+-----------+---------+---------+---------+",
"| 0 | 1 | 4 | 4 | 1 | 1 | 1 |",
"| 0 | 2 | 8 | 4 | 2 | 2 | 2 |",
"| 0 | 3 | 12 | 4 | 3 | 3 | 3 |",
"| 0 | 4 | 16 | 4 | 4 | 4 | 4 |",
"| 0 | 5 | 20 | 4 | 5 | 5 | 5 |",
"+----+----+---------+-----------+---------+---------+---------+",
];

// window function shall respect ordering
Expand Down
10 changes: 7 additions & 3 deletions datafusion/src/physical_plan/expressions/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::array::{new_empty_array, ArrayRef};
use arrow::array::{new_empty_array, new_null_array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -135,8 +135,12 @@ impl BuiltInWindowFunctionExpr for NthValue {
NthValueKind::Last => (num_rows as usize) - 1,
NthValueKind::Nth(n) => (n as usize) - 1,
};
let value = ScalarValue::try_from_array(value, index)?;
Ok(value.to_array_of_size(num_rows))
Ok(if index >= num_rows {
new_null_array(value.data_type(), num_rows)
} else {
let value = ScalarValue::try_from_array(value, index)?;
value.to_array_of_size(num_rows)
})
}
}

Expand Down
64 changes: 64 additions & 0 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,70 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
Ok(())
}

/// Runs aggregate and value window functions with `over (partition by c3)`
/// against the `aggregate_test_100` table and compares the first five rows
/// (ordered by `c9`) with a fixed expectation.
///
/// Expected columns, in order: c9, sum, avg, count, max, min, first_value,
/// last_value, nth_value(.., 2). In the expected rows below, every partition
/// whose count is 1 reports `NULL` for the second value.
#[tokio::test]
async fn csv_query_window_with_partition_by() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    register_aggregate_csv(&mut ctx)?;

    // The trailing `\` joins the lines and drops the following line's leading
    // whitespace, so the query text is a single line.
    let query = "select \
    c9, \
    sum(cast(c4 as Int)) over (partition by c3), \
    avg(cast(c4 as Int)) over (partition by c3), \
    count(cast(c4 as Int)) over (partition by c3), \
    max(cast(c4 as Int)) over (partition by c3), \
    min(cast(c4 as Int)) over (partition by c3), \
    first_value(cast(c4 as Int)) over (partition by c3), \
    last_value(cast(c4 as Int)) over (partition by c3), \
    nth_value(cast(c4 as Int), 2) over (partition by c3) \
    from aggregate_test_100 \
    order by c9 \
    limit 5";
    let actual = execute(&mut ctx, query).await;

    // Each row: key and aggregates first, then first/last/nth values.
    let expected = vec![
        // count = 1: nth_value(.., 2) is NULL
        vec![
            "28774375", "-16110", "-16110", "1", "-16110", "-16110",
            "-16110", "-16110", "NULL",
        ],
        // count = 1: nth_value(.., 2) is NULL
        vec![
            "63044568", "3917", "3917", "1", "3917", "3917",
            "3917", "3917", "NULL",
        ],
        // count = 2: the only row here with a non-NULL second value
        vec![
            "141047417", "-38455", "-19227.5", "2", "-16974", "-21481",
            "-16974", "-21481", "-21481",
        ],
        // count = 1: nth_value(.., 2) is NULL
        vec![
            "141680161", "-1114", "-1114", "1", "-1114", "-1114",
            "-1114", "-1114", "NULL",
        ],
        // count = 1: nth_value(.., 2) is NULL
        vec![
            "145294611", "15673", "15673", "1", "15673", "15673",
            "15673", "15673", "NULL",
        ],
    ];

    assert_eq!(expected, actual);
    Ok(())
}

#[tokio::test]
async fn csv_query_window_with_order_by() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down

0 comments on commit c3c0ef5

Please sign in to comment.