Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
forsaken628 committed Dec 18, 2024
1 parent f21df7a commit 5a56a73
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 193 deletions.
120 changes: 0 additions & 120 deletions src/query/functions/src/aggregates/aggregate_udf_script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,123 +263,3 @@ pub fn create_aggregate_udf_function(
return_type,
}))
}

#[cfg(test)]
mod tests {

    use arrow_array::ArrayRef;
    use arrow_array::Int32Array;
    use arrow_cast::pretty::pretty_format_columns;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use arrow_udf_js::CallMode;

    use super::*;

    /// Drives a JavaScript `weighted_avg` aggregate through the
    /// `arrow_udf_js` runtime and checks the pretty-printed array after each
    /// stage: `create_state`, `accumulate`, `merge` and `finish`.
    ///
    /// The aggregate is registered with a two-field nullable `Int32` struct
    /// as its state type, `Float32` as its output type, and
    /// `CallMode::CalledOnNullInput`.
    #[test]
    fn test_weighted_avg() {
        let mut js_runtime = arrow_udf_js::Runtime::new().unwrap();

        // Declared state layout handed to the runtime at registration.
        let state_type = DataType::Struct(
            vec![
                Field::new("state_0", DataType::Int32, true),
                Field::new("state_1", DataType::Int32, true),
            ]
            .into(),
        );
        // JavaScript module exporting the aggregate's callbacks.
        let script = r#"
export function create_state() {
return {sum: 0, weight: 0};
}
export function accumulate(state, value, weight) {
state.sum += value * weight;
state.weight += weight;
return state;
}
export function retract(state, value, weight) {
state.sum -= value * weight;
state.weight -= weight;
return state;
}
export function merge(state1, state2) {
state1.sum += state2.sum;
state1.weight += state2.weight;
return state1;
}
export function finish(state) {
return state.sum / state.weight;
}
"#;
        js_runtime
            .add_aggregate(
                "weighted_avg",
                state_type,
                DataType::Float32,
                CallMode::CalledOnNullInput,
                script,
            )
            .unwrap();

        // Input batch: four rows of (value, weight); the second row is null
        // in both columns.
        let columns = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Int32, true),
            Field::new("weight", DataType::Int32, true),
        ]));
        let values = Int32Array::from(vec![Some(1), None, Some(3), Some(5)]);
        let weights = Int32Array::from(vec![Some(2), None, Some(4), Some(6)]);
        let batch =
            RecordBatch::try_new(columns, vec![Arc::new(values), Arc::new(weights)]).unwrap();

        // Stage 1: a freshly created state.
        let initial = js_runtime.create_state("weighted_avg").unwrap();
        check_array(
            std::slice::from_ref(&initial),
            r#"+---------------------+
| array |
+---------------------+
| {sum: 0, weight: 0} |
+---------------------+"#,
        );

        // Stage 2: fold the whole batch into the state.
        let accumulated = js_runtime
            .accumulate("weighted_avg", &initial, &batch)
            .unwrap();
        check_array(
            std::slice::from_ref(&accumulated),
            r#"+-----------------------+
| array |
+-----------------------+
| {sum: 44, weight: 12} |
+-----------------------+"#,
        );

        // Stage 3: merge two copies of the accumulated state.
        let pair = arrow_select::concat::concat(&[&accumulated, &accumulated]).unwrap();
        let merged = js_runtime.merge("weighted_avg", &pair).unwrap();
        check_array(
            std::slice::from_ref(&merged),
            r#"+-----------------------+
| array |
+-----------------------+
| {sum: 88, weight: 24} |
+-----------------------+"#,
        );

        // Stage 4: turn the merged state into the final result.
        let result = js_runtime.finish("weighted_avg", &merged).unwrap();
        check_array(
            &[result],
            r#"+-----------+
| array |
+-----------+
| 3.6666667 |
+-----------+"#,
        );
    }

    /// Asserts that `actual`, pretty-printed as a single column named
    /// `array`, equals `expect`.
    ///
    /// `#[track_caller]` makes a failed assertion report the calling line.
    #[track_caller]
    fn check_array(actual: &[ArrayRef], expect: &str) {
        let rendered = pretty_format_columns("array", actual)
            .unwrap()
            .to_string();
        assert_eq!(expect, rendered.as_str());
    }

    /// Placeholder test with no assertions; the original is marked as a todo.
    #[test]
    fn test_serialize() {
        // todo
    }
}
8 changes: 0 additions & 8 deletions src/query/functions/tests/it/aggregates/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,11 +1118,3 @@ fn test_agg_st_collect(file: &mut impl Write, simulator: impl AggregationSimulat
simulator,
);
}

/// Runs `run_agg_ast` on the expression text `udf(a,b)` over the columns from
/// `get_example()`, using `eval_aggr`, and writes the output to the goldenfile
/// `agg.temp.txt` under `tests/it/aggregates/testdata`.
#[test]
fn test_agg2() {
    let mut goldenfiles = Mint::new("tests/it/aggregates/testdata");
    let mut output = goldenfiles.new_goldenfile("agg.temp.txt").unwrap();

    run_agg_ast(
        &mut output,
        "udf(a,b)",
        get_example().as_slice(),
        eval_aggr,
    );
}
36 changes: 0 additions & 36 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use crate::executor::physical_plans::RangeJoinType;
use crate::executor::physical_plans::RowFetch;
use crate::executor::physical_plans::Sort;
use crate::executor::physical_plans::TableScan;
// use crate::executor::physical_plans::Udaf;
use crate::executor::physical_plans::Udf;
use crate::executor::physical_plans::UnionAll;
use crate::executor::physical_plans::Window;
Expand Down Expand Up @@ -355,7 +354,6 @@ fn to_format_tree(
PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs),
PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs),
PhysicalPlan::Udf(plan) => udf_to_format_tree(plan, metadata, profs),
// PhysicalPlan::Udaf(plan) => udaf_to_format_tree(plan, metadata, profs),
PhysicalPlan::RangeJoin(plan) => range_join_to_format_tree(plan, metadata, profs),
PhysicalPlan::CopyIntoTable(plan) => copy_into_table(plan),
PhysicalPlan::CopyIntoLocation(plan) => copy_into_location(plan),
Expand Down Expand Up @@ -1747,40 +1745,6 @@ fn udf_to_format_tree(
Ok(FormatTreeNode::with_children("Udf".to_string(), children))
}

// fn udaf_to_format_tree(
// plan: &Udaf,
// metadata: &Metadata,
// profs: &HashMap<u32, PlanProfile>,
// ) -> Result<FormatTreeNode<String>> {
// let mut children = vec![FormatTreeNode::new(format!(
// "output columns: [{}]",
// format_output_columns(plan.output_schema()?, metadata, true)
// ))];

// if let Some(info) = &plan.stat_info {
// let items = plan_stats_info_to_format_tree(info);
// children.extend(items);
// }

// append_profile_info(&mut children, profs, plan.plan_id);

// children.extend(vec![FormatTreeNode::new(format!(
// "udaf functions: {}",
// plan.udf_funcs
// .iter()
// .map(|func| {
// let arg_exprs = func.arg_exprs.join(", ");
// format!("{}({})", func.func_name, arg_exprs)
// })
// .collect::<Vec<_>>()
// .join(", ")
// ))]);

// children.extend(vec![to_format_tree(&plan.input, metadata, profs)?]);

// Ok(FormatTreeNode::with_children("Udaf".to_string(), children))
// }

fn format_output_columns(
output_schema: DataSchemaRef,
metadata: &Metadata,
Expand Down
12 changes: 0 additions & 12 deletions src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use crate::executor::physical_plans::RowFetch;
use crate::executor::physical_plans::Shuffle;
use crate::executor::physical_plans::Sort;
use crate::executor::physical_plans::TableScan;
// use crate::executor::physical_plans::Udaf;
use crate::executor::physical_plans::Udf;
use crate::executor::physical_plans::UnionAll;
use crate::executor::physical_plans::Window;
Expand Down Expand Up @@ -97,7 +96,6 @@ pub enum PhysicalPlan {
ExpressionScan(ExpressionScan),
CacheScan(CacheScan),
Udf(Udf),
// Udaf(Udaf),
RecursiveCteScan(RecursiveCteScan),

/// For insert into ... select ... in cluster
Expand Down Expand Up @@ -263,11 +261,6 @@ impl PhysicalPlan {
*next_id += 1;
plan.input.adjust_plan_id(next_id);
}
// PhysicalPlan::Udaf(plan) => {
// plan.plan_id = *next_id;
// *next_id += 1;
// plan.input.adjust_plan_id(next_id);
// }
PhysicalPlan::DistributedInsertSelect(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
Expand Down Expand Up @@ -430,7 +423,6 @@ impl PhysicalPlan {
PhysicalPlan::ExpressionScan(v) => v.plan_id,
PhysicalPlan::CacheScan(v) => v.plan_id,
PhysicalPlan::Udf(v) => v.plan_id,
// PhysicalPlan::Udaf(v) => v.plan_id,
PhysicalPlan::MutationSource(v) => v.plan_id,
PhysicalPlan::ColumnMutation(v) => v.plan_id,
PhysicalPlan::Mutation(v) => v.plan_id,
Expand Down Expand Up @@ -487,7 +479,6 @@ impl PhysicalPlan {
PhysicalPlan::CacheScan(plan) => plan.output_schema(),
PhysicalPlan::RecursiveCteScan(plan) => plan.output_schema(),
PhysicalPlan::Udf(plan) => plan.output_schema(),
// PhysicalPlan::Udaf(plan) => plan.output_schema(),
PhysicalPlan::MutationSource(plan) => plan.output_schema(),
PhysicalPlan::MutationSplit(plan) => plan.output_schema(),
PhysicalPlan::MutationManipulate(plan) => plan.output_schema(),
Expand Down Expand Up @@ -562,7 +553,6 @@ impl PhysicalPlan {
PhysicalPlan::CacheScan(_) => "CacheScan".to_string(),
PhysicalPlan::Recluster(_) => "Recluster".to_string(),
PhysicalPlan::Udf(_) => "Udf".to_string(),
// PhysicalPlan::Udaf(_) => "Udaf".to_string(),
PhysicalPlan::Duplicate(_) => "Duplicate".to_string(),
PhysicalPlan::Shuffle(_) => "Shuffle".to_string(),
PhysicalPlan::ChunkFilter(_) => "Filter".to_string(),
Expand Down Expand Up @@ -626,7 +616,6 @@ impl PhysicalPlan {
PhysicalPlan::MutationOrganize(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AddStreamColumn(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::Udf(plan) => Box::new(std::iter::once(plan.input.as_ref())),
// PhysicalPlan::Udaf(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AsyncFunction(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::CopyIntoLocation(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::Duplicate(plan) => Box::new(std::iter::once(plan.input.as_ref())),
Expand Down Expand Up @@ -663,7 +652,6 @@ impl PhysicalPlan {
PhysicalPlan::ProjectSet(plan) => plan.input.try_find_single_data_source(),
PhysicalPlan::RowFetch(plan) => plan.input.try_find_single_data_source(),
PhysicalPlan::Udf(plan) => plan.input.try_find_single_data_source(),
// PhysicalPlan::Udaf(plan) => plan.input.try_find_single_data_source(),
PhysicalPlan::AsyncFunction(plan) => plan.input.try_find_single_data_source(),
PhysicalPlan::CopyIntoLocation(plan) => plan.input.try_find_single_data_source(),
PhysicalPlan::UnionAll(_)
Expand Down
16 changes: 0 additions & 16 deletions src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use crate::executor::physical_plans::RowFetch;
use crate::executor::physical_plans::Shuffle;
use crate::executor::physical_plans::Sort;
use crate::executor::physical_plans::TableScan;
// use crate::executor::physical_plans::Udaf;
use crate::executor::physical_plans::Udf;
use crate::executor::physical_plans::UnionAll;
use crate::executor::physical_plans::Window;
Expand Down Expand Up @@ -109,7 +108,6 @@ pub trait PhysicalPlanReplacer {
PhysicalPlan::CacheScan(plan) => self.replace_cache_scan(plan),
PhysicalPlan::Recluster(plan) => self.replace_recluster(plan),
PhysicalPlan::Udf(plan) => self.replace_udf(plan),
// PhysicalPlan::Udaf(plan) => self.replace_udaf(plan),
PhysicalPlan::AsyncFunction(plan) => self.replace_async_function(plan),
PhysicalPlan::Duplicate(plan) => self.replace_duplicate(plan),
PhysicalPlan::Shuffle(plan) => self.replace_shuffle(plan),
Expand Down Expand Up @@ -532,17 +530,6 @@ pub trait PhysicalPlanReplacer {
}))
}

// fn replace_udaf(&mut self, plan: &Udaf) -> Result<PhysicalPlan> {
// let input = self.replace(&plan.input)?;
// Ok(PhysicalPlan::Udaf(Udaf {
// plan_id: plan.plan_id,
// input: Box::new(input),
// udf_funcs: plan.udf_funcs.clone(),
// stat_info: plan.stat_info.clone(),
// script_udf: plan.script_udf,
// }))
// }

fn replace_async_function(&mut self, plan: &AsyncFunction) -> Result<PhysicalPlan> {
let input = self.replace(&plan.input)?;
Ok(PhysicalPlan::AsyncFunction(AsyncFunction {
Expand Down Expand Up @@ -750,9 +737,6 @@ impl PhysicalPlan {
PhysicalPlan::Udf(plan) => {
Self::traverse(&plan.input, pre_visit, visit, post_visit);
}
// PhysicalPlan::Udaf(plan) => {
// Self::traverse(&plan.input, pre_visit, visit, post_visit);
// }
PhysicalPlan::AsyncFunction(plan) => {
Self::traverse(&plan.input, pre_visit, visit, post_visit);
}
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/plans/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ pub enum RelOperator {
ExpressionScan(ExpressionScan),
CacheScan(CacheScan),
Udf(Udf),
// Udaf(Udaf),
RecursiveCteScan(RecursiveCteScan),
AsyncFunction(AsyncFunction),
Mutation(Mutation),
Expand Down

0 comments on commit 5a56a73

Please sign in to comment.