Skip to content

Commit

Permalink
refactoring (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
archeoss committed Nov 22, 2023
1 parent 4ae2dbc commit bebd430
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 52 deletions.
7 changes: 4 additions & 3 deletions backend/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod prelude {
};
pub use futures::{Stream, StreamExt};
pub use hyper::{body::Bytes, service::Service, Response, Uri};
pub use std::collections::BTreeMap;
pub use std::{
str::FromStr,
sync::Arc,
Expand Down Expand Up @@ -107,7 +108,7 @@ pub struct BobClient<Context: Send + Sync, Client: ApiNoContext<Context> + Send
main: Arc<Client>,

/// Clients for all known nodes
cluster: HashMap<NodeName, Arc<Client>>,
cluster: BTreeMap<NodeName, Arc<Client>>,

context_marker: PhantomData<fn(Context)>,
}
Expand Down Expand Up @@ -168,7 +169,7 @@ impl<Context: Send + Sync, ApiInterface: ApiNoContext<Context> + Send + Sync>
.change_context(ClientError::PermissionDenied)?
};

let cluster: HashMap<NodeName, Arc<_>> = nodes
let cluster: BTreeMap<NodeName, Arc<_>> = nodes
.iter()
.filter_map(|node| HttpClient::from_node(node, &bob_data.hostname, context.clone()))
.collect();
Expand Down Expand Up @@ -300,7 +301,7 @@ impl<Context: Send + Sync, ApiInterface: ApiNoContext<Context> + Send + Sync>
}

#[must_use]
pub const fn cluster_with_addr(&self) -> &HashMap<NodeName, Arc<ApiInterface>> {
pub const fn cluster_with_addr(&self) -> &BTreeMap<NodeName, Arc<ApiInterface>> {
&self.cluster
}

Expand Down
55 changes: 55 additions & 0 deletions backend/src/models/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,61 @@ impl From<dto::MetricsSnapshotModel> for TypedMetrics {
}
}

/// Query parameters controlling pagination of list endpoints.
#[derive(IntoParams, Deserialize, Clone)]
#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
pub struct Pagination {
    // Requested page index; `#[serde(default)]` makes a missing value 0.
    // NOTE(review): confirm whether pages are 0- or 1-based — callers in
    // this file pass 1 for the unpaginated case.
    #[serde(default)]
    pub page: usize,
    // Number of rows per page; `#[serde(default)]` makes a missing value 0.
    // NOTE(review): a per_page of 0 flows into `PaginatedResponse::new`'s
    // `total / page_size` — verify the zero case is handled downstream.
    #[serde(default)]
    pub per_page: usize,
}

/// A single page of results plus the bookkeeping needed to render paging UI.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
pub struct PaginatedResponse<T> {
    /// The page of data being returned
    pub data: Vec<T>,
    /// The number of rows returned in the current page
    pub count: usize,
    /// The total number of rows available
    pub total: usize,
    /// The current page being returned
    pub page: usize,
    /// The number of pages available
    pub page_count: usize,
}

impl<T> PaginatedResponse<T> {
    /// Create a new `PaginatedResponse`.
    ///
    /// `count` is derived from `data`, `page_count` from `total` and
    /// `page_size` (rounding up for a partial final page).
    ///
    /// A `page_size` of zero yields a `page_count` of zero instead of
    /// panicking: `Pagination::per_page` defaults to 0 under
    /// `#[serde(default)]`, so this input is reachable from a request,
    /// and the previous `total / page_size` was a division by zero.
    #[must_use]
    pub fn new(data: Vec<T>, total: usize, page: usize, page_size: usize) -> Self {
        // `len()` is already `usize`; the old `try_into().unwrap_or(0)`
        // was a usize→usize conversion that could never fail.
        let count = data.len();
        let page_count = if page_size == 0 {
            0
        } else {
            // Ceiling division: one extra page for a partial remainder.
            total / page_size + usize::from(total % page_size != 0)
        };

        Self {
            data,
            count,
            total,
            page,
            page_count,
        }
    }

    /// Transform the data contained in the `PaginatedResponse`,
    /// leaving all paging metadata untouched.
    pub fn map<B, F>(self, func: F) -> PaginatedResponse<B>
    where
        F: Fn(T) -> B,
    {
        PaginatedResponse::<B> {
            data: self.data.into_iter().map(func).collect(),
            count: self.count,
            total: self.total,
            page: self.page,
            page_count: self.page_count,
        }
    }
}

