Skip to content

Commit

Permalink
MINOR: Remove unused Ballista query execution code path (#732)
Browse files Browse the repository at this point in the history
* Remove unused code path

* Remove proto defs
  • Loading branch information
andygrove authored Jul 16, 2021
1 parent e65d49c commit 2f5bc94
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 161 deletions.
6 changes: 0 additions & 6 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -699,12 +699,6 @@ message KeyValuePair {
message Action {

oneof ActionType {
// Execute a logical query plan
LogicalPlanNode query = 1;

// Execute one partition of a physical query plan
ExecutePartition execute_partition = 2;

// Fetch a partition from an executor
PartitionId fetch_partition = 3;
}
Expand Down
51 changes: 0 additions & 51 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,57 +75,6 @@ impl BallistaClient {
Ok(Self { flight_client })
}

/// Execute one partition of a physical query plan against the executor
pub async fn execute_partition(
&mut self,
job_id: String,
stage_id: usize,
partition_id: Vec<usize>,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<ExecutePartitionResult>> {
let action = Action::ExecutePartition(ExecutePartition {
job_id,
stage_id,
partition_id,
plan,
shuffle_locations: Default::default(),
});
let stream = self.execute_action(&action).await?;
let batches = collect(stream).await?;

batches
.iter()
.map(|batch| {
if batch.num_rows() != 1 {
Err(BallistaError::General(
"execute_partition received wrong number of rows".to_owned(),
))
} else {
let path = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect(
"execute_partition expected column 0 to be a StringArray",
);

let stats = batch
.column(1)
.as_any()
.downcast_ref::<StructArray>()
.expect(
"execute_partition expected column 1 to be a StructArray",
);

Ok(ExecutePartitionResult::new(
path.value(0),
PartitionStats::from_arrow_struct_array(stats),
))
}
})
.collect::<Result<Vec<_>>>()
}

/// Fetch a partition from an executor
pub async fn fetch_partition(
&mut self,
Expand Down
18 changes: 0 additions & 18 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,6 @@ impl TryInto<Action> for protobuf::Action {

fn try_into(self) -> Result<Action, Self::Error> {
match self.action_type {
Some(ActionType::ExecutePartition(partition)) => {
Ok(Action::ExecutePartition(ExecutePartition::new(
partition.job_id,
partition.stage_id as usize,
partition.partition_id.iter().map(|n| *n as usize).collect(),
partition
.plan
.as_ref()
.ok_or_else(|| {
BallistaError::General(
"PhysicalPlanNode in ExecutePartition is missing"
.to_owned(),
)
})?
.try_into()?,
HashMap::new(),
)))
}
Some(ActionType::FetchPartition(partition)) => {
Ok(Action::FetchPartition(partition.try_into()?))
}
Expand Down
2 changes: 0 additions & 2 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ pub mod to_proto;
/// Action that can be sent to an executor
#[derive(Debug, Clone)]
pub enum Action {
/// Execute a query and store the results in memory
ExecutePartition(ExecutePartition),
/// Collect a shuffle partition
FetchPartition(PartitionId),
}
Expand Down
4 changes: 0 additions & 4 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ impl TryInto<protobuf::Action> for Action {

fn try_into(self) -> Result<protobuf::Action, Self::Error> {
match self {
Action::ExecutePartition(partition) => Ok(protobuf::Action {
action_type: Some(ActionType::ExecutePartition(partition.try_into()?)),
settings: vec![],
}),
Action::FetchPartition(partition_id) => Ok(protobuf::Action {
action_type: Some(ActionType::FetchPartition(partition_id.into())),
settings: vec![],
Expand Down
82 changes: 2 additions & 80 deletions ballista/rust/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,21 @@ use std::sync::Arc;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
use ballista_core::serde::scheduler::Action as BallistaAction;

use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
};
use datafusion::arrow::{
datatypes::{DataType, Field, Schema},
error::ArrowError,
ipc::reader::FileReader,
ipc::writer::IpcWriteOptions,
error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
use datafusion::physical_plan::displayable;
use futures::{Stream, StreamExt};
use log::{info, warn};
use std::io::{Read, Seek};
use tokio::sync::mpsc::channel;
use tokio::task::JoinHandle;
use tokio::{
sync::mpsc::{Receiver, Sender},
task,
Expand Down Expand Up @@ -92,79 +87,6 @@ impl FlightService for BallistaFlightService {
decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?;

match &action {
BallistaAction::ExecutePartition(partition) => {
info!(
"ExecutePartition: job={}, stage={}, partition={:?}\n{}",
partition.job_id,
partition.stage_id,
partition.partition_id,
displayable(partition.plan.as_ref()).indent().to_string()
);

let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
for &part in &partition.partition_id {
let partition = partition.clone();
let executor = self.executor.clone();
tasks.push(tokio::spawn(async move {
let results = executor
.execute_partition(
partition.job_id.clone(),
partition.stage_id,
part,
partition.plan.clone(),
)
.await?;
let results = vec![results];

let mut flights: Vec<Result<FlightData, Status>> = vec![];
let options = arrow::ipc::writer::IpcWriteOptions::default();

let mut batches: Vec<Result<FlightData, Status>> = results
.iter()
.flat_map(|batch| create_flight_iter(batch, &options))
.collect();

// append batch vector to schema vector, so that the first message sent is the schema
flights.append(&mut batches);

Ok(flights)
}));
}

// wait for all partitions to complete
let results = futures::future::join_all(tasks).await;

// get results
let mut flights: Vec<Result<FlightData, Status>> = vec![];

// add an initial FlightData message that sends schema
let options = arrow::ipc::writer::IpcWriteOptions::default();
let stats = PartitionStats::default();
let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
stats.arrow_struct_repr(),
]));
let schema_flight_data =
arrow_flight::utils::flight_data_from_arrow_schema(
schema.as_ref(),
&options,
);
flights.push(Ok(schema_flight_data));

// collect statistics from each executed partition
for result in results {
let result = result.map_err(|e| {
Status::internal(format!("Ballista Error: {:?}", e))
})?;
let batches = result.map_err(|e| {
Status::internal(format!("Ballista Error: {:?}", e))
})?;
flights.extend_from_slice(&batches);
}

let output = futures::stream::iter(flights);
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
BallistaAction::FetchPartition(partition_id) => {
// fetch a partition that was previously executed by this executor
info!("FetchPartition {:?}", partition_id);
Expand Down

0 comments on commit 2f5bc94

Please sign in to comment.