From 2f5bc94a9206ab6b0bf9f443c4a756d1f01d566e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Jul 2021 07:52:20 -0600 Subject: [PATCH] MINOR: Remove unused Ballista query execution code path (#732) * Remove unused code path * Remove proto defs --- ballista/rust/core/proto/ballista.proto | 6 -- ballista/rust/core/src/client.rs | 51 ------------ .../core/src/serde/scheduler/from_proto.rs | 18 ---- ballista/rust/core/src/serde/scheduler/mod.rs | 2 - .../rust/core/src/serde/scheduler/to_proto.rs | 4 - ballista/rust/executor/src/flight_service.rs | 82 +------------------ 6 files changed, 2 insertions(+), 161 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 1c2328eed44d..0575460cfca3 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -699,12 +699,6 @@ message KeyValuePair { message Action { oneof ActionType { - // Execute a logical query plan - LogicalPlanNode query = 1; - - // Execute one partition of a physical query plan - ExecutePartition execute_partition = 2; - // Fetch a partition from an executor PartitionId fetch_partition = 3; } diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index c8267c8194c2..2df414578355 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -75,57 +75,6 @@ impl BallistaClient { Ok(Self { flight_client }) } - /// Execute one partition of a physical query plan against the executor - pub async fn execute_partition( - &mut self, - job_id: String, - stage_id: usize, - partition_id: Vec, - plan: Arc, - ) -> Result> { - let action = Action::ExecutePartition(ExecutePartition { - job_id, - stage_id, - partition_id, - plan, - shuffle_locations: Default::default(), - }); - let stream = self.execute_action(&action).await?; - let batches = collect(stream).await?; - - batches - .iter() - .map(|batch| { - if batch.num_rows() != 1 { - Err(BallistaError::General( - "execute_partition received wrong number of rows".to_owned(), - )) - } else { - let path = batch - .column(0) - .as_any() - .downcast_ref::() - .expect( - "execute_partition expected column 0 to be a StringArray", - ); - - let stats = batch - .column(1) - .as_any() - .downcast_ref::() - .expect( - "execute_partition expected column 1 to be a StructArray", - ); - - Ok(ExecutePartitionResult::new( - path.value(0), - PartitionStats::from_arrow_struct_array(stats), - )) - } - }) - .collect::>>() - } - /// Fetch a partition from an executor pub async fn fetch_partition( &mut self, diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index 4631b2e4d863..73f8f53956de 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -32,24 +32,6 @@ impl TryInto for protobuf::Action { fn try_into(self) -> Result { match self.action_type { - Some(ActionType::ExecutePartition(partition)) => { - Ok(Action::ExecutePartition(ExecutePartition::new( - partition.job_id, - partition.stage_id as usize, - partition.partition_id.iter().map(|n| *n as usize).collect(), - partition - .plan - .as_ref() - .ok_or_else(|| { - BallistaError::General( - "PhysicalPlanNode in ExecutePartition is missing" - .to_owned(), - ) - })? - .try_into()?, - HashMap::new(), - ))) - } Some(ActionType::FetchPartition(partition)) => { Ok(Action::FetchPartition(partition.try_into()?)) } diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index cbe1a31227c6..fa2c1b890e84 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -35,8 +35,6 @@ pub mod to_proto; /// Action that can be sent to an executor #[derive(Debug, Clone)] pub enum Action { - /// Execute a query and store the results in memory - ExecutePartition(ExecutePartition), /// Collect a shuffle partition FetchPartition(PartitionId), } diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index 40ca907a8a71..c3f2046305cf 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -29,10 +29,6 @@ impl TryInto for Action { fn try_into(self) -> Result { match self { - Action::ExecutePartition(partition) => Ok(protobuf::Action { - action_type: Some(ActionType::ExecutePartition(partition.try_into()?)), - settings: vec![], - }), Action::FetchPartition(partition_id) => Ok(protobuf::Action { action_type: Some(ActionType::FetchPartition(partition_id.into())), settings: vec![], diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index 99424b6e8db4..7325287f074f 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use crate::executor::Executor; use ballista_core::error::BallistaError; use ballista_core::serde::decode_protobuf; -use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats}; +use ballista_core::serde::scheduler::Action as BallistaAction; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, @@ -33,18 +33,13 @@ use arrow_flight::{ PutResult, SchemaResult, Ticket, }; use datafusion::arrow::{ - datatypes::{DataType, Field, Schema}, - error::ArrowError, - ipc::reader::FileReader, - ipc::writer::IpcWriteOptions, + error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions, record_batch::RecordBatch, }; -use datafusion::physical_plan::displayable; use futures::{Stream, StreamExt}; use log::{info, warn}; use std::io::{Read, Seek}; use tokio::sync::mpsc::channel; -use tokio::task::JoinHandle; use tokio::{ sync::mpsc::{Receiver, Sender}, task, @@ -92,79 +87,6 @@ impl FlightService for BallistaFlightService { decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?; match &action { - BallistaAction::ExecutePartition(partition) => { - info!( - "ExecutePartition: job={}, stage={}, partition={:?}\n{}", - partition.job_id, - partition.stage_id, - partition.partition_id, - displayable(partition.plan.as_ref()).indent().to_string() - ); - - let mut tasks: Vec>> = vec![]; - for &part in &partition.partition_id { - let partition = partition.clone(); - let executor = self.executor.clone(); - tasks.push(tokio::spawn(async move { - let results = executor - .execute_partition( - partition.job_id.clone(), - partition.stage_id, - part, - partition.plan.clone(), - ) - .await?; - let results = vec![results]; - - let mut flights: Vec> = vec![]; - let options = arrow::ipc::writer::IpcWriteOptions::default(); - - let mut batches: Vec> = results - .iter() - .flat_map(|batch| create_flight_iter(batch, &options)) - .collect(); - - // append batch vector to schema vector, so that the first message sent is the schema - flights.append(&mut batches); - - Ok(flights) - })); - } - - // wait for all partitions to complete - let results = futures::future::join_all(tasks).await; - - // get results - let mut flights: Vec> = vec![]; - - // add an initial FlightData message that sends schema - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let stats = PartitionStats::default(); - let schema = Arc::new(Schema::new(vec![ - Field::new("path", DataType::Utf8, false), - stats.arrow_struct_repr(), - ])); - let schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema( - schema.as_ref(), - &options, - ); - flights.push(Ok(schema_flight_data)); - - // collect statistics from each executed partition - for result in results { - let result = result.map_err(|e| { - Status::internal(format!("Ballista Error: {:?}", e)) - })?; - let batches = result.map_err(|e| { - Status::internal(format!("Ballista Error: {:?}", e)) - })?; - flights.extend_from_slice(&batches); - } - - let output = futures::stream::iter(flights); - Ok(Response::new(Box::pin(output) as Self::DoGetStream)) - } BallistaAction::FetchPartition(partition_id) => { // fetch a partition that was previously executed by this executor info!("FetchPartition {:?}", partition_id);