Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (controller) restart after failing #98

Draft
wants to merge 2 commits into
base: project-demo-202208
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 59 additions & 40 deletions controller/lib/src/external_api/instance/controller.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::Arc;

use crate::external_api::generic::model::{APIResponse, APIResponseMetadata};
use crate::external_api::generic::model::{APIResponse, APIResponseMetadata, Pagination};
use crate::external_api::interface::ActixAppState;
use crate::external_api::workload::controller::WorkloadControllerError;
use crate::external_api::workload::service::WorkloadServiceError;

use super::super::workload::service::WorkloadService;
use super::model::{Instance, InstanceDTO, InstanceVector, Pagination};
use super::model::{Instance, InstanceDTO};
use super::service::{InstanceService, InstanceServiceError};
use actix_web::http::StatusCode;
use actix_web::{web, HttpResponse, Responder, Scope};
Expand Down Expand Up @@ -89,13 +89,18 @@ impl InstanceController {
body: web::Json<InstanceDTO>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let mut workload_service = match WorkloadService::new(&data.etcd_address).await {
Ok(service) => service,
Expand Down Expand Up @@ -142,13 +147,18 @@ impl InstanceController {
params: web::Path<(String, String)>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let (namespace, name) = params.into_inner();
match instance_service
Expand Down Expand Up @@ -183,13 +193,18 @@ impl InstanceController {
params: web::Path<(String, String)>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let (namespace, name) = params.into_inner();
match instance_service
Expand All @@ -216,27 +231,31 @@ impl InstanceController {
pagination: Option<web::Query<Pagination>>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

match pagination {
let instances = match pagination {
Some(pagination) => {
let instances = instance_service
instance_service
.get_instances(pagination.limit, pagination.offset, &namespace)
.await;

InstanceVector::new(instances).to_http()
.await
}
None => {
let instances = instance_service.get_instances(0, 0, &namespace).await;
None => instance_service.get_instances(0, 0, &namespace).await,
};

InstanceVector::new(instances).to_http()
}
}
HttpResponse::build(StatusCode::OK).json(APIResponse::<Vec<Instance>> {
data: instances,
metadata: APIResponseMetadata::default(),
})
}
}
61 changes: 0 additions & 61 deletions controller/lib/src/external_api/instance/filter.rs

This file was deleted.

1 change: 0 additions & 1 deletion controller/lib/src/external_api/instance/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod controller;
mod filter;
pub mod model;
pub mod service;
34 changes: 4 additions & 30 deletions controller/lib/src/external_api/instance/model.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::net::Ipv4Addr;

use actix_web::HttpResponse;
use proto::controller::{InstanceState, Type};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};

use crate::external_api::generic::model::{APIResponse, APIResponseMetadata};
#[derive(Deserialize, Serialize)]
pub struct InstanceDTO {
pub workload_name: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Instance {
Expand All @@ -23,40 +25,12 @@ pub struct Instance {
pub namespace: String,
}

#[derive(Deserialize, Serialize)]
pub struct InstanceVector {
pub instances: Vec<Instance>,
}
impl InstanceVector {
pub fn new(instances: Vec<Instance>) -> InstanceVector {
InstanceVector { instances }
}
pub fn to_http(self) -> HttpResponse {
HttpResponse::Ok().json(APIResponse::<Vec<Instance>> {
data: self.instances,
metadata: APIResponseMetadata::default(),
})
}
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Resource {
pub limit: Option<ResourceSummary>,
pub usage: Option<ResourceSummary>,
}

#[derive(Deserialize, Serialize)]
pub struct InstanceDTO {
pub workload_name: String,
}

#[derive(Deserialize, Serialize)]

pub struct Pagination {
pub limit: u32,
pub offset: u32,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ResourceSummary {
pub cpu: u64,
Expand Down
26 changes: 17 additions & 9 deletions controller/lib/src/external_api/instance/service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use super::filter::InstanceFilterService;
use super::model::Instance;
use crate::etcd::{EtcdClient, EtcdClientError};
use crate::external_api::generic::filter::FilterService;
use crate::external_api::workload::model::Workload;
use crate::grpc_client::interface::{SchedulerClientInterface, SchedulerClientInterfaceError};
use log::{debug, trace};
use proto::controller::InstanceState;
use serde_json;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time};
use tonic::{Request, Status};

#[derive(Debug, Error)]
Expand All @@ -32,7 +33,7 @@ pub enum InstanceServiceError {
pub struct InstanceService {
grpc_service: SchedulerClientInterface,
etcd_service: EtcdClient,
filter_service: InstanceFilterService,
filter_service: FilterService,
}

// `InstanceService` is a struct that is inspired from Controllers Provider Modules architectures. It is used as a service in the InstanceController. A service can use other services.
Expand All @@ -46,15 +47,19 @@ impl InstanceService {
pub async fn new(
grpc_address: &str,
etcd_address: &SocketAddr,
grpc_client_connection_max_retries: u32,
) -> Result<Self, InstanceServiceError> {
Ok(InstanceService {
grpc_service: SchedulerClientInterface::new(grpc_address.to_string())
.await
.map_err(InstanceServiceError::SchedulerClientInterfaceError)?,
grpc_service: SchedulerClientInterface::new(
grpc_address.to_string(),
grpc_client_connection_max_retries,
)
.await
.map_err(InstanceServiceError::SchedulerClientInterfaceError)?,
etcd_service: EtcdClient::new(etcd_address.to_string())
.await
.map_err(InstanceServiceError::EtcdError)?,
filter_service: InstanceFilterService::new(),
filter_service: FilterService::new(),
})
}

Expand Down Expand Up @@ -149,6 +154,8 @@ impl InstanceService {
pub fn schedule_instance(this: Arc<Mutex<Self>>, mut instance: Instance) {
//Spawn a thread to start the instance
tokio::spawn(async move {
let mut cooldown = 1;

loop {
let mut stream = this
.clone()
Expand Down Expand Up @@ -196,8 +203,9 @@ impl InstanceService {
}

instance.num_restarts += 1;

debug!("Restarting instance {}", instance.id);
debug!("Restarting instance {} in {}s", instance.id, cooldown);
time::sleep(Duration::from_secs(cooldown)).await;
cooldown *= 2;

this.clone()
.lock()
Expand Down
3 changes: 3 additions & 0 deletions controller/lib/src/external_api/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct ExternalAPIInterface {}
pub struct ActixAppState {
pub etcd_address: SocketAddr,
pub grpc_address: String,
pub grpc_client_connection_max_retries: u32,
}

impl ExternalAPIInterface {
Expand All @@ -32,6 +33,7 @@ impl ExternalAPIInterface {
num_workers: usize,
etcd_address: SocketAddr,
grpc_address: String,
grpc_client_connection_max_retries: u32,
) -> Result<Self, ExternalAPIInterfaceError> {
let mut etcd_client = EtcdClient::new(etcd_address.to_string())
.await
Expand Down Expand Up @@ -65,6 +67,7 @@ impl ExternalAPIInterface {
.app_data(web::Data::new(ActixAppState {
etcd_address,
grpc_address: grpc_address.clone(),
grpc_client_connection_max_retries,
}))
.route("/health", web::get().to(HttpResponse::Ok))
.service(workload::controller::WorkloadController {}.services())
Expand Down
19 changes: 10 additions & 9 deletions controller/lib/src/external_api/namespace/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,19 @@ impl NamespaceController {
Err(err) => return NamespaceControllerError::NamespaceServiceError(err).into(),
};

match pagination {
let namespaces = match pagination {
Some(pagination) => {
let namespaces = namespace_service
namespace_service
.get_all_namespace(pagination.limit, pagination.offset)
.await;
namespaces.to_http()
.await
}
None => {
let namespaces = namespace_service.get_all_namespace(0, 0).await;
namespaces.to_http()
}
}
None => namespace_service.get_all_namespace(0, 0).await,
};

HttpResponse::build(StatusCode::OK).json(APIResponse::<Vec<Namespace>> {
metadata: APIResponseMetadata::default(),
data: namespaces,
})
}

pub async fn patch_namespace(
Expand Down
Loading