#[cfg(test)]
mod tests {
use super::{
Expand Down
97 changes: 48 additions & 49 deletions backend/src/services/api.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use axum::extract::Path;

use crate::{
connector::dto::NodeConfiguration,
models::bob::{DiskName, IsActive},
};

use super::{
auth::HttpClient,
methods::{fetch_configuration, fetch_metrics, fetch_nodes, fetch_vdisks},
prelude::*,
};
use crate::{
connector::dto::NodeConfiguration,
models::bob::{DiskName, IsActive},
};
use axum::extract::{Path, Query};

/// Returns count of Physical Disks per status
///
Expand Down Expand Up @@ -234,7 +232,7 @@ pub async fn get_space(Extension(client): Extension<HttpBobClient>) -> Json<Spac

Json(total_space)
}

// TODO: return simple list of nodes and simplified version of detailed_nodes
/// Returns list of all known nodes
///
/// # Errors
Expand All @@ -245,14 +243,53 @@ pub async fn get_space(Extension(client): Extension<HttpBobClient>) -> Json<Spac
context_path = ApiV1::to_path(),
path = "/nodes",
responses(
(status = 200, body = Vec<Node>, content_type = "application/json", description = "Node List"),
(status = 200, body = PaginatedResponse<Node>, content_type = "application/json", description = "Node List"),
(status = 401, description = "Unauthorized")
),
security(("api_key" = []))
))]
pub async fn get_nodes(Extension(client): Extension<HttpBobClient>) -> AxumResult<Json<Vec<Node>>> {
pub async fn get_nodes(
Extension(client): Extension<HttpBobClient>,
Query(params): Query<Option<Pagination>>,
) -> AxumResult<Json<PaginatedResponse<Node>>> {
tracing::info!("get /nodes : {client:?}");

let len = client.cluster_with_addr().len();
Ok(if let Some(page_params) = params {
Json(PaginatedResponse::new(
batch_get_nodes(client, page_params.clone()).await?,
len,
page_params.page,
page_params.per_page,
))
} else {
Json(PaginatedResponse::new(
dump_get_nodes(client).await?,
len,
1,
1,
))
})
}

pub async fn batch_get_nodes(
client: BobClient<

Check warning on line 276 in backend/src/services/api.rs

View workflow job for this annotation

GitHub Actions / gen-openapi

unused variable: `client`

Check warning on line 276 in backend/src/services/api.rs

View workflow job for this annotation

GitHub Actions / gen-openapi

unused variable: `client`
ClientContext,
ContextWrapper<
Client<
DropContextService<hyper::Client<HttpConnector>, ClientContext>,
ClientContext,
Basic,
>,
ClientContext,
>,
>,
page_params: Pagination,

Check warning on line 287 in backend/src/services/api.rs

View workflow job for this annotation

GitHub Actions / gen-openapi

unused variable: `page_params`

Check warning on line 287 in backend/src/services/api.rs

View workflow job for this annotation

GitHub Actions / gen-openapi

unused variable: `page_params`
) -> AxumResult<Vec<Node>> {
todo!()
}

pub async fn dump_get_nodes(client: HttpBobClient) -> AxumResult<Vec<Node>> {
let mut metrics: FuturesUnordered<_> = client
.cluster()
.map(move |node| {
Expand Down Expand Up @@ -341,7 +378,7 @@ pub async fn get_nodes(Extension(client): Extension<HttpBobClient>) -> AxumResul
}
tracing::trace!("send response: {res:?}");

Ok(Json(res.values().cloned().collect()))
Ok(res.values().cloned().collect())
}

/// Get Virtual Disks
Expand Down Expand Up @@ -497,44 +534,6 @@ async fn is_node_online(client: &HttpBobClient, node: &dto::Node) -> bool {
(client.probe_socket(&node.name).await).map_or(false, |code| code == StatusCode::OK)
}

/// Convert raw disk states plus space/metrics snapshots into API `Disk` models.
///
/// Disks with duplicate names are logged and skipped; only the first
/// occurrence of each name is kept.
fn proccess_disks(
    disks: &[dto::DiskState],
    space: &dto::SpaceInfo,
    metrics: &dto::MetricsSnapshotModel,
) -> Vec<Disk> {
    // At most one entry per input disk, so reserve up front.
    let mut res_disks = Vec::with_capacity(disks.len());
    let mut visited_disks = HashSet::new();
    for disk in disks {
        // `insert` returns false when the name was already seen.
        if !visited_disks.insert(disk.name.clone()) {
            tracing::warn!(
                "disk {} with path {} duplicated, skipping...",
                disk.name,
                disk.path
            );
            continue;
        }
        res_disks.push(Disk {
            name: disk.name.clone(),
            path: disk.path.clone(),
            status: DiskStatus::from_space_info(space, &disk.name),
            total_space: space.total_disk_space_bytes,
            used_space: space
                .occupied_disk_space_by_disk
                // Fix: was `.get(&disk.name.clone())` — cloned the name only
                // to borrow it; a plain borrow avoids the allocation.
                .get(&disk.name)
                .copied()
                .unwrap_or_default(),
            // NOTE(review): `{:?}` Debug-formats the name; if `name` is a
            // `String` the key comes out as `hardware.disks."sda"_iops`
            // (with quotes) — confirm this matches the emitted metric keys.
            iops: metrics
                .metrics
                .get(&format!("hardware.disks.{:?}_iops", disk.name))
                .cloned()
                .unwrap_or_default()
                .value,
        });
    }

    res_disks
}

/// Get Raw Metrics from Node
///
/// # Errors
Expand Down

0 comments on commit bebd430

Please sign in to comment.