Skip to content

Commit

Permalink
Fix cargo clippy (#571)
Browse files Browse the repository at this point in the history
Co-authored-by: yangzhong <yangzhong@ebay.com>
  • Loading branch information
yahoNanJing and kyotoYaho authored Dec 19, 2022
1 parent a2661e4 commit 908aa5a
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ async fn fetch_partition_remote(
// TODO for shuffle client connections, we should avoid creating new connections again and again.
// And we should also avoid to keep alive too many connections for long time.
let host = metadata.host.as_str();
let port = metadata.port as u16;
let port = metadata.port;
let mut ballista_client =
BallistaClient::try_new(host, port)
.await
Expand Down
2 changes: 1 addition & 1 deletion ballista/core/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl PluginEnum {
/// new a struct which impl the PluginRegistrar trait
pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
match self {
PluginEnum::UDF => Box::new(UDFPluginManager::default()),
PluginEnum::UDF => Box::<UDFPluginManager>::default(),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let partition_id = task.partition_id;
let shuffle_writer_plan =
self.executor
.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
.new_shuffle_writer(job_id.clone(), stage_id, plan)?;

let part = PartitionId {
job_id: job_id.clone(),
stage_id: stage_id as usize,
partition_id: partition_id as usize,
stage_id,
partition_id,
};

info!("Start to execute shuffle write for task {}", task_identity);

let execution_result = self
.executor
.execute_shuffle_write(
task_id as usize,
task_id,
part.clone(),
shuffle_writer_plan.clone(),
task_context,
Expand Down
10 changes: 4 additions & 6 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl ExecutionGraph {
let mut locations = vec![];
for task_status in stage_task_statuses.into_iter() {
{
let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
if task_stage_attempt_num < running_stage.stage_attempt_num {
Expand Down Expand Up @@ -481,7 +480,6 @@ impl ExecutionGraph {
);
} else if let ExecutionStage::UnResolved(unsolved_stage) = stage {
for task_status in stage_task_statuses.into_iter() {
let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
let partition_id = task_status.clone().partition_id as usize;
Expand Down Expand Up @@ -815,8 +813,8 @@ impl ExecutionGraph {
/// Total number of tasks in this plan that are ready for scheduling
pub fn available_tasks(&self) -> usize {
self.stages
.iter()
.map(|(_, stage)| {
.values()
.map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage.available_tasks()
} else {
Expand Down Expand Up @@ -1412,8 +1410,8 @@ impl Debug for ExecutionGraph {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let stages = self
.stages
.iter()
.map(|(_, stage)| format!("{:?}", stage))
.values()
.map(|stage| format!("{:?}", stage))
.collect::<Vec<String>>()
.join("");
write!(f, "ExecutionGraph[job_id={}, session_id={}, available_tasks={}, is_successful={}]\n{}",
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
result = execute_query(&ctx, &plan, opt.debug).await?;
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
millis.push(elapsed);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
Expand Down Expand Up @@ -405,7 +405,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.unwrap();
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
millis.push(elapsed);
let row_count = batches.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
Expand Down Expand Up @@ -556,7 +556,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
}
if query > 0 && query < 23 {
let filename = format!("{}/q{}.sql", sql_path, query);
Ok(fs::read_to_string(&filename).expect("failed to read query"))
Ok(fs::read_to_string(filename).expect("failed to read query"))
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),
Expand Down

0 comments on commit 908aa5a

Please sign in to comment.