Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
8dfeca3
Merge branch 'apache:main' into main
kazantsev-maksim Jan 17, 2026
559741e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 20, 2026
ebda14e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 21, 2026
408152e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 23, 2026
d7857b2
Merge branch 'apache:main' into main
kazantsev-maksim Jan 24, 2026
aef41be
Merge branch 'apache:main' into main
kazantsev-maksim Jan 29, 2026
5ac1c58
Merge branch 'apache:main' into main
kazantsev-maksim Jan 30, 2026
9ae8e23
Merge branch 'apache:main' into main
kazantsev-maksim Feb 1, 2026
5ca3888
Merge branch 'apache:main' into main
kazantsev-maksim Feb 4, 2026
160a817
Merge branch 'apache:main' into main
kazantsev-maksim Feb 5, 2026
88fc313
Merge branch 'apache:main' into main
kazantsev-maksim Feb 7, 2026
2716f1d
Use datafusion impl of array_repeat function
Feb 13, 2026
e14c180
Merge branch 'apache:main' into main
kazantsev-maksim Feb 13, 2026
ad3f72f
Merge remote-tracking branch 'origin/main' into array_repeat_df
Feb 13, 2026
6b3f807
resolve conflicts
Feb 13, 2026
c5b95be
Merge branch 'main' into array_repeat_df
mbutrovich Feb 13, 2026
0fe6893
delete test
Feb 14, 2026
30d68ea
Merge remote-tracking branch 'origin/array_repeat_df' into array_repe…
Feb 14, 2026
62b741a
Merge branch 'main' into array_repeat_df
kazantsev-maksim Feb 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 0 additions & 128 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3905,134 +3905,6 @@ mod tests {
});
}

#[test]
fn test_array_repeat() {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let planner = PhysicalPlanner::new(Arc::from(session_ctx), 0);

// Mock scan operator with 3 INT32 columns
let op_scan = Operator {
plan_id: 0,
children: vec![],
op_struct: Some(OpStruct::Scan(spark_operator::Scan {
fields: vec![
spark_expression::DataType {
type_id: 3, // Int32
type_info: None,
},
spark_expression::DataType {
type_id: 3, // Int32
type_info: None,
},
spark_expression::DataType {
type_id: 3, // Int32
type_info: None,
},
],
source: "".to_string(),
arrow_ffi_safe: false,
})),
};

// Mock expression to read a INT32 column with position 0
let array_col = spark_expression::Expr {
expr_struct: Some(Bound(spark_expression::BoundReference {
index: 0,
datatype: Some(spark_expression::DataType {
type_id: 3,
type_info: None,
}),
})),
};

// Mock expression to read a INT32 column with position 1
let array_col_1 = spark_expression::Expr {
expr_struct: Some(Bound(spark_expression::BoundReference {
index: 1,
datatype: Some(spark_expression::DataType {
type_id: 3,
type_info: None,
}),
})),
};

// Make a projection operator with array_repeat(array_col, array_col_1)
let projection = Operator {
children: vec![op_scan],
plan_id: 0,
op_struct: Some(OpStruct::Projection(spark_operator::Projection {
project_list: vec![spark_expression::Expr {
expr_struct: Some(ExprStruct::ScalarFunc(spark_expression::ScalarFunc {
func: "array_repeat".to_string(),
args: vec![array_col, array_col_1],
return_type: None,
fail_on_error: false,
})),
}],
})),
};

// Create a physical plan
let (mut scans, datafusion_plan) =
planner.create_plan(&projection, &mut vec![], 1).unwrap();

// Start executing the plan in a separate thread
// The plan waits for incoming batches and emitting result as input comes
let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();

let runtime = tokio::runtime::Runtime::new().unwrap();
// create async channel
let (tx, mut rx) = mpsc::channel(1);

// Send data as input to the plan being executed in a separate thread
runtime.spawn(async move {
// create data batch
// 0, 1, 2
// 3, 4, 5
// 6, null, null
let a = Int32Array::from(vec![Some(0), Some(3), Some(6)]);
let b = Int32Array::from(vec![Some(1), Some(4), None]);
let c = Int32Array::from(vec![Some(2), Some(5), None]);
let input_batch1 = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 3);
let input_batch2 = InputBatch::EOF;

let batches = vec![input_batch1, input_batch2];

for batch in batches.into_iter() {
tx.send(batch).await.unwrap();
}
});

// Wait for the plan to finish executing and assert the result
runtime.block_on(async move {
loop {
let batch = rx.recv().await.unwrap();
scans[0].set_input_batch(batch);
match poll!(stream.next()) {
Poll::Ready(Some(batch)) => {
assert!(batch.is_ok(), "got error {}", batch.unwrap_err());
let batch = batch.unwrap();
let expected = [
"+--------------+",
"| col_0 |",
"+--------------+",
"| [0] |",
"| [3, 3, 3, 3] |",
"| |",
"+--------------+",
];
assert_batches_eq!(expected, &[batch]);
}
Poll::Ready(None) => {
break;
}
_ => {}
}
}
});
}

/// Executes a `test_data_query` SQL query
/// and saves the result into a temp folder using parquet format
/// Read the file back to the memory using a custom schema
Expand Down
216 changes: 0 additions & 216 deletions native/spark-expr/src/array_funcs/array_repeat.rs

This file was deleted.

2 changes: 0 additions & 2 deletions native/spark-expr/src/array_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
// under the License.

mod array_insert;
mod array_repeat;
mod get_array_struct_fields;
mod list_extract;
mod size;

pub use array_insert::ArrayInsert;
pub use array_repeat::spark_array_repeat;
pub use get_array_struct_fields::GetArrayStructFields;
pub use list_extract::ListExtract;
pub use size::{spark_size, SparkSizeFunc};
10 changes: 3 additions & 7 deletions native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::math_funcs::abs::abs;
use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub};
use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
SparkDateTrunc, SparkMakeDate, SparkSizeFunc, SparkStringSpace,
};
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -169,10 +169,6 @@ pub fn create_comet_physical_fun_with_eval_mode(
let func = Arc::new(spark_isnan);
make_comet_scalar_udf!("isnan", func, without data_type)
}
"array_repeat" => {
let func = Arc::new(spark_array_repeat);
make_comet_scalar_udf!("array_repeat", func, without data_type)
}
"spark_modulo" => {
let func = Arc::new(spark_modulo);
make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error)
Expand Down
Loading
Loading