Skip to content

Commit

Permalink
Avoid Arc::clone when serializing physical expressions (apache#12235)
Browse files Browse the repository at this point in the history
`Arc::clone` is indispensable when passing shared references between
threads. For synchronous code`&` and `&Arc` can be (and often are) used,
with the latter being future-compatible, should the code start to need
`Arc::clone` (e.g. due to parallelization).
  • Loading branch information
findepi authored Aug 29, 2024
1 parent 827d7e1 commit 0f16849
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 97 deletions.
56 changes: 25 additions & 31 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let expr = exec
.expr()
.iter()
.map(|expr| serialize_physical_expr(Arc::clone(&expr.0), extension_codec))
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
return Ok(protobuf::PhysicalPlanNode {
Expand Down Expand Up @@ -1163,7 +1163,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(serialize_physical_expr(
Arc::clone(exec.predicate()),
exec.predicate(),
extension_codec,
)?),
default_filter_selectivity: exec.default_selectivity() as u32,
Expand Down Expand Up @@ -1220,8 +1220,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.on()
.iter()
.map(|tuple| {
let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?;
let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?;
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
Ok::<_, DataFusionError>(protobuf::JoinOn {
left: Some(l),
right: Some(r),
Expand All @@ -1233,10 +1233,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.filter()
.as_ref()
.map(|f| {
let expression = serialize_physical_expr(
f.expression().to_owned(),
extension_codec,
)?;
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
Expand Down Expand Up @@ -1294,8 +1292,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.on()
.iter()
.map(|tuple| {
let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?;
let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?;
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
Ok::<_, DataFusionError>(protobuf::JoinOn {
left: Some(l),
right: Some(r),
Expand All @@ -1307,10 +1305,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.filter()
.as_ref()
.map(|f| {
let expression = serialize_physical_expr(
f.expression().to_owned(),
extension_codec,
)?;
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
Expand Down Expand Up @@ -1348,7 +1344,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.map(|expr| {
Ok(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
expr.expr.to_owned(),
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
Expand All @@ -1368,7 +1364,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.map(|expr| {
Ok(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
expr.expr.to_owned(),
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
Expand Down Expand Up @@ -1475,14 +1471,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.group_expr()
.null_expr()
.iter()
.map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec))
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;

let group_expr = exec
.group_expr()
.expr()
.iter()
.map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec))
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;

let limit = exec.limit().map(|value| protobuf::AggLimit {
Expand Down Expand Up @@ -1581,7 +1577,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
let predicate = exec
.predicate()
.map(|pred| serialize_physical_expr(Arc::clone(pred), extension_codec))
.map(|pred| serialize_physical_expr(pred, extension_codec))
.transpose()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
Expand Down Expand Up @@ -1653,7 +1649,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
expr.expr.to_owned(),
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
Expand Down Expand Up @@ -1722,7 +1718,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
expr.expr.to_owned(),
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
Expand Down Expand Up @@ -1761,10 +1757,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.filter()
.as_ref()
.map(|f| {
let expression = serialize_physical_expr(
f.expression().to_owned(),
extension_codec,
)?;
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
Expand Down Expand Up @@ -1806,13 +1800,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
.map(|e| serialize_physical_window_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
.map(|e| serialize_physical_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

return Ok(protobuf::PhysicalPlanNode {
Expand All @@ -1836,13 +1830,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
.map(|e| serialize_physical_window_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
.map(|e| serialize_physical_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;

let input_order_mode = match &exec.input_order_mode {
Expand Down Expand Up @@ -1886,7 +1880,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
let expr: PhysicalSortExpr = requirement.to_owned().into();
let sort_expr = protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
expr.expr.to_owned(),
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
Expand Down Expand Up @@ -2025,7 +2019,7 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {

fn try_encode_expr(
&self,
_node: Arc<dyn PhysicalExpr>,
_node: &Arc<dyn PhysicalExpr>,
_buf: &mut Vec<u8>,
) -> Result<()> {
not_impl_err!("PhysicalExtensionCodec is not provided")
Expand Down
Loading

0 comments on commit 0f16849

Please sign in to comment.