Add some resiliency to lost executors #568

Merged · 2 commits · Jun 26, 2021

Changes from all commits
10 changes: 4 additions & 6 deletions ballista/rust/core/proto/ballista.proto
@@ -745,10 +745,10 @@ message ExecutorRegistration {
   uint32 port = 3;
 }
 
-message GetExecutorMetadataParams {}
-
-message GetExecutorMetadataResult {
-  repeated ExecutorMetadata metadata = 1;
+message ExecutorHeartbeat {
+  ExecutorMetadata meta = 1;
+  // Unix epoch-based timestamp in seconds
+  uint64 timestamp = 2;
 }
 
 message RunningTask {
@@ -847,8 +847,6 @@ message FilePartitionMetadata {
 }
 
 service SchedulerGrpc {
-  rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}
-
   // Executors must poll the scheduler for heartbeat and to receive tasks
   rpc PollWork (PollWorkParams) returns (PollWorkResult) {}

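The new `ExecutorHeartbeat` message pairs executor metadata with a Unix-epoch timestamp, so the scheduler can judge liveness from each executor's last report instead of serving a dedicated metadata RPC. Below is a minimal Rust sketch of how stale executors could be filtered out; the structs mirror the proto message above with simplified fields, and the `alive_executors` helper and its threshold are illustrative, not code from this PR.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Simplified stand-ins for the generated protobuf types above.
struct ExecutorMetadata {
    id: String,
}

struct ExecutorHeartbeat {
    meta: ExecutorMetadata,
    // Unix epoch-based timestamp in seconds, as in the proto definition.
    timestamp: u64,
}

/// Hypothetical helper: keep only executors heard from within `max_age`.
fn alive_executors(
    heartbeats: &[ExecutorHeartbeat],
    max_age: Duration,
) -> Vec<&ExecutorMetadata> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs();
    heartbeats
        .iter()
        .filter(|hb| now.saturating_sub(hb.timestamp) <= max_age.as_secs())
        .map(|hb| &hb.meta)
        .collect()
}

fn main() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let heartbeats = vec![
        ExecutorHeartbeat {
            meta: ExecutorMetadata { id: "fresh".to_string() },
            timestamp: now,
        },
        ExecutorHeartbeat {
            meta: ExecutorMetadata { id: "stale".to_string() },
            timestamp: now - 600, // last seen ten minutes ago
        },
    ];
    // Only the executor that reported within the last minute counts as alive.
    let alive = alive_executors(&heartbeats, Duration::from_secs(60));
    assert_eq!(alive.len(), 1);
    assert_eq!(alive[0].id, "fresh");
}
```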
11 changes: 8 additions & 3 deletions ballista/rust/executor/src/execution_loop.rs
@@ -91,10 +91,14 @@ async fn run_received_tasks(
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
 ) {
-    info!("Received task {:?}", task.task_id.as_ref().unwrap());
+    let task_id = task.task_id.unwrap();
+    let task_id_log = format!(
+        "{}/{}/{}",
+        task_id.job_id, task_id.stage_id, task_id.partition_id
+    );
+    info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
     let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
-    let task_id = task.task_id.unwrap();
 
     tokio::spawn(async move {
         let execution_result = executor
@@ -105,7 +109,8 @@
                 plan,
             )
             .await;
-        info!("DONE WITH TASK: {:?}", execution_result);
+        info!("Done with task {}", task_id_log);
+        debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result.map(|_| ()),
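The executor now logs a compact `job_id/stage_id/partition_id` identifier at `info` level and demotes the verbose execution result to `debug`. A standalone sketch of the identifier formatting, where the struct is a stand-in for the generated `PartitionId` protobuf type and the sample values are invented:

```rust
// Stand-in for the generated PartitionId protobuf type (simplified field types).
struct PartitionId {
    job_id: String,
    stage_id: u32,
    partition_id: u32,
}

fn main() {
    let task_id = PartitionId {
        job_id: "2Yoyba8".to_string(),
        stage_id: 1,
        partition_id: 3,
    };
    // Same formatting as run_received_tasks above.
    let task_id_log = format!(
        "{}/{}/{}",
        task_id.job_id, task_id.stage_id, task_id.partition_id
    );
    assert_eq!(task_id_log, "2Yoyba8/1/3");
    println!("Received task {}", task_id_log);
}
```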
39 changes: 13 additions & 26 deletions ballista/rust/scheduler/src/api/handlers.rs
@@ -11,45 +11,32 @@
 // limitations under the License.
 
 use crate::SchedulerServer;
-use ballista_core::serde::protobuf::{
-    scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
-    GetExecutorMetadataResult,
-};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use tonic::{Request, Response};
+use ballista_core::{serde::scheduler::ExecutorMeta, BALLISTA_VERSION};
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
 struct StateResponse {
     executors: Vec<ExecutorMeta>,
     started: u128,
-    version: String,
+    version: &'static str,
 }
 
 pub(crate) async fn scheduler_state(
     data_server: SchedulerServer,
 ) -> Result<impl warp::Reply, Rejection> {
-    let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
-        .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
-        .await;
-    let metadata: Vec<ExecutorMeta> = match data {
-        Ok(result) => {
-            let res: &GetExecutorMetadataResult = result.get_ref();
-            let vec: &Vec<ExecutorMetadata> = &res.metadata;
-            vec.iter()
-                .map(|v: &ExecutorMetadata| ExecutorMeta {
-                    host: v.host.clone(),
-                    port: v.port as u16,
-                    id: v.id.clone(),
-                })
-                .collect()
-        }
-        Err(_) => vec![],
-    };
+    // TODO: Display last seen information in UI
Review comment from the Contributor Author (on the TODO above):

cc @msathis: I am not super familiar with the UI side of the project, but it would be nice to display which executors died. I'll open an issue for this once this PR is merged.

Reply from a Contributor:

Thanks @edrevo Great work 👍 I can raise the follow up PR once this is merged! 👌

+    let executors: Vec<ExecutorMeta> = data_server
+        .state
+        .get_executors_metadata()
+        .await
+        .unwrap_or_default()
+        .into_iter()
+        .map(|(metadata, _duration)| metadata)
+        .collect();
     let response = StateResponse {
-        executors: metadata,
+        executors,
         started: data_server.start_time,
-        version: data_server.version.clone(),
+        version: BALLISTA_VERSION,
     };
     Ok(warp::reply::json(&response))
 }
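The handler now reads executor metadata directly from the shared scheduler state instead of round-tripping through the scheduler's own gRPC endpoint; the state call yields pairs of `ExecutorMeta` and a `Duration` (the `_duration` binding suggests time since the last heartbeat, discarded for now per the TODO). A sketch of the JSON shape the endpoint serializes, using plain `serde_json` rather than warp; the field values are invented and the version literal stands in for `BALLISTA_VERSION`:

```rust
// Assumed dependencies: serde = { version = "1", features = ["derive"] }
// and a recent serde_json = "1" (for u128 support).
use serde::Serialize;

#[derive(Debug, Serialize)]
struct ExecutorMeta {
    id: String,
    host: String,
    port: u16,
}

#[derive(Debug, Serialize)]
struct StateResponse {
    executors: Vec<ExecutorMeta>,
    started: u128,
    version: &'static str,
}

fn main() {
    let response = StateResponse {
        executors: vec![ExecutorMeta {
            id: "executor-1".to_string(),
            host: "10.0.0.5".to_string(),
            port: 50051,
        }],
        started: 1_624_700_000_000, // scheduler start time, ms since the epoch
        version: "0.5.0",           // stand-in for BALLISTA_VERSION
    };
    // Prints the same shape warp::reply::json produces:
    // {"executors":[{"id":"executor-1","host":"10.0.0.5","port":50051}],
    //  "started":1624700000000,"version":"0.5.0"}
    println!("{}", serde_json::to_string(&response).unwrap());
}
```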
42 changes: 5 additions & 37 deletions ballista/rust/scheduler/src/lib.rs
@@ -34,10 +34,10 @@ use std::{fmt, net::IpAddr};
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
     scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
-    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
-    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
+    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
+    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;

@@ -76,9 +76,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 #[derive(Clone)]
 pub struct SchedulerServer {
     caller_ip: IpAddr,
-    state: Arc<SchedulerState>,
+    pub(crate) state: Arc<SchedulerState>,
     start_time: u128,
-    version: String,
 }
 
 impl SchedulerServer {
@@ -87,7 +86,6 @@ impl SchedulerServer {
         namespace: String,
         caller_ip: IpAddr,
     ) -> Self {
-        const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();

@@ -101,35 +99,12 @@
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_millis(),
-            version: VERSION.unwrap_or("Unknown").to_string(),
         }
     }
 }
 
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
-    async fn get_executors_metadata(
-        &self,
-        _request: Request<GetExecutorMetadataParams>,
-    ) -> std::result::Result<Response<GetExecutorMetadataResult>, tonic::Status> {
-        info!("Received get_executors_metadata request");
-        let result = self
-            .state
-            .get_executors_metadata()
-            .await
-            .map_err(|e| {
-                let msg = format!("Error reading executors metadata: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?
-            .into_iter()
-            .map(|meta| meta.into())
-            .collect();
-        Ok(Response::new(GetExecutorMetadataResult {
-            metadata: result,
-        }))
-    }
-
     async fn poll_work(
         &self,
         request: Request<PollWorkParams>,
@@ -279,13 +254,6 @@
             }
         };
         debug!("Received plan for execution: {:?}", plan);
-        let executors = self.state.get_executors_metadata().await.map_err(|e| {
-            let msg = format!("Error reading executors metadata: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
-        debug!("Found executors: {:?}", executors);
-
         let job_id: String = {
             let mut rng = thread_rng();
             std::iter::repeat(())
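With the per-instance `version: String` field removed from `SchedulerServer`, the version string now comes from a shared `BALLISTA_VERSION` constant imported from `ballista_core`. A plausible definition for such a constant (the real one lives in ballista_core and may differ):

```rust
/// Crate version captured at compile time. Unlike the removed
/// option_env!-based lookup, env!("CARGO_PKG_VERSION") is always set
/// when building with Cargo, so no "Unknown" fallback is needed.
pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION");
```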
27 changes: 5 additions & 22 deletions ballista/rust/scheduler/src/state/etcd.rs
@@ -17,14 +17,12 @@

 //! Etcd config backend.
 
-use std::{task::Poll, time::Duration};
+use std::task::Poll;
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, Result};
 
-use etcd_client::{
-    GetOptions, LockResponse, PutOptions, WatchOptions, WatchStream, Watcher,
-};
+use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
 use futures::{Stream, StreamExt};
 use log::warn;

@@ -70,25 +68,9 @@ impl ConfigBackendClient for EtcdClient {
             .collect())
     }
 
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         let mut etcd = self.etcd.clone();
-        let put_options = if let Some(lease_time) = lease_time {
-            etcd.lease_grant(lease_time.as_secs() as i64, None)
-                .await
-                .map(|lease| Some(PutOptions::new().with_lease(lease.id())))
-                .map_err(|e| {
-                    warn!("etcd lease grant failed: {:?}", e.to_string());
-                    ballista_error("etcd lease grant failed")
-                })?
-        } else {
-            None
-        };
-        etcd.put(key.clone(), value.clone(), put_options)
+        etcd.put(key.clone(), value.clone(), None)
             .await
             .map_err(|e| {
                 warn!("etcd put failed: {}", e);
@@ -99,6 +81,7 @@

     async fn lock(&self) -> Result<Box<dyn Lock>> {
         let mut etcd = self.etcd.clone();
+        // TODO: make this a namespaced-lock
         let lock = etcd
             .lock("/ballista_global_lock", None)
             .await
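For contrast, this is roughly the lease-based flow the PR removes: `put` used to grant an etcd lease so registrations expired on their own if an executor stopped renewing them, whereas liveness is now inferred from the heartbeat timestamps stored in scheduler state. A sketch reconstructed from the deleted code, using the `etcd_client` crate with the error mapping trimmed:

```rust
use std::time::Duration;

use etcd_client::{Client, Error, PutOptions};

// Lease-based expiry, as removed above: a value written with a lease is
// deleted automatically once the lease's TTL elapses without renewal.
async fn put_with_lease(
    client: &mut Client,
    key: String,
    value: Vec<u8>,
    lease_time: Duration,
) -> Result<(), Error> {
    let lease = client
        .lease_grant(lease_time.as_secs() as i64, None)
        .await?;
    let options = PutOptions::new().with_lease(lease.id());
    client.put(key, value, Some(options)).await?;
    Ok(())
}
```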