From 35d21ae297524b674b837014a7abaa804fea81f7 Mon Sep 17 00:00:00 2001 From: Simeon Romanov Date: Thu, 12 Oct 2023 23:16:13 +0300 Subject: [PATCH] login page (backend) --- CHANGELOG.md | 2 + Cargo.toml | 2 +- api/openapi.yaml | 100 ++- backend/Cargo.toml | 23 +- backend/src/connector/api.rs | 410 ++++++++++ backend/src/connector/client.rs | 431 +++++++++++ backend/src/connector/context.rs | 88 +++ backend/src/connector/dto.rs | 1218 ++++++++++++++++++++++++++++++ backend/src/connector/error.rs | 129 ++++ backend/src/connector/mod.rs | 308 ++++++++ backend/src/error.rs | 2 - backend/src/lib.rs | 88 ++- backend/src/main.rs | 54 +- backend/src/models/api.rs | 1 + backend/src/models/bob.rs | 4 + backend/src/models/mod.rs | 8 + backend/src/models/shared.rs | 148 ++++ backend/src/router.rs | 107 ++- backend/src/services/auth.rs | 355 +++++++++ backend/src/services/mod.rs | 47 +- docker-compose.yaml | 4 +- dockerfiles/alpine/Dockerfile | 4 +- proc_macro/.gitignore | 3 + proc_macro/Cargo.toml | 20 + proc_macro/src/lib.rs | 36 + 25 files changed, 3507 insertions(+), 85 deletions(-) create mode 100644 backend/src/connector/api.rs create mode 100644 backend/src/connector/client.rs create mode 100644 backend/src/connector/context.rs create mode 100644 backend/src/connector/dto.rs create mode 100644 backend/src/connector/error.rs create mode 100644 backend/src/models/api.rs create mode 100644 backend/src/models/bob.rs create mode 100644 backend/src/models/shared.rs create mode 100644 backend/src/services/auth.rs create mode 100644 proc_macro/.gitignore create mode 100644 proc_macro/Cargo.toml create mode 100644 proc_macro/src/lib.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e2da55f3..fb6fb36f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,10 @@ Bob Management GUI changelog ## [Unreleased] #### Added + - Initial project structure, backend only (#9) - Initial project stricture, frontend (#10) - Dockerfile and Docker-Compose to simplify deployment (#5) - CI/CD configuration (#11) - Logger Initialization (#14) +- Login Page, backend (#16) diff --git a/Cargo.toml b/Cargo.toml index a01413c5..c62b8018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ license-file = "./LICENSE" edition = "2021" [workspace] -members = [ "cli", "frontend", "backend", "utils" ] +members = [ "cli", "frontend", "backend", "utils", "proc_macro" ] default-members = [ "frontend", "backend"] resolver = "2" diff --git a/api/openapi.yaml b/api/openapi.yaml index b9eec94f..baeb1a55 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -1,14 +1,73 @@ openapi: 3.0.3 info: title: bob-management - description: 'Bob Management GUI: Backend' + description: Bob Management GUI contact: name: Romanov Simeon ArchArcheoss@proton.me license: name: '' version: 0.0.0 paths: - /root: + /api/v1/login: + post: + tags: + - services::auth + summary: Login to a BOB cluster + description: | + Login to a BOB cluster + + # Errors + This function can return the following errors + + 1. [`StatusCode::BAD_REQUEST`] + The function failed to parse hostname of the request + + 2. [`StatusCode::NOT_FOUND`] + The client was unable to reach the host + + 3. [`StatusCode::UNAUTHORIZED`] + The client couldn't authorize on the host + operationId: login + parameters: + - name: hostname + in: path + description: Address to connect to + required: true + schema: + $ref: '#/components/schemas/Hostname' + - name: credentials + in: path + description: '[Optional] Credentials used for BOB authentication' + required: true + schema: + allOf: + - $ref: '#/components/schemas/Credentials' + nullable: true + requestBody: + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/BobConnectionData' + required: true + responses: + '200': + description: Successful authorization + '400': + description: Bad Hostname + '401': + description: Bad Credentials + '404': + description: Can't reach specified hostname + /api/v1/logout: + post: + tags: + - services::auth + operationId: logout + responses: + '200': + description: Logged out + /api/v1/root: get: tags: - crate @@ -16,6 +75,43 @@ paths: responses: '200': description: Hello Bob! +components: + schemas: + BobConnectionData: + type: object + description: Data needed to connect to a BOB cluster + required: + - hostname + properties: + credentials: + allOf: + - $ref: '#/components/schemas/Credentials' + nullable: true + hostname: + $ref: '#/components/schemas/Hostname' + example: + credentials: + login: archeoss + password: '12345' + hostname: 0.0.0.0:7000 + Credentials: + type: object + description: Optional auth credentials for a BOB cluster + required: + - login + - password + properties: + login: + type: string + description: Login used during auth + password: + type: string + description: Password used during auth + example: + login: archeoss + password: '12345' + Hostname: + $ref: '#/components/schemas/Uri' tags: - name: bob description: BOB management API diff --git a/backend/Cargo.toml b/backend/Cargo.toml index e4f2d2ec..7cfb2071 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bob-management" -description = "Bob Management GUI: Backend" +description = "Bob Management GUI" publish = false keywords = [ "BOB", "Management", "GUI" ] version.workspace = true @@ -13,10 +13,9 @@ repository.workspace = true [dependencies] # Backend (lib.rs) ## Axum related -axum = "0.6" +axum = { version = "0.6", features = ["headers"] } axum-macros = "0.3" -axum-login = "0.6" -axum-sessions = "0.6" +tower-sessions = "0.5" tower = "0.4" tower-http = { version = "0.4", features = ["cors", "fs"] } @@ -25,6 +24,7 @@ tracing = "0.1" file-rotate = "0.7" tracing-appender = "0.2" tracing-subscriber = "0.3" +# tracing-forest = { version = "0.1", features = ["tokio"] } ## Error Handling error-stack = "0.4" @@ -32,8 +32,12 @@ thiserror = "1.0" ## General tokio = { version = "1.32", features = ["rt", "macros", "rt-multi-thread" ] } -hyper = "0.14" -lazy_static = "1.4" +hyper = { version = "0.14", features = ["http2", "client"] } +hyper_serde = "0.13" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +uuid = { version = "1.4", features = ["v4", "serde", "fast-rng"] } +futures = "0.3" ## OpenAPI + Swagger utoipa = { version = "4.0", features = ["yaml", "axum_extras", "chrono", "openapi_extensions"], optional = true } @@ -44,7 +48,14 @@ utoipa-rapidoc = { version = "1.0", features = ["axum"], optional = true } ## CLI cli = { path = "../cli" } +# Macro +proc-macro = { path = "../proc_macro" } + +[dev-dependencies] +utoipa = { version = "4.0", features = ["yaml", "axum_extras", "chrono", "openapi_extensions"]} + [features] default = [ "swagger" ] swagger = [ "dep:utoipa", "dep:utoipa-swagger-ui" , "dep:utoipa-redoc", "dep:utoipa-rapidoc" ] gen_api = [ "dep:utoipa" ] + diff --git a/backend/src/connector/api.rs b/backend/src/connector/api.rs new file mode 100644 index 00000000..3c9b6a1f --- /dev/null +++ b/backend/src/connector/api.rs @@ -0,0 +1,410 @@ +//! +//! This file was auto-generated using OpenAPI server generator +//! at +//! on 2023-07-07 +//! using BOB's REST API schema, commit = ade0eadf1db7cda072cfab07dff7b1b57247e34a: +//! +//! + +use super::dto::{self}; +use super::prelude::*; + +pub type ServiceError = Box; + +/// Errors that happend during API request proccessing +#[derive(Debug, Error)] +pub enum APIError { + #[error("the request to the specified resource failed")] + RequestFailed, + #[error("server received invalid status code from client: `{0}`")] + InvalidStatusCode(StatusCode), + #[error("can't read hyper response")] + ResponseError, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetAlienResponse { + /// Alien Node name + AlienNodeName(String), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetAlienDirResponse { + /// Directory + Directory(dto::Dir), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not acceptable backend + NotAcceptableBackend(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetDisksResponse { + /// A JSON array with disks and their states + AJSONArrayWithDisksAndTheirStates(Vec), + /// Permission denied + PermissionDenied(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetMetricsResponse { + /// Metrics + Metrics(dto::MetricsSnapshotModel), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetNodesResponse { + /// A JSON array of nodes info and vdisks on them + AJSONArrayOfNodesInfoAndVdisksOnThem(Vec), + /// Permission denied + PermissionDenied, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetPartitionResponse { + /// A JSON with partition info + AJSONWithPartitionInfo(dto::Partition), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetPartitionsResponse { + /// Node info and JSON array with partitions info + NodeInfoAndJSONArrayWithPartitionsInfo(dto::VDiskPartitions), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetRecordsResponse { + /// Records count + RecordsCount(i32), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetReplicasLocalDirsResponse { + /// A JSON array with dirs + AJSONArrayWithDirs(Vec), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetSpaceInfoResponse { + /// Space info + SpaceInfo(dto::SpaceInfo), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetStatusResponse { + /// A JSON with node info + AJSONWithNodeInfo(dto::Node), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetVDiskResponse { + /// A JSON with vdisk info + AJSONWithVdiskInfo(dto::VDisk), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetVDisksResponse { + /// A JSON array of vdisks info + AJSONArrayOfVdisksInfo(Vec), + /// Permission denied + PermissionDenied, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetVersionResponse { + /// Version info + VersionInfo(dto::VersionInfo), +} + +/// Returns configuration of the node +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetConfigurationResponse { + /// Configuration object + ConfigurationObject(dto::NodeConfiguration), + /// Permission denied + PermissionDenied, +} + +/// API +pub trait Api { + fn poll_ready( + &self, + _cx: &mut Context, + ) -> Poll>> { + Poll::Ready(Ok(())) + } + + /// Return directory of alien + async fn get_alien_dir(&self, context: &C) -> Result; + + /// Returns the list of disks with their states + async fn get_disks(&self, context: &C) -> Result; + + /// Get metrics + async fn get_metrics(&self, context: &C) -> Result; + + /// Returns a list of known nodes + async fn get_nodes(&self, context: &C) -> Result; + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + context: &C, + ) -> Result; + + /// Returns a list of partitions + async fn get_partitions( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Returns count of records of this on node + async fn get_records( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Get space info + async fn get_space_info(&self, context: &C) -> Result; + + /// Returns information about self + async fn get_status(&self, context: &C) -> Result; + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32, context: &C) -> Result; + + /// Returns a list of vdisks + async fn get_v_disks(&self, context: &C) -> Result; + + /// Returns server version + async fn get_version(&self, context: &C) -> Result; + + /// Returns configuration of the node + async fn get_configuration(&self, context: &C) -> Result; +} + +/// API where `Context` isn't passed on every API call +pub trait ApiNoContext { + fn poll_ready(&self, _cx: &mut Context) -> Poll>; + + fn context(&self) -> &C; + + /// Return directory of alien + async fn get_alien_dir(&self) -> Result; + + /// Returns the list of disks with their states + async fn get_disks(&self) -> Result; + + /// Get metrics + async fn get_metrics(&self) -> Result; + + /// Returns a list of known nodes + async fn get_nodes(&self) -> Result; + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + ) -> Result; + + /// Returns a list of partitions + async fn get_partitions(&self, v_disk_id: i32) -> Result; + + /// Returns count of records of this on node + async fn get_records(&self, v_disk_id: i32) -> Result; + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + ) -> Result; + + /// Get space info + async fn get_space_info(&self) -> Result; + + /// Returns information about self + async fn get_status(&self) -> Result; + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32) -> Result; + + /// Returns a list of vdisks + async fn get_v_disks(&self) -> Result; + + /// Returns server version + async fn get_version(&self) -> Result; + + /// Returns configuration of the node + async fn get_configuration(&self) -> Result; +} + +/// Trait to extend an API to make it easy to bind it to a context. +pub trait ContextWrapperExt +where + Self: Sized, +{ + /// Binds this API to a context. + fn with_context(self, context: C) -> ContextWrapper; +} + +impl + Send + Sync, C: Clone + Send + Sync> ContextWrapperExt for T { + fn with_context(self: T, context: C) -> ContextWrapper { + ContextWrapper::::new(self, context) + } +} + +impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for ContextWrapper { + fn poll_ready(&self, cx: &mut Context) -> Poll> { + self.api().poll_ready(cx) + } + + fn context(&self) -> &C { + Self::context(self) + } + + /// Return directory of alien + async fn get_alien_dir(&self) -> Result { + let context = self.context().clone(); + self.api().get_alien_dir(&context).await + } + /// Returns the list of disks with their states + async fn get_disks(&self) -> Result { + let context = self.context().clone(); + self.api().get_disks(&context).await + } + + /// Get metrics + async fn get_metrics(&self) -> Result { + let context = self.context().clone(); + self.api().get_metrics(&context).await + } + + /// Returns a list of known nodes + async fn get_nodes(&self) -> Result { + let context = self.context().clone(); + self.api().get_nodes(&context).await + } + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + ) -> Result { + let context = self.context().clone(); + self.api() + .get_partition(v_disk_id, partition_id, &context) + .await + } + + /// Returns a list of partitions + async fn get_partitions(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_partitions(v_disk_id, &context).await + } + + /// Returns count of records of this on node + async fn get_records(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_records(v_disk_id, &context).await + } + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + ) -> Result { + let context = self.context().clone(); + self.api() + .get_replicas_local_dirs(v_disk_id, &context) + .await + } + + /// Get space info + async fn get_space_info(&self) -> Result { + let context = self.context().clone(); + self.api().get_space_info(&context).await + } + + /// Returns information about self + async fn get_status(&self) -> Result { + let context = self.context().clone(); + self.api().get_status(&context).await + } + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_v_disk(v_disk_id, &context).await + } + + /// Returns a list of vdisks + async fn get_v_disks(&self) -> Result { + let context = self.context().clone(); + self.api().get_v_disks(&context).await + } + + /// Returns server version + async fn get_version(&self) -> Result { + let context = self.context().clone(); + self.api().get_version(&context).await + } + + /// Returns configuration of the node + async fn get_configuration(&self) -> Result { + let context = self.context().clone(); + self.api().get_configuration(&context).await + } +} + +pub mod prelude { + pub use super::{ + APIError, Api, GetAlienDirResponse, GetAlienResponse, GetConfigurationResponse, + GetDisksResponse, GetMetricsResponse, GetNodesResponse, GetPartitionResponse, + GetPartitionsResponse, GetRecordsResponse, GetReplicasLocalDirsResponse, + GetSpaceInfoResponse, GetStatusResponse, GetVDiskResponse, GetVDisksResponse, + GetVersionResponse, + }; +} diff --git a/backend/src/connector/client.rs b/backend/src/connector/client.rs new file mode 100644 index 00000000..ed9d64a4 --- /dev/null +++ b/backend/src/connector/client.rs @@ -0,0 +1,431 @@ +//! +//! This file was *partly* auto-generated using OpenAPI server generator +//! at +//! on 2023-07-07 +//! using BOB's REST API schema, commit = ade0eadf1db7cda072cfab07dff7b1b57247e34a: +//! +//! +//! This file was modified in order to get rid of the "swagger" crate, which brings +//! a lot of unnecessary dependencies (for example, openssl, which can cause problems +//! when creating docker images, even if we use only the http client). In addition, +//! some refactoring was done in order to: +//! 1. Reduce the code size (from 2k-ish LOC to 500). +//! 2. Provide new functionality, e.g. adding authorization (and other) headers for +//! new requests +//! 3. Unify error handling with `error_stack` +//! + +#![allow( + missing_docs, + clippy::module_name_repetitions, + dead_code, + unused_variables +)] + +use hyper::body::to_bytes; + +use super::{api::prelude::*, prelude::*}; + +/// Error type for failing to create a Client +#[derive(Debug, Error)] +pub enum ClientInitError { + #[error("invlaid URL scheme")] + InvalidScheme, + + #[error("invlaid URI scheme")] + InvalidUri, + + #[error("no hostname specified")] + MissingHost, +} + +/// A client that implements the API by making HTTP calls out to a server. +#[derive(Clone)] +pub struct Client +where + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display, + C: Clone + Send + Sync + 'static, +{ + /// Inner service + client_service: S, + + /// Base path of the API + base_path: String, + + /// Context Marker + con_marker: PhantomData, + + /// Credentials Marker + cred_marker: PhantomData, +} + +#[derive(Debug, Clone)] +pub enum HyperClient { + Http(hyper::client::Client), +} + +impl Service> for HyperClient { + type Response = Response; + type Error = hyper::Error; + type Future = hyper::client::ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + match self { + Self::Http(client) => client.poll_ready(cx), + } + } + + fn call(&mut self, req: Request) -> Self::Future { + match self { + Self::Http(client) => client.call(req), + } + } +} + +impl Client, C, Cr> +where + C: Clone + Send + Sync + 'static, +{ + /// Create an HTTP client. + /// + /// # Arguments + /// * `base_path` - base path of the client API, i.e. + /// + /// # Errors + /// + /// This function will return an error if base path isn't valid URL + pub fn try_new(base_path: &str) -> Result { + let uri = Uri::from_str(base_path).change_context(ClientInitError::InvalidUri)?; + + let scheme = uri.scheme_str().ok_or(ClientInitError::InvalidScheme)?; + let scheme = scheme.to_ascii_lowercase(); + + let connector = Connector::builder(); + + let client_service = match scheme.as_str() { + "http" => HyperClient::Http(hyper::client::Client::builder().build(connector.build())), + + _ => { + return Err(ClientInitError::InvalidScheme.into()); + } + }; + + let client_service = DropContextService::new(client_service); + + Ok(Self { + client_service, + base_path: into_base_path(base_path, None)?, + con_marker: PhantomData, + cred_marker: PhantomData, + }) + } +} + +/// Convert input into a base path, e.g. . Also checks the scheme as it goes. +fn into_base_path( + input: impl TryInto, + _correct_scheme: Option<&'static str>, +) -> Result { + // First convert to Uri, since a base path is a subset of Uri. + let uri = input + .try_into() + .change_context(ClientInitError::InvalidUri)?; + + let scheme = uri.scheme_str().ok_or(ClientInitError::InvalidScheme)?; + + // Check the scheme if necessary + // if let Some(correct_scheme) = correct_scheme { + // if scheme != correct_scheme { + // return Err(ClientInitError::InvalidScheme); + // } + // } + + let host = uri.host().ok_or(ClientInitError::MissingHost)?; + let port = uri.port_u16().map(|x| format!(":{x}")).unwrap_or_default(); + Ok(format!( + "{}://{}{}{}", + scheme, + host, + port, + uri.path().trim_end_matches('/') + )) +} + +impl + Client, C>, C, Cr> +where + C: Clone + Send + Sync + 'static, +{ + /// Create an HTTP client. + /// + /// # Arguments + /// * `base_path` - base path of the client API, i.e. + /// + /// # Errors + /// + /// This function will return an error if base path isn't valid URL + pub fn try_new_http(base_path: &str) -> Result { + let http_connector = Connector::builder().build(); + + Self::try_new_with_connector(base_path, Some("http"), http_connector) + } +} + +impl Client, C>, C, Cr> +where + Connector: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: Clone + Send + Sync + 'static, +{ + /// Create a client with a custom implementation of [`hyper::client::Connect`]. + /// + /// Intended for use with custom implementations of connect for e.g. protocol logging + /// or similar functionality which requires wrapping the transport layer. When wrapping a TCP connection, + /// this function should be used in conjunction with `swagger::Connector::builder()`. + /// + /// # Arguments + /// + /// * `base_path` - base path of the client API, i.e. + /// * `protocol` - Which protocol to use when constructing the request url, e.g. `Some("http")` + /// * `connector` - Implementation of `hyper::client::Connect` to use for the client + /// + /// # Errors + /// + /// The function will fail if base path isn't a valid URL + /// + pub fn try_new_with_connector( + base_path: &str, + protocol: Option<&'static str>, + connector: Connector, + ) -> Result { + let client_service = hyper::client::Client::builder().build(connector); + let client_service = DropContextService::new(client_service); + + Ok(Self { + client_service, + base_path: into_base_path(base_path, protocol)?, + con_marker: PhantomData, + cred_marker: PhantomData, + }) + } +} + +impl Client +where + Cr: Credentials + Clone, + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display + error_stack::Context, + C: Clone + Send + Sync + Has + Has>>, +{ + fn form_request( + &self, + endpoint: &str, + method: Method, + context: &C, + ) -> Result, ClientError> { + let uri = format!("{}{endpoint}", self.base_path); + + let uri = Uri::from_str(&uri).change_context(ClientError::BadUri)?; + let mut request = Request::builder() + .method(method) + .uri(uri) + .body(Body::empty()) + .change_context(ClientError::CantFormRequest)?; + let xspan = Has::::get(context); + let header = + HeaderValue::from_str(&xspan.0).change_context(ClientError::CantFormRequest)?; + + request + .headers_mut() + .insert(HeaderName::from_static("x-span-id"), header); + + let auth_data = Has::>>::get(context); + if let Some(auth) = auth_data { + request + .headers_mut() + .typed_insert::>(auth.clone()); + } + + Ok(request) + } + + async fn handle_response_json Deserialize<'a>, T>( + &self, + response: Response, + body_handler: impl Fn(R) -> T + Send, + ) -> Result { + let body = response.into_body(); + let body = to_bytes(body) + .await + .change_context(APIError::ResponseError)?; + let body = std::str::from_utf8(&body).change_context(APIError::ResponseError)?; + + let body = serde_json::from_str::(body) + .change_context(APIError::ResponseError) + .attach_printable("Response body did not match the schema")?; + + Ok(body_handler(body)) + } +} + +impl Client +where + Cr: Credentials + Clone, + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display + error_stack::Context, + C: Clone + Send + Sync + Has, +{ + async fn call(&self, req: Request, cx: &C) -> Result, APIError> { + let timeout = Has::::get(cx); + tokio::time::timeout( + timeout.clone().into_inner(), + self.client_service.clone().call((req, cx.clone())), + ) + .await + .change_context(APIError::RequestFailed) + .attach_printable("No Response received")? + .change_context(APIError::RequestFailed) + .attach_printable("Hyper error") + } +} + +impl Api for Client +where + Cr: Credentials + Clone, + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display + error_stack::Context, + C: Clone + + Send + + Sync + + Has + + Has + + Has>>, +{ + /// Return directory of alien + #[must_use] + async fn get_alien_dir(&self, context: &C) -> Result { + todo!() + } + + /// Returns the list of disks with their states + #[must_use] + async fn get_disks(&self, context: &C) -> Result { + todo!() + } + + /// Get metrics + #[must_use] + async fn get_metrics(&self, context: &C) -> Result { + todo!() + } + + /// Returns a list of known nodes + #[must_use] + async fn get_nodes(&self, context: &C) -> Result { + let request = self + .form_request("/nodes", Method::GET, context) + .change_context(APIError::RequestFailed)?; + // let update_context: RefClient = ref_self; + // let response = update_context.call(request, context).await?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Vec| { + GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(body) + }) + .await?), + 403 => Ok(GetNodesResponse::PermissionDenied), + _ => Err(APIError::from(response))?, + } + } + + /// Returns a partition info by ID + #[must_use] + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + context: &C, + ) -> Result { + todo!() + } + + /// Returns a list of partitions + #[must_use] + async fn get_partitions( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Returns count of records of this on node + #[must_use] + async fn get_records( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Returns directories of local replicas of vdisk + #[must_use] + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Get space info + #[must_use] + async fn get_space_info(&self, context: &C) -> Result { + todo!() + } + + /// Returns information about self + #[must_use] + async fn get_status(&self, context: &C) -> Result { + todo!() + } + + /// Returns a vdisk info by ID + #[must_use] + async fn get_v_disk(&self, v_disk_id: i32, context: &C) -> Result { + todo!() + } + + /// Returns a list of vdisks + #[must_use] + async fn get_v_disks(&self, context: &C) -> Result { + todo!() + } + + /// Returns server version + #[must_use] + async fn get_version(&self, context: &C) -> Result { + todo!() + } + + /// Returns configuration of the node + #[must_use] + async fn get_configuration(&self, context: &C) -> Result { + todo!() + } +} + +impl From> for APIError { + fn from(response: Response) -> Self { + let code = response.status(); + + Self::InvalidStatusCode(code) + } +} diff --git a/backend/src/connector/context.rs b/backend/src/connector/context.rs new file mode 100644 index 00000000..c00ec005 --- /dev/null +++ b/backend/src/connector/context.rs @@ -0,0 +1,88 @@ +use super::prelude::*; +use proc_macro::Context; + +pub trait Has { + fn get(&self) -> &T; + fn get_mut(&mut self) -> &mut T; +} + +/// Context, created for each Client instance. +#[derive(Clone, Debug, Context)] +pub struct ClientContext { + #[has] + pub timeout: RequestTimeout, + #[has] + pub auth_data: Option>, + #[has] + pub xspan: XSpanIdString, +} + +/// Context wrapper, to bind an API with a context. +#[derive(Debug, Clone)] +pub struct ContextWrapper { + api: T, + context: C, +} + +impl ContextWrapper { + /// Create a new `ContextWrapper`, binding the API and context. + pub const fn new(api: T, context: C) -> Self { + Self { api, context } + } + + /// Borrows the API. + pub const fn api(&self) -> &T { + &self.api + } + + /// Borrows the context. + pub const fn context(&self) -> &C { + &self.context + } +} + +/// Middleware that wraps a `hyper::service::Service` and drops any contextual information +/// on the request. +#[derive(Debug, Clone)] +pub struct DropContextService +where + C: Send + 'static, +{ + inner: T, + marker: PhantomData, +} + +impl DropContextService +where + C: Send + 'static, +{ + /// Create a new `DropContextService` struct wrapping a value + pub const fn new(inner: T) -> Self { + Self { + inner, + marker: PhantomData, + } + } +} + +impl hyper::service::Service<(Request, Context)> + for DropContextService +where + Context: Send + 'static, + Inner: hyper::service::Service>, +{ + type Response = Inner::Response; + type Error = Inner::Error; + type Future = Inner::Future; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, (req, _): (Request, Context)) -> Self::Future { + self.inner.call(req) + } +} diff --git a/backend/src/connector/dto.rs b/backend/src/connector/dto.rs new file mode 100644 index 00000000..f66d5bed --- /dev/null +++ b/backend/src/connector/dto.rs @@ -0,0 +1,1218 @@ +//! +//! This file was *partly* auto-generated using OpenAPI server generator +//! at +//! on 2023-07-07 +//! using BOB's REST API schema, commit = ade0eadf1db7cda072cfab07dff7b1b57247e34a: +//! +//! +//! This file was modified in order to get rid of the "swagger" crate, which brings +//! a lot of unnecessary dependencies (for example, openssl, which can cause problems +//! when creating docker images, even if we use only the http client). In addition, +//! some refactoring was done to reduce the code size. +//! + +use std::collections::HashMap; + +type StdError = dyn std::error::Error; + +/// Function, used for parsing strings into DTOs +/// Accpets closures, that decides what to do with keys and values +fn parse(s: &str, mut matcher: F) -> Result<(), Box> +where + F: FnMut(&str, &str) -> Result<(), Box>, +{ + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let Some(val) = string_iter.next() else { + return Err("Missing value while parsing".into()); + }; + + if let Some(key) = key_result { + matcher(key, val)?; + } + + // Get the next key + key_result = string_iter.next(); + } + Ok(()) +} + +#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Dir { + #[serde(rename = "name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + + #[serde(rename = "path")] + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, + + #[serde(rename = "children")] + #[serde(skip_serializing_if = "Option::is_none")] + pub children: Option>, +} + +impl Dir { + #[must_use] + pub const fn new() -> Self { + Self { + name: None, + path: None, + children: None, + } + } +} + +/// Converts the Dir value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Dir { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.name + .as_ref() + .map(|name| ["name".to_string(), name.to_string()].join(",")), + self.path + .as_ref() + .map(|path| ["path".to_string(), path.to_string()].join(",")), + // Skipping children in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a Dir value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Dir { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub path: Vec, + pub children: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + "children" => { + return Err("Parsing a container in this style is not supported in Dir".into()) + } + _ => return Err("Unexpected key while parsing Dir".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + name: intermediate_rep.name.into_iter().next(), + path: intermediate_rep.path.into_iter().next(), + children: intermediate_rep.children.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct DiskState { + #[serde(rename = "name")] + pub name: String, + + #[serde(rename = "path")] + pub path: String, + + #[serde(rename = "is_active")] + pub is_active: bool, +} + +impl DiskState { + /// Creates a new [`DiskState`]. + #[must_use] + pub const fn new(name: String, path: String, is_active: bool) -> Self { + Self { + name, + path, + is_active, + } + } +} + +/// Converts the [`DiskState`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for DiskState { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("name".to_string()), + Some(self.name.to_string()), + Some("path".to_string()), + Some(self.path.to_string()), + Some("is_active".to_string()), + Some(self.is_active.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`DiskState`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for DiskState { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub path: Vec, + pub is_active: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + "is_active" => intermediate_rep + .is_active + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing DiskState".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + name: intermediate_rep + .name + .into_iter() + .next() + .ok_or_else(|| "name missing in DiskState".to_string())?, + path: intermediate_rep + .path + .into_iter() + .next() + .ok_or_else(|| "path missing in DiskState".to_string())?, + is_active: intermediate_rep + .is_active + .into_iter() + .next() + .ok_or_else(|| "is_active missing in DiskState".to_string())?, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct DistrFunc { + // Note: inline enums are not fully supported by openapi-generator + #[serde(rename = "func")] + #[serde(skip_serializing_if = "Option::is_none")] + pub func: Option, +} + +impl DistrFunc { + #[must_use] + pub const fn new() -> Self { + Self { func: None } + } +} + +/// Converts the [`DistrFunc`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for DistrFunc { + fn to_string(&self) -> String { + let params: Vec> = vec![self + .func + .as_ref() + .map(|func| ["func".to_string(), func.to_string()].join(","))]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`DistrFunc`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for DistrFunc { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub func: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + // Parse into intermediate representation + parse(s, |key, val| { + match key { + "func" => intermediate_rep + .func + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing DistrFunc".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + func: intermediate_rep.func.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Error { + #[serde(rename = "code")] + pub code: String, + + #[serde(rename = "message")] + pub message: String, +} + +impl Error { + #[must_use] + pub const fn new(code: String, message: String) -> Self { + Self { code, message } + } +} + +/// Converts the [`Error`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Error { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("code".to_string()), + Some(self.code.to_string()), + Some("message".to_string()), + Some(self.message.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Error`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Error { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub code: Vec, + pub message: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "code" => intermediate_rep + .code + .push(::from_str(val).map_err(|x| x.to_string())?), + "message" => intermediate_rep + .message + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Error".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + code: intermediate_rep + .code + .into_iter() + .next() + .ok_or_else(|| "code missing in Error".to_string())?, + message: intermediate_rep + .message + .into_iter() + .next() + .ok_or_else(|| "message missing in Error".to_string())?, + }) + } +} + +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct MetricsEntryModel { + #[serde(rename = "value")] + pub value: u64, + + #[serde(rename = "timestamp")] + pub timestamp: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct MetricsSnapshotModel { + #[serde(rename = "metrics")] + pub metrics: HashMap, +} + +impl PartialEq for MetricsEntryModel { + fn eq(&self, other: &Self) -> bool { + self.value == other.value + } +} + +impl PartialOrd for MetricsEntryModel { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MetricsEntryModel { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.value.cmp(&other.value) + } +} + +impl Eq for MetricsEntryModel {} +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Node { + #[serde(rename = "name")] + pub name: String, + + #[serde(rename = "address")] + pub address: String, + + #[serde(rename = "vdisks")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisks: Option>, +} + +impl Node { + #[must_use] + pub const fn new(name: String, address: String) -> Self { + Self { + name, + address, + vdisks: None, + } + } +} + +/// Converts the [`Node`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Node { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("name".to_string()), + Some(self.name.to_string()), + Some("address".to_string()), + Some(self.address.to_string()), + // Skipping vdisks in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Node`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Node { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub address: Vec, + pub vdisks: Vec>, + } + + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "address" => intermediate_rep + .address + .push(::from_str(val).map_err(|x| x.to_string())?), + "vdisks" => { + return Err("Parsing a container in this style is not supported in Node".into()) + } + _ => return Err("Unexpected key while parsing Node".into()), + } + Ok(()) + })?; + + Ok(Self { + name: intermediate_rep + .name + .into_iter() + .next() + .ok_or_else(|| "name missing in Node".to_string())?, + address: intermediate_rep + .address + .into_iter() + .next() + .ok_or_else(|| "address missing in Node".to_string())?, + vdisks: intermediate_rep.vdisks.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct NodeConfiguration { + #[serde(rename = "blob_file_name_prefix")] + #[serde(skip_serializing_if = "Option::is_none")] + pub blob_file_name_prefix: Option, + + #[serde(rename = "root_dir_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub root_dir_name: Option, +} + +impl NodeConfiguration { + #[must_use] + pub const fn new() -> Self { + Self { + blob_file_name_prefix: None, + root_dir_name: None, + } + } +} + +/// Converts the [`NodeConfiguration`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for NodeConfiguration { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.blob_file_name_prefix + .as_ref() + .map(|blob_file_name_prefix| { + [ + "blob_file_name_prefix".to_string(), + blob_file_name_prefix.to_string(), + ] + .join(",") + }), + self.root_dir_name.as_ref().map(|root_dir_name| { + ["root_dir_name".to_string(), root_dir_name.to_string()].join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`NodeConfiguration`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for NodeConfiguration { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub blob_file_name_prefix: Vec, + pub root_dir_name: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "blob_file_name_prefix" => intermediate_rep + .blob_file_name_prefix + .push(::from_str(val).map_err(|x| x.to_string())?), + "root_dir_name" => intermediate_rep + .root_dir_name + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing NodeConfiguration".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + blob_file_name_prefix: intermediate_rep.blob_file_name_prefix.into_iter().next(), + root_dir_name: intermediate_rep.root_dir_name.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Partition { + #[serde(rename = "vdisk_id")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisk_id: Option, + + #[serde(rename = "node_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub node_name: Option, + + #[serde(rename = "disk_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub disk_name: Option, + + #[serde(rename = "timestamp")] + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, + + #[serde(rename = "records_count")] + #[serde(skip_serializing_if = "Option::is_none")] + pub records_count: Option, +} + +impl Partition { + #[must_use] + pub const fn new() -> Self { + Self { + vdisk_id: None, + node_name: None, + disk_name: None, + timestamp: None, + records_count: None, + } + } +} + +/// Converts the [`Partition`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Partition { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.vdisk_id + .as_ref() + .map(|vdisk_id| ["vdisk_id".to_string(), vdisk_id.to_string()].join(",")), + self.node_name + .as_ref() + .map(|node_name| ["node_name".to_string(), node_name.to_string()].join(",")), + self.disk_name + .as_ref() + .map(|disk_name| ["disk_name".to_string(), disk_name.to_string()].join(",")), + self.timestamp + .as_ref() + .map(|timestamp| ["timestamp".to_string(), timestamp.to_string()].join(",")), + self.records_count.as_ref().map(|records_count| { + ["records_count".to_string(), records_count.to_string()].join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Partition`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Partition { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub vdisk_id: Vec, + pub node_name: Vec, + pub disk_name: Vec, + pub timestamp: Vec, + pub records_count: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "vdisk_id" => intermediate_rep + .vdisk_id + .push(::from_str(val).map_err(|x| x.to_string())?), + "node_name" => intermediate_rep + .node_name + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk_name" => intermediate_rep + .disk_name + .push(::from_str(val).map_err(|x| x.to_string())?), + "timestamp" => intermediate_rep + .timestamp + .push(::from_str(val).map_err(|x| x.to_string())?), + "records_count" => intermediate_rep + .records_count + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Partition".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + vdisk_id: intermediate_rep.vdisk_id.into_iter().next(), + node_name: intermediate_rep.node_name.into_iter().next(), + disk_name: intermediate_rep.disk_name.into_iter().next(), + timestamp: intermediate_rep.timestamp.into_iter().next(), + records_count: intermediate_rep.records_count.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Replica { + #[serde(rename = "node")] + pub node: String, + + #[serde(rename = "disk")] + pub disk: String, + + #[serde(rename = "path")] + pub path: String, +} + +impl Replica { + #[must_use] + pub const fn new(node: String, disk: String, path: String) -> Self { + Self { node, disk, path } + } +} + +/// Converts the [`Replica`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Replica { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("node".to_string()), + Some(self.node.to_string()), + Some("disk".to_string()), + Some(self.disk.to_string()), + Some("path".to_string()), + Some(self.path.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Replica`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Replica { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub node: Vec, + pub disk: Vec, + pub path: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "node" => intermediate_rep + .node + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk" => intermediate_rep + .disk + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Replica".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + node: intermediate_rep + .node + .into_iter() + .next() + .ok_or_else(|| "node missing in Replica".to_string())?, + disk: intermediate_rep + .disk + .into_iter() + .next() + .ok_or_else(|| "disk missing in Replica".to_string())?, + path: intermediate_rep + .path + .into_iter() + .next() + .ok_or_else(|| "path missing in Replica".to_string())?, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct SpaceInfo { + #[serde(rename = "total_disk_space_bytes")] + pub total_disk_space_bytes: u64, + + #[serde(rename = "free_disk_space_bytes")] + pub free_disk_space_bytes: u64, + + #[serde(rename = "used_disk_space_bytes")] + pub used_disk_space_bytes: u64, + + #[serde(rename = "occupied_disk_space_bytes")] + pub occupied_disk_space_bytes: u64, + + #[serde(rename = "occupied_disk_space_by_disk")] + pub occupied_disk_space_by_disk: std::collections::HashMap, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct StatusExt { + #[serde(rename = "status")] + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + + #[serde(rename = "ok")] + #[serde(skip_serializing_if = "Option::is_none")] + pub ok: Option, + + #[serde(rename = "msg")] + #[serde(skip_serializing_if = "Option::is_none")] + pub msg: Option, +} + +impl StatusExt { + #[must_use] + pub const fn new() -> Self { + Self { + status: None, + ok: None, + msg: None, + } + } +} + +/// Converts the [`StatusExt`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for StatusExt { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.status + .as_ref() + .map(|status| ["status".to_string(), status.to_string()].join(",")), + self.ok + .as_ref() + .map(|ok| ["ok".to_string(), ok.to_string()].join(",")), + self.msg + .as_ref() + .map(|msg| ["msg".to_string(), msg.to_string()].join(",")), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`StatusExt`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for StatusExt { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub status: Vec, + pub ok: Vec, + pub msg: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "status" => intermediate_rep + .status + .push(::from_str(val).map_err(|x| x.to_string())?), + "ok" => intermediate_rep + .ok + .push(::from_str(val).map_err(|x| x.to_string())?), + "msg" => intermediate_rep + .msg + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing StatusExt".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + status: intermediate_rep.status.into_iter().next(), + ok: intermediate_rep.ok.into_iter().next(), + msg: intermediate_rep.msg.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VDisk { + #[serde(rename = "id")] + pub id: i32, + + #[serde(rename = "replicas")] + #[serde(skip_serializing_if = "Option::is_none")] + pub replicas: Option>, +} + +impl VDisk { + #[must_use] + pub const fn new(id: i32) -> Self { + Self { id, replicas: None } + } +} + +/// Converts the [`VDisk`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VDisk { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("id".to_string()), + Some(self.id.to_string()), + // Skipping replicas in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VDisk`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VDisk { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub id: Vec, + pub replicas: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "id" => intermediate_rep + .id + .push(::from_str(val).map_err(|x| x.to_string())?), + "replicas" => { + return Err( + "Parsing a container in this style is not supported in VDisk".into(), + ) + } + _ => return Err("Unexpected key while parsing VDisk".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + id: intermediate_rep + .id + .into_iter() + .next() + .ok_or_else(|| "id missing in VDisk".to_string())?, + replicas: intermediate_rep.replicas.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VDiskPartitions { + #[serde(rename = "vdisk")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisk: Option, + + #[serde(rename = "node")] + #[serde(skip_serializing_if = "Option::is_none")] + pub node: Option, + + #[serde(rename = "disk")] + #[serde(skip_serializing_if = "Option::is_none")] + pub disk: Option, + + #[serde(rename = "partitions")] + #[serde(skip_serializing_if = "Option::is_none")] + pub partitions: Option>, +} + +impl VDiskPartitions { + #[must_use] + pub const fn new() -> Self { + Self { + vdisk: None, + node: None, + disk: None, + partitions: None, + } + } +} + +/// Converts the [`VDiskPartitions`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VDiskPartitions { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.vdisk + .as_ref() + .map(|vdisk| ["vdisk".to_string(), vdisk.to_string()].join(",")), + self.node + .as_ref() + .map(|node| ["node".to_string(), node.to_string()].join(",")), + self.disk + .as_ref() + .map(|disk| ["disk".to_string(), disk.to_string()].join(",")), + self.partitions.as_ref().map(|partitions| { + [ + "partitions".to_string(), + partitions + .iter() + .map(std::string::ToString::to_string) + .collect::>() + .join(","), + ] + .join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VDiskPartitions`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VDiskPartitions { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub vdisk: Vec, + pub node: Vec, + pub disk: Vec, + pub partitions: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "vdisk" => intermediate_rep + .vdisk + .push(::from_str(val).map_err(|x| x.to_string())?), + "node" => intermediate_rep + .node + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk" => intermediate_rep + .disk + .push(::from_str(val).map_err(|x| x.to_string())?), + "partitions" => { + return Err( + "Parsing a container in this style is not supported in VDiskPartitions" + .into(), + ) + } + _ => return Err("Unexpected key while parsing VDiskPartitions".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + vdisk: intermediate_rep.vdisk.into_iter().next(), + node: intermediate_rep.node.into_iter().next(), + disk: intermediate_rep.disk.into_iter().next(), + partitions: intermediate_rep.partitions.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Version { + #[serde(rename = "version")] + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, + + #[serde(rename = "build_time")] + #[serde(skip_serializing_if = "Option::is_none")] + pub build_time: Option, +} + +impl Version { + #[must_use] + pub const fn new() -> Self { + Self { + version: None, + build_time: None, + } + } +} + +/// Converts the [`Version`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Version { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.version + .as_ref() + .map(|version| ["version".to_string(), version.to_string()].join(",")), + self.build_time + .as_ref() + .map(|build_time| ["build_time".to_string(), build_time.to_string()].join(",")), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Version`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Version { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub version: Vec, + pub build_time: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "version" => intermediate_rep + .version + .push(::from_str(val).map_err(|x| x.to_string())?), + "build_time" => intermediate_rep + .build_time + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Version".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + version: intermediate_rep.version.into_iter().next(), + build_time: intermediate_rep.build_time.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VersionInfo { + #[serde(rename = "bobversion")] + #[serde(skip_serializing_if = "Option::is_none")] + pub bobversion: Option, + + #[serde(rename = "pearlversion")] + #[serde(skip_serializing_if = "Option::is_none")] + pub pearlversion: Option, +} + +impl VersionInfo { + #[must_use] + pub const fn new() -> Self { + Self { + bobversion: None, + pearlversion: None, + } + } +} + +/// Converts the [`VersionInfo`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VersionInfo { + fn to_string(&self) -> String { + let params: Vec> = vec![ + // Skipping bobversion in query parameter serialization + + // Skipping pearlversion in query parameter serialization + + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VersionInfo`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VersionInfo { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub bobversion: Vec, + pub pearlversion: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "bobversion" => intermediate_rep.bobversion.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + "pearlversion" => intermediate_rep.pearlversion.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + _ => return Err("Unexpected key while parsing VersionInfo".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + bobversion: intermediate_rep.bobversion.into_iter().next(), + pearlversion: intermediate_rep.pearlversion.into_iter().next(), + }) + } +} diff --git a/backend/src/connector/error.rs b/backend/src/connector/error.rs new file mode 100644 index 00000000..61b5dd35 --- /dev/null +++ b/backend/src/connector/error.rs @@ -0,0 +1,129 @@ +use super::prelude::*; + +impl From for StatusCode { + fn from(value: GetAlienResponse) -> Self { + match value { + GetAlienResponse::AlienNodeName(_) => StatusCode::OK, + } + } +} + +impl From for StatusCode { + fn from(value: GetAlienDirResponse) -> Self { + match value { + GetAlienDirResponse::Directory(_) => StatusCode::OK, + GetAlienDirResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + GetAlienDirResponse::NotAcceptableBackend(_) => StatusCode::NOT_ACCEPTABLE, + } + } +} + +impl From for StatusCode { + fn from(value: GetDisksResponse) -> Self { + match value { + GetDisksResponse::AJSONArrayWithDisksAndTheirStates(_) => StatusCode::OK, + GetDisksResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + } + } +} + +impl From for StatusCode { + fn from(value: GetMetricsResponse) -> Self { + match value { + GetMetricsResponse::Metrics(_) => StatusCode::OK, + } + } +} + +impl From for StatusCode { + fn from(value: GetNodesResponse) -> Self { + match value { + GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => StatusCode::OK, + GetNodesResponse::PermissionDenied => StatusCode::FORBIDDEN, + } + } +} + +impl From for StatusCode { + fn from(value: GetPartitionResponse) -> Self { + match value { + GetPartitionResponse::AJSONWithPartitionInfo(_) => StatusCode::OK, + GetPartitionResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + GetPartitionResponse::NotFound(_) => StatusCode::NOT_FOUND, + } + } +} + +impl From for StatusCode { + fn from(value: GetRecordsResponse) -> Self { + match value { + GetRecordsResponse::RecordsCount(_) => StatusCode::OK, + GetRecordsResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + GetRecordsResponse::NotFound(_) => StatusCode::NOT_FOUND, + } + } +} + +impl From for StatusCode { + fn from(value: GetReplicasLocalDirsResponse) -> Self { + match value { + GetReplicasLocalDirsResponse::AJSONArrayWithDirs(_) => StatusCode::OK, + GetReplicasLocalDirsResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + GetReplicasLocalDirsResponse::NotFound(_) => StatusCode::NOT_FOUND, + } + } +} + +impl From for StatusCode { + fn from(value: GetStatusResponse) -> Self { + match value { + GetStatusResponse::AJSONWithNodeInfo(_) => StatusCode::OK, + } + } +} + +impl From for StatusCode { + fn from(value: GetVDiskResponse) -> Self { + match value { + GetVDiskResponse::AJSONWithVdiskInfo(_) => StatusCode::OK, + GetVDiskResponse::PermissionDenied(_) => StatusCode::FORBIDDEN, + GetVDiskResponse::NotFound(_) => StatusCode::NOT_FOUND, + } + } +} + +impl From for StatusCode { + fn from(value: GetVDisksResponse) -> Self { + match value { + GetVDisksResponse::AJSONArrayOfVdisksInfo(_) => StatusCode::OK, + GetVDisksResponse::PermissionDenied => StatusCode::FORBIDDEN, + } + } +} + +impl From for StatusCode { + fn from(value: GetVersionResponse) -> Self { + match value { + GetVersionResponse::VersionInfo(_) => StatusCode::OK, + } + } +} + +impl From for StatusCode { + fn from(value: GetConfigurationResponse) -> Self { + match value { + GetConfigurationResponse::ConfigurationObject(_) => StatusCode::OK, + GetConfigurationResponse::PermissionDenied => StatusCode::FORBIDDEN, + } + } +} + +pub trait AsApiError { + fn as_invalid_status(self) -> APIError; +} + +impl> AsApiError for T { + fn as_invalid_status(self) -> APIError { + APIError::InvalidStatusCode(self.into()) + } +} diff --git a/backend/src/connector/mod.rs b/backend/src/connector/mod.rs index 8b137891..114be2cb 100644 --- a/backend/src/connector/mod.rs +++ b/backend/src/connector/mod.rs @@ -1 +1,309 @@ +mod prelude { + pub use super::api::prelude::*; + pub use super::{ + context::{ContextWrapper, DropContextService, Has}, + ClientError, Connector, + }; + pub use crate::{models::shared::XSpanIdString, prelude::*, services::auth::HttpClient}; + pub use axum::{ + headers::{authorization::Credentials, Authorization, HeaderMapExt}, + http::{HeaderName, HeaderValue}, + }; + pub use futures::StreamExt; + pub use hyper::{service::Service, Response, Uri}; + pub use std::{ + str::FromStr, + sync::Arc, + task::{Context, Poll}, + }; +} +use api::{ApiNoContext, ContextWrapperExt}; +use client::Client; +use context::ClientContext; +use prelude::*; + +use self::error::AsApiError; + +pub mod api; +pub mod client; +pub mod context; +pub mod dto; +pub mod error; + +pub type ApiInterface = dyn ApiNoContext + Send + Sync; + +#[derive(Debug, Error)] +pub enum ClientError { + #[error("couldn't init http client")] + InitClient, + #[error("couldn't probe connection to the node")] + Inaccessible, + #[error("permission denied")] + PermissionDenied, + #[error("no client found for requested resource")] + NoClient, + #[error("can't form hyper request")] + CantFormRequest, + #[error("bad URI")] + BadUri, +} + +/// HTTP Connector constructor +#[derive(Debug)] +pub struct Connector; + +impl Connector { + /// Alows building a HTTP(S) connector. Used for instantiating clients with custom + /// connectors. + #[must_use] + pub const fn builder() -> Builder { + Builder {} + } +} + +/// Builder for HTTP(S) connectors +#[derive(Debug)] +pub struct Builder {} + +impl Builder { + /// [Stub] Use HTTPS instead of HTTP + #[must_use] + pub const fn https(self) -> HttpsBuilder { + HttpsBuilder {} + } + + /// Build a HTTP connector + #[must_use] + pub fn build(self) -> hyper::client::connect::HttpConnector { + hyper::client::connect::HttpConnector::new() + } +} + +// TODO +/// [Stub] Builder for HTTPS connectors +#[derive(Debug)] +pub struct HttpsBuilder {} + +impl HttpsBuilder { + pub fn build(self) { + unimplemented!() + } +} + +#[derive(Clone)] +pub struct BobClient + Send + Sync> { + /// Unique Identifier + id: Uuid, + + /// Bob's hostname + hostname: Hostname, + + // NOTE: Can (and should) the API interface mutate?.. + /// Connection, + main: Arc, + + /// Clients for all known nodes + cluster: HashMap>, +} + +#[allow(clippy::missing_fields_in_debug)] +impl + Send + Sync + Clone> std::fmt::Debug + for BobClient +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let user = &self + .main + .context() + .auth_data + .as_ref() + .map_or("Unknown", |cred| cred.username()); + f.debug_struct("BobClient") + .field("hostname", &self.hostname) + .field("user", &user) + .finish() + } +} + +impl + Send + Sync> BobClient { + /// Creates new [`BobClient`] from [`BobConnectionData`] + /// + /// # Errors + /// The function will fail if a hostname isn't a valid url or the client couldn't establish + /// connection for the BOB cluster + pub async fn try_new( + bob_data: BobConnectionData, + timeout: RequestTimeout, + ) -> Result { + let auth = bob_data + .credentials + .map(|creds| Authorization::basic(&creds.login, &creds.password)); + let hostname = bob_data.hostname.clone(); + + let context: ClientContext = ClientContext { + timeout, + auth_data: auth, + xspan: XSpanIdString::gen(), + }; + let client = Client::try_new_http(&hostname.to_string()) + .change_context(ClientError::InitClient) + .attach_printable(format!("Hostname: {}", hostname.to_string()))?; + let nodes_resp = client + .clone() + .with_context(context.clone()) + .get_nodes() + .await + .change_context(ClientError::Inaccessible) + .attach_printable(format!("Hostname: {}", hostname.to_string()))?; + let api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = nodes_resp else { + Err(nodes_resp.as_invalid_status()) + .change_context(ClientError::Inaccessible) + .attach_printable(format!("Hostname: {}", hostname.to_string()))? + }; + + let cluster: HashMap> = nodes + .iter() + .filter_map(|node| HttpClient::from_node(node, &bob_data.hostname, context.clone())) + .collect(); + + Ok(BobClient { + id: Uuid::new_v4(), + hostname: bob_data.hostname, + main: Arc::new(client.with_context(context)), + cluster, + }) + } + + /// Probes connection to the Bob's main connected node + /// + /// Returns `StatusCode::OK` on success + /// + /// # Errors + /// + /// The function fails if there was an error during creation of request + /// It shouldn't happen on normal behaviour + /// + pub async fn probe_main(&self) -> Result { + match self + .main + .get_nodes() + .await + .change_context(ClientError::Inaccessible)? + { + api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => Ok(StatusCode::OK), + api::GetNodesResponse::PermissionDenied => Err(ClientError::PermissionDenied.into()), + } + } + + /// Probes connection to the Bob's main connected node + /// + /// Returns `StatusCode::OK` on success + /// + /// # Errors + /// + /// The function fails if there was an error during creation of request + /// It shouldn't happen on normal behaviour + /// + pub async fn probe_secondary(&self, node_name: &NodeName) -> Result { + match self + .cluster + .get(node_name) + .ok_or(ClientError::NoClient)? + .get_nodes() + .await + .change_context(ClientError::Inaccessible)? + { + api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => Ok(StatusCode::OK), + api::GetNodesResponse::PermissionDenied => Err(ClientError::PermissionDenied.into()), + } + } + + /// Probes connection to all Bob's connected nodes + /// + /// Returns `StatusCode::OK` on success + /// + /// # Errors + /// + /// The function fails if there was an error during creation of request + /// It shouldn't happen on normal behaviour + /// + // pub async fn probe_cluster(&self) -> Vec<(NodeName, StatusCode)> { + pub async fn probe_cluster(&self) -> Vec<(NodeName, StatusCode)> { + let v: Vec<_> = futures::stream::iter(&self.cluster) + .map(|(node_name, node)| async { + ( + node_name.clone(), + match node.get_nodes().await { + Ok(api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_)) => { + StatusCode::OK + } + Ok(api::GetNodesResponse::PermissionDenied) => StatusCode::UNAUTHORIZED, + Err(_) => StatusCode::NOT_FOUND, + }, + ) + }) + .collect() + .await; + + futures::future::join_all(v).await + } + + #[must_use] + pub fn context(&self) -> &ClientContext { + self.main.context() + } + + #[must_use] + pub fn api_main(&self) -> &ApiInterface { + self.main.as_ref() + } + + pub fn cluster(&self) -> impl Iterator> { + self.cluster.values() + } + + pub fn api_secondary(&self, node_name: &NodeName) -> Option<&ApiInterface> { + self.cluster.get(node_name).map(std::convert::AsRef::as_ref) + } + + #[must_use] + pub const fn cluster_with_addr(&self) -> &HashMap> { + &self.cluster + } + + #[must_use] + pub const fn hostname(&self) -> &Hostname { + &self.hostname + } + + #[must_use] + pub const fn id(&self) -> &Uuid { + &self.id + } +} + +impl HttpClient { + fn from_node( + node: &dto::Node, + hostname: &Hostname, + context: ClientContext, + ) -> Option<(String, Arc)> { + let Some(port) = hostname.port() else { + return None; + }; + let name = &node.name; + let client = Hostname::with_port(&node.address, port).map_or_else( + |_| { + tracing::warn!("couldn't change port for {name}. Client won't be created"); + None + }, + |hostname| Some((name, Client::try_new_http(&hostname.to_string()))), + ); + if let Some((name, Ok(client))) = client { + Some((name.clone(), Arc::new(client.with_context(context)))) + } else { + tracing::warn!("couldn't create client for {name}"); + None + } + } +} diff --git a/backend/src/error.rs b/backend/src/error.rs index 3ac148d7..3d5abaf1 100644 --- a/backend/src/error.rs +++ b/backend/src/error.rs @@ -1,5 +1,3 @@ -#![allow(clippy::module_name_repetitions)] - use axum::response::{IntoResponse, Response}; use hyper::StatusCode; use thiserror::Error; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 3006b219..d562f11d 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,9 +1,15 @@ -#![allow(clippy::multiple_crate_versions, clippy::module_name_repetitions)] +#![allow( + async_fn_in_trait, + clippy::multiple_crate_versions, + clippy::module_name_repetitions +)] #[cfg(all(feature = "swagger", debug_assertions))] use axum::{routing::get, Router}; -use utoipa::OpenApi; +#[allow(unused_imports)] +use prelude::*; + pub mod config; pub mod connector; pub mod error; @@ -11,10 +17,12 @@ pub mod models; pub mod router; pub mod services; -#[derive(OpenApi)] -#[cfg_attr(not(all(feature = "swagger", debug_assertions)), openapi())] +#[cfg_attr(all(feature = "swagger", debug_assertions), derive(OpenApi))] #[cfg_attr(all(feature = "swagger", debug_assertions), openapi( - paths(root), + paths(root, services::auth::login, services::auth::logout), + components( + schemas(models::shared::Credentials, models::shared::Hostname, models::shared::BobConnectionData) + ), tags( (name = "bob", description = "BOB management API") ) @@ -26,6 +34,7 @@ pub struct ApiDoc; #[allow(clippy::unused_async)] #[cfg_attr(all(feature = "swagger", debug_assertions), utoipa::path( get, + context_path = ApiV1::to_path(), path = "/root", responses( (status = 200, description = "Hello Bob!") @@ -35,7 +44,12 @@ pub async fn root() -> &'static str { "Hello Bob!" } /// Generate openapi documentation for the project +/// +/// # Panics +/// +/// Panics if `OpenAPI` couldn't be converted into YAML format #[cfg(all(feature = "swagger", debug_assertions))] +#[allow(clippy::expect_used)] pub fn openapi_doc() -> Router { use utoipa_rapidoc::RapiDoc; use utoipa_redoc::{Redoc, Servable}; @@ -64,11 +78,63 @@ pub fn openapi_doc() -> Router { } pub mod prelude { - #![allow(unused_imports)] - pub use crate::error::AppError; - pub use crate::router::RouteError; - pub use axum::response::Result as AxumResult; + pub use crate::{ + connector::{ + client::Client, + context::{ClientContext, ContextWrapper, DropContextService}, + BobClient, + }, + error::AppError, + models::{ + bob::NodeName, + shared::{BobConnectionData, Hostname, RequestTimeout, XSpanIdString}, + }, + router::{ApiV1, ApiVersion, RouteError, RouterApiExt}, + services::auth::HttpBobClient, + ApiDoc, + }; + pub use axum::{ + async_trait, + headers::authorization::Basic, + response::{IntoResponse, Response, Result as AxumResult}, + Extension, Json, + }; pub use error_stack::{Context, Report, Result, ResultExt}; - // #[cfg(all(feature = "swagger", debug_assertions))] - pub use utoipa::OpenApi; + pub use hyper::{client::HttpConnector, Body, Method, Request, StatusCode}; + pub use serde::{Deserialize, Serialize}; + pub use std::{collections::HashMap, hash::Hash, marker::PhantomData, str::FromStr}; + pub use thiserror::Error; + #[cfg(all(feature = "swagger", debug_assertions))] + pub use utoipa::{IntoParams, OpenApi, ToSchema}; + pub use uuid::Uuid; +} + +pub mod main { + pub mod prelude { + pub use crate::{ + config::{ConfigExt, LoggerExt}, + models::shared::RequestTimeout, + prelude::*, + root, + router::{ApiV1, ApiVersion, NoApi, RouterApiExt}, + services::{ + api_router_v1, + auth::{require_auth, AuthState, BobUser, HttpBobClient, InMemorySessionStore}, + }, + ApiDoc, + }; + pub use axum::{ + error_handling::HandleErrorLayer, middleware::from_fn_with_state, BoxError, Extension, + Router, + }; + pub use cli::Parser; + pub use error_stack::{Result, ResultExt}; + pub use hyper::{Method, StatusCode}; + pub use std::{env, path::PathBuf}; + pub use tower::ServiceBuilder; + pub use tower_http::{cors::CorsLayer, services::ServeDir}; + pub use tower_sessions::{MemoryStore, SessionManagerLayer}; + pub use tracing::Level; + pub use uuid::Uuid; + } } diff --git a/backend/src/main.rs b/backend/src/main.rs index 37b3376e..97a908f7 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -4,21 +4,7 @@ clippy::expect_used )] -use axum::Router; -use bob_management::{ - config::{ConfigExt, LoggerExt}, - prelude::*, - root, - router::{ApiV1, ApiVersion, NoApi, RouterApiExt}, - services::api_router_v1, - ApiDoc, -}; -use cli::Parser; -use error_stack::{Result, ResultExt}; -use hyper::Method; -use std::env; -use tower::ServiceBuilder; -use tower_http::{cors::CorsLayer, services::ServeDir}; +use bob_management::main::prelude::*; const FRONTEND_FOLDER: &str = "frontend"; @@ -41,7 +27,9 @@ async fn main() -> Result<(), AppError> { let app = router(cors); #[cfg(all(feature = "swagger", debug_assertions))] - let app = app.merge(bob_management::openapi_doc()); + let app = app + .merge(bob_management::openapi_doc()) + .layer(Extension(RequestTimeout::from(config.request_timeout))); axum::Server::bind(&addr) .serve(app.into_make_service()) @@ -54,6 +42,20 @@ async fn main() -> Result<(), AppError> { #[allow(clippy::unwrap_used, clippy::expect_used)] fn router(cors: CorsLayer) -> Router { + let session_store = MemoryStore::default(); + let session_service = ServiceBuilder::new() + .layer(HandleErrorLayer::new(|err: BoxError| async move { + tracing::error!(err); + StatusCode::BAD_REQUEST + })) + .layer( + SessionManagerLayer::new(session_store) + .with_expiry(tower_sessions::Expiry::OnSessionEnd), + ); + + let user_store: InMemorySessionStore = InMemorySessionStore::default(); + let auth_state = AuthState::new(user_store); + let mut frontend = env::current_exe().expect("Couldn't get current executable path."); frontend.pop(); frontend.push(FRONTEND_FOLDER); @@ -62,28 +64,26 @@ fn router(cors: CorsLayer) -> Router { // Frontend .nest_service("/", ServeDir::new(frontend)); - // Add API - let router = router - .with_context::() - .api_route("/root", &Method::GET, root) - .unwrap() - .expect("Couldn't register new API route"); - router .nest( ApiV1::to_path(), - api_router_v1().expect("couldn't get API routes"), + api_router_v1(auth_state.clone()) + .expect("couldn't get API routes") + .layer(ServiceBuilder::new().layer(cors)), ) - .layer(ServiceBuilder::new().layer(cors)) + .layer(session_service) + .with_state(auth_state) } #[cfg(test)] mod tests { #![allow(clippy::expect_used)] + use bob_management::main::prelude::*; use bob_management::services::api_router_v1; - #[test] fn register_routes() { - let _ = api_router_v1().expect("Router has invalid API methods"); + let user_store: InMemorySessionStore = InMemorySessionStore::default(); + let auth_state = AuthState::new(user_store); + let _ = api_router_v1(auth_state).expect("Router has invalid API methods"); } } diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/backend/src/models/api.rs @@ -0,0 +1 @@ + diff --git a/backend/src/models/bob.rs b/backend/src/models/bob.rs new file mode 100644 index 00000000..3a4109f9 --- /dev/null +++ b/backend/src/models/bob.rs @@ -0,0 +1,4 @@ +pub type NodeName = String; +pub type DiskName = String; +pub type NodeAddress = String; +pub type IsActive = bool; diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs index 8b137891..39107f58 100644 --- a/backend/src/models/mod.rs +++ b/backend/src/models/mod.rs @@ -1 +1,9 @@ +pub mod api; +pub mod bob; +pub mod shared; +pub mod prelude { + pub use crate::prelude::*; + pub use hyper::Uri; + pub use std::{net::SocketAddr, time::Duration}; +} diff --git a/backend/src/models/shared.rs b/backend/src/models/shared.rs new file mode 100644 index 00000000..1d5d6d27 --- /dev/null +++ b/backend/src/models/shared.rs @@ -0,0 +1,148 @@ +use super::prelude::*; +use std::result::Result; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))] +pub struct Hostname( + #[serde( + deserialize_with = "hyper_serde::deserialize", + serialize_with = "hyper_serde::serialize" + )] + Uri, +); + +#[derive(Debug, Error)] +pub enum HostnameError { + #[error("bad address: no port")] + NoPort, + #[error("bad address: couldn't parse hostname")] + BadAddress, +} + +impl Hostname { + #[must_use] + pub fn port(&self) -> Option { + self.0.port_u16() + } + + /// Creates [`Hostname`] from string with specified port + /// + /// # Errors + /// + /// This function will return an error if address doesn't have a port or a port is invalid + pub fn with_port(address: &str, port: u16) -> error_stack::Result { + let (body, _) = address.rsplit_once(':').ok_or(HostnameError::NoPort)?; + let mut body = body.to_string(); + body.push_str(&format!(":{port}")); + + Ok(Self( + hyper::http::Uri::from_str(&body).change_context(HostnameError::BadAddress)?, + )) + } +} + +impl TryFrom for Hostname { + type Error = >::Error; + + fn try_from(value: SocketAddr) -> Result { + Ok(Self(Uri::try_from(value.to_string())?)) + } +} + +impl TryFrom for SocketAddr { + type Error = std::net::AddrParseError; + + fn try_from(value: Hostname) -> Result { + value.to_string().parse() + } +} + +impl ToString for Hostname { + fn to_string(&self) -> String { + self.0.to_string() + } +} + +/// Data needed to connect to a BOB cluster +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[cfg_attr( + all(feature = "swagger", debug_assertions), + derive(IntoParams, ToSchema) +)] +#[cfg_attr(all(feature = "swagger", debug_assertions), + schema(example = json!({"hostname": "0.0.0.0:7000", "credentials": {"login": "archeoss", "password": "12345"}})))] +pub struct BobConnectionData { + /// Address to connect to + pub hostname: Hostname, + + /// [Optional] Credentials used for BOB authentication + #[serde(skip_serializing_if = "Option::is_none")] + pub credentials: Option, +} + +/// Optional auth credentials for a BOB cluster +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] +#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))] +#[cfg_attr(all(feature = "swagger", debug_assertions), schema(example = json!({"login": "archeoss", "password": "12345"})))] +pub struct Credentials { + /// Login used during auth + pub login: String, + + /// Password used during auth + pub password: String, +} + +#[allow(clippy::missing_fields_in_debug)] +impl std::fmt::Debug for Credentials { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Credentials") + .field("login", &self.login) + .finish() + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct RequestTimeout(Duration); + +impl RequestTimeout { + #[must_use] + pub const fn from_millis(millis: u64) -> Self { + Self(Duration::from_millis(millis)) + } + + #[must_use] + pub const fn into_inner(self) -> Duration { + self.0 + } +} + +impl From for RequestTimeout { + fn from(value: Duration) -> Self { + Self(value) + } +} + +/// Header - `X-Span-ID` - used to track a request through a chain of microservices. +pub const X_SPAN_ID: &str = "X-Span-ID"; + +/// Wrapper for a string being used as an X-Span-ID. +#[derive(Debug, Clone)] +pub struct XSpanIdString(pub String); + +impl XSpanIdString { + /// Extract an X-Span-ID from a request header if present, and if not + /// generate a new one. + pub fn get_or_generate(req: &hyper::Request) -> Self { + let x_span_id = req.headers().get(X_SPAN_ID); + + x_span_id + .and_then(|x| x.to_str().ok()) + .map(|x| Self(x.to_string())) + .unwrap_or_else(Self::gen) + } + + /// Generate Random `X-Span-ID` string. + pub fn gen() -> Self { + Self(uuid::Uuid::new_v4().to_string()) + } +} diff --git a/backend/src/router.rs b/backend/src/router.rs index d6befe7a..de9d964e 100644 --- a/backend/src/router.rs +++ b/backend/src/router.rs @@ -2,15 +2,14 @@ use crate::prelude::*; use axum::body::HttpBody; use axum::routing::on; use axum::{handler::Handler, routing::MethodFilter, Router}; -use hyper::{Body, Method}; use std::convert::Infallible; -use std::marker::PhantomData; use std::ops::Deref; use thiserror::Error; -use utoipa::OpenApi; #[cfg(all(feature = "swagger", debug_assertions))] use utoipa::openapi::PathItemType; +#[cfg(all(feature = "swagger", debug_assertions))] +use utoipa::OpenApi; #[derive(Clone, Debug, Error, PartialEq, Eq, PartialOrd, Ord)] pub enum RouteError { @@ -28,13 +27,16 @@ pub enum RouteError { InvalidHandler, } +/// Empty Api Context pub struct NoApi; impl<'a> ApiVersion<'a> for NoApi {} +/// Api Context (ver. 1) pub struct ApiV1; pub trait ApiVersion<'a> { + /// Provides API context path #[must_use] fn to_path() -> &'a str { "" @@ -42,6 +44,7 @@ pub trait ApiVersion<'a> { } impl<'a> ApiVersion<'a> for ApiV1 { + #[must_use] fn to_path() -> &'a str { "/api/v1" } @@ -56,7 +59,6 @@ pub struct ContextRouter { impl<'a, Version, Doc, S, B> ContextRouter where Version: ApiVersion<'a>, - Doc: OpenApi, B: HttpBody + Send + 'static, S: Clone + Send + Sync + 'static, { @@ -90,39 +92,82 @@ where } /// Add API Route to the `Router` - /// #[must_use] - pub fn api_route(mut self, path: &str, method: &Method, handler: H) -> Self + #[cfg(not(all(feature = "swagger", debug_assertions)))] + pub fn api_route(self, path: &str, method: &Method, handler: H) -> Self where H: Handler, T: 'static, S: Clone + Send + Sync + 'static, B: HttpBody + Send + 'static, { - #[cfg(all(feature = "swagger", debug_assertions))] - match try_convert_path_item_type_from_method(method) - .map(|path_item_type| check_api::<_, _, _, H, Version, Doc>(path, &path_item_type)) - { - Ok(Ok(())) => (), - Ok(Err(err)) | Err(err) => { + self.apply_handler::(path, method, handler) + } + + fn apply_handler(mut self, path: &str, method: &Method, handler: H) -> Self + where + H: Handler, + T: 'static, + S: Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + { + match try_convert_method_filter_from_method(method) { + Ok(method) => self.inner = self.inner.route(path, on(method, handler)), + Err(err) => { if let Some(errors) = &mut self.api_errors { errors.extend_one(err); } else { self.api_errors = Some(err); } } - } + }; - match try_convert_method_filter_from_method(method) { - Ok(method) => self.inner = self.inner.route(path, on(method, handler)), - Err(err) => { + self + } +} + +#[cfg(all(feature = "swagger", debug_assertions))] +impl<'a, Version, Doc, S, B> ContextRouter +where + Version: ApiVersion<'a>, + Doc: OpenApi, + B: HttpBody + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + /// Add API Route to the `Router` + #[must_use] + pub fn api_route(self, path: &str, method: &Method, handler: H) -> Self + where + H: Handler, + T: 'static, + S: Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + { + // #[cfg(all(feature = "swagger", debug_assertions))] + self.check_api::(path, method) + .apply_handler::(path, method, handler) + } + + fn check_api(mut self, path: &str, method: &Method) -> Self + where + H: Handler, + T: 'static, + S: Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + Doc: OpenApi, + { + match try_convert_path_item_type_from_method(method) + .map(|path_item_type| check_api::<_, _, _, H, Version, Doc>(path, &path_item_type)) + { + Ok(Ok(())) => (), + Ok(Err(err)) | Err(err) => { if let Some(errors) = &mut self.api_errors { errors.extend_one(err); } else { self.api_errors = Some(err); } } - }; + } self } @@ -136,22 +181,38 @@ impl Deref for ContextRouter { } } +#[cfg(all(feature = "swagger", debug_assertions))] +pub trait RouterApiExt { + /// Wraps `Router` with `ApiVersion` and `OpenApi` instances into the new context to call + /// `api_route` with said context + fn with_context<'a, Version: ApiVersion<'a>, Doc>(self) -> ContextRouter; +} + +#[cfg(not(all(feature = "swagger", debug_assertions)))] pub trait RouterApiExt { /// Wraps `Router` with `ApiVersion` and `OpenApi` instances into the new context to call /// `api_route` with said context - fn with_context<'a, Version: ApiVersion<'a>, Doc: OpenApi>( - self, - ) -> ContextRouter; + fn with_context<'a, Version: ApiVersion<'a>, Doc>(self) -> ContextRouter; +} + +#[cfg(all(feature = "swagger", debug_assertions))] +impl RouterApiExt for Router +where + B: HttpBody + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + fn with_context<'a, Version: ApiVersion<'a>, Doc>(self) -> ContextRouter { + ContextRouter::new(self) + } } +#[cfg(not(all(feature = "swagger", debug_assertions)))] impl RouterApiExt for Router where B: HttpBody + Send + 'static, S: Clone + Send + Sync + 'static, { - fn with_context<'a, Version: ApiVersion<'a>, Doc: OpenApi>( - self, - ) -> ContextRouter { + fn with_context<'a, Version: ApiVersion<'a>, Doc>(self) -> ContextRouter { ContextRouter::new(self) } } diff --git a/backend/src/services/auth.rs b/backend/src/services/auth.rs new file mode 100644 index 00000000..772ffb71 --- /dev/null +++ b/backend/src/services/auth.rs @@ -0,0 +1,355 @@ +use super::prelude::*; + +pub type HttpClient = ContextWrapper< + Client< + DropContextService, ClientContext>, + ClientContext, + Basic, + >, + ClientContext, +>; +pub type HttpBobClient = BobClient; + +#[derive(Debug, Error)] +pub enum AuthError { + #[error("Couldn't load user")] + LoadError, + #[error("Couldn't lock user store")] + PoisonError, + #[error("Auth layer failed to deserialize data")] + DeserializeError, + #[error("Couldn't login user")] + LoginError, + #[error("Couldn't logout user")] + LogoutError, + #[error("Couldn't update session")] + SessionError, +} + +/// Optional credentials for a BOB cluster +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Deserialize)] +pub struct Credentials { + pub login: String, + pub password: String, +} + +/// Login to a BOB cluster +/// +/// # Errors +/// This function can return the following errors +/// +/// 1. [`StatusCode::BAD_REQUEST`] +/// The function failed to parse hostname of the request +/// +/// 2. [`StatusCode::NOT_FOUND`] +/// The client was unable to reach the host +/// +/// 3. [`StatusCode::UNAUTHORIZED`] +/// The client couldn't authorize on the host +/// +#[cfg_attr(all(feature = "swagger", debug_assertions), + utoipa::path( + post, + context_path = ApiV1::to_path(), + path = "/login", + params( + BobConnectionData + ), + responses( + (status = 200, description = "Successful authorization"), + (status = 400, description = "Bad Hostname"), + (status = 401, description = "Bad Credentials"), + (status = 404, description = "Can't reach specified hostname") + ) +))] +#[tracing::instrument(ret, skip(auth), level = "info", fields(method = "POST"))] +pub async fn login( + mut auth: BobAuth, + Extension(request_timeout): Extension, + Json(bob): Json, +) -> AxumResult { + let bob_client = BobClient::::try_new(bob.clone(), request_timeout) + .await + .map_err(|err| { + tracing::error!("{err:?}"); + match err.current_context() { + ClientError::InitClient => StatusCode::BAD_REQUEST, + ClientError::Inaccessible => StatusCode::NOT_FOUND, + ClientError::PermissionDenied => StatusCode::UNAUTHORIZED, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + let res = match bob_client.probe_main().await { + Ok(res) => res, + Err(err) => { + tracing::error!("{err:?}"); + return Err(StatusCode::UNAUTHORIZED.into()); + } + }; + + if res == StatusCode::OK { + auth.user_store + .save( + *bob_client.id(), + BobUser { + login: if let Some(creds) = bob.credentials { + creds.login + } else { + "Unknown".to_string() + }, + }, + ) + .await + .map_err(|err| { + tracing::error!("{err:?}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + auth.login(bob_client.id()).await.map_err(|err| { + tracing::error!("{err:?}"); + StatusCode::UNAUTHORIZED + })?; + auth.client_store.insert(*bob_client.id(), bob_client); + } + + Ok(res) +} + +#[async_trait] +pub trait Store { + type Error: std::error::Error + Context; + + /// Load `Value` from abstract store + /// + /// # Errors + /// + /// This function will return an error if a `Value` couldn't be loaded + async fn load(&self, user_id: &Id) -> Result, Self::Error>; + + /// Save `Value` into abstract store + /// Returns old data, if there was any + /// + /// # Errors + /// + /// This function will return an error if a `Value` couldn't be saved + async fn save(&mut self, user_id: Id, value: Value) -> Result, Self::Error>; +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AuthData { + user: Option, + user_id: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct BobUser { + login: String, +} + +#[derive(Debug, Clone)] +pub struct AuthState { + user_store: UserStore, + client_store: HashMap, + _user: PhantomData, +} + +impl AuthState { + pub fn new(user_store: UserStore) -> Self { + Self { + user_store, + _user: PhantomData, + client_store: HashMap::new(), + } + } +} + +#[derive(Debug, Clone)] +pub struct AuthStore +where + SessionStore: Store, +{ + session: Session, + auth_data: AuthData, + user_store: SessionStore, + client_store: HashMap, +} + +impl AuthStore +where + User: Clone + Serialize + for<'a> Deserialize<'a> + Sync + Send, + Id: Clone + Serialize + for<'a> Deserialize<'a> + Sync + Send, + SessionStore: Store + Sync + Send, + Client: Send, +{ + async fn login(&mut self, user_id: &Id) -> Result<(), AuthError> { + if let Some(user) = self + .user_store + .load(user_id) + .await + .change_context(AuthError::LoginError)? + { + self.auth_data.user = Some(user); + self.auth_data.user_id = Some(user_id.clone()); + } + + self.update_session().change_context(AuthError::LoginError) + } + + fn logout(&mut self) -> Result<(), AuthError> { + self.auth_data = AuthData { + user: None, + user_id: None, + }; + + self.update_session().change_context(AuthError::LogoutError) + } + + const fn user(&self) -> Option<&User> { + self.auth_data.user.as_ref() + } +} + +impl AuthStore +where + User: Clone + Serialize + for<'a> Deserialize<'a>, + Id: Clone + Serialize + for<'a> Deserialize<'a>, + SessionStore: Store, +{ + const AUTH_DATA_KEY: &'static str = "_auth_data"; + /// Update session of this [`AuthStore`]. + /// + /// # Errors + /// + /// This function will return an error if `auth_data` couldn't be serialized. + fn update_session(&self) -> Result<(), AuthError> { + self.session + .insert(Self::AUTH_DATA_KEY, self.auth_data.clone()) + .change_context(AuthError::SessionError) + } +} + +// NOTE: async_trait is used in `FromRequestParts` declaration, so we still need to use it here +#[async_trait] +impl FromRequestParts for AuthStore +where + S: Send + Sync, + User: Serialize + for<'a> Deserialize<'a> + Clone + Send, + Id: Serialize + for<'a> Deserialize<'a> + Clone + Send + Sync, + UserStore: Store + Send + Sync, + AuthState: FromRef, + Client: Send, +{ + type Rejection = (StatusCode, &'static str); + + async fn from_request_parts( + parts: &mut Parts, + state: &S, + ) -> std::result::Result { + let session = Session::from_request_parts(parts, state).await?; + + let mut auth_data: AuthData = session + .get(Self::AUTH_DATA_KEY) + .map_err(|_| { + ( + StatusCode::BAD_REQUEST, + "Auth layer failed to deserialize data", + ) + })? + .unwrap_or(AuthData { + user: None, + user_id: None, + }); + + let AuthState { + user_store, + client_store, + .. + } = AuthState::from_ref(state); + + // Poll store to refresh current user. + if let Some(ref user_id) = auth_data.user_id { + match user_store.load(user_id).await { + Ok(user) => auth_data.user = user, + + Err(_) => return Err((StatusCode::BAD_REQUEST, "Could not load from user store")), + } + }; + + Ok(Self { + session, + auth_data, + user_store, + client_store, + }) + } +} + +/// Handles protected routes +/// +/// # Errors +/// +/// This function will return an error if a protected route was called from unauthorized context +pub async fn require_auth( + auth: AuthStore, + request: Request, + next: Next, +) -> std::result::Result +where + User: Serialize + for<'a> Deserialize<'a> + Clone + Send + Sync, + Id: Serialize + for<'a> Deserialize<'a> + Clone + Send + Sync, + UserStore: Store + Send + Sync, + Client: Send, + Body: Send + Sync, +{ + if auth.user().is_some() { + let response = next.run(request).await; + Ok(response) + } else { + Err(StatusCode::UNAUTHORIZED) + } +} + +#[derive(Debug, Clone)] +pub struct InMemorySessionStore(Arc>>); + +impl Default for InMemorySessionStore { + fn default() -> Self { + Self(Arc::new(Mutex::new(HashMap::default()))) + } +} + +#[async_trait] +impl Store for InMemorySessionStore +where + Item: Send + Sync + Clone, + ItemId: Sync + Hash + Eq + Send, +{ + type Error = AuthError; + + async fn load(&self, item_id: &ItemId) -> Result, Self::Error> { + Ok(self.0.lock().await.get(item_id).cloned()) + } + + async fn save(&mut self, user_id: ItemId, value: Item) -> Result, Self::Error> { + Ok(self.0.lock().await.insert(user_id, value)) + } +} + +pub type BobAuth = AuthStore>; + +#[cfg_attr(feature = "swagger", utoipa::path( + post, + context_path = ApiV1::to_path(), + path = "/logout", + responses( + (status = 200, description = "Logged out") + ) + ))] +#[tracing::instrument(ret, skip(auth), level = "info", fields(method = "POST"))] +pub async fn logout(mut auth: BobAuth) -> impl IntoResponse { + tracing::info!("post /logout : {:?}", &auth.auth_data); + auth.logout().map_or_else( + |_| StatusCode::OK.into_response(), + axum::response::IntoResponse::into_response, + ) +} diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 4f33c54a..a22cd7c7 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,15 +1,42 @@ -use crate::prelude::*; -use axum::{ - response::{IntoResponse, Response}, - Router, -}; -use hyper::{Body, StatusCode}; -use thiserror::Error; +mod prelude { + pub use crate::connector::ClientError; + pub use crate::prelude::*; + pub use axum::middleware::from_fn_with_state; + pub use axum::{ + extract::{FromRef, FromRequestParts}, + http::request::Parts, + middleware::Next, + Router, + }; + pub use std::sync::Arc; + pub use tokio::sync::Mutex; + pub use tower_sessions::Session; +} + +pub mod auth; + +use crate::root; +use auth::{login, logout, require_auth, AuthState, BobUser, HttpBobClient, InMemorySessionStore}; +use prelude::*; + +type BobAuthState = AuthState, HttpBobClient>; -/// Export all secured routes +/// Export all secured API routes +/// +/// # Errors +/// +/// This function will return an error if one of the routes couldn't be registred #[allow(dead_code)] -pub fn api_router_v1() -> Result, RouteError> { - Ok(Router::new()) +pub fn api_router_v1(auth_state: BobAuthState) -> Result, RouteError> { + Router::new() + .with_context::() + .api_route("/root", &Method::GET, root) + .unwrap()? + .route_layer(from_fn_with_state(auth_state, require_auth)) + .with_context::() + .api_route("/logout", &Method::POST, logout) + .api_route("/login", &Method::POST, login) + .unwrap() } /// Errors that happend during API request proccessing diff --git a/docker-compose.yaml b/docker-compose.yaml index 17fa10a1..839f0ec9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -9,7 +9,7 @@ services: - "./config.yaml:/config.yaml" networks: bobnet: - ipv4_address: 192.168.17.11 + ipv4_address: 192.168.18.11 ports: - "9000:9000" command: "--config-file /config.yaml" @@ -18,4 +18,4 @@ networks: driver: bridge ipam: config: - - subnet: 192.168.17.0/24 + - subnet: 192.168.18.0/24 diff --git a/dockerfiles/alpine/Dockerfile b/dockerfiles/alpine/Dockerfile index 940bec29..959d5d81 100644 --- a/dockerfiles/alpine/Dockerfile +++ b/dockerfiles/alpine/Dockerfile @@ -26,12 +26,14 @@ RUN echo "$(case "$BUILD_PROFILE" in\ (*) echo "$BUILD_PROFILE";;\ esac)" >> ./build_profile_dir -RUN mkdir -p backend/src frontend cli/src utils/src +RUN mkdir -p backend/src frontend cli/src utils/src proc_macro/src RUN mkdir target COPY Cargo.toml Cargo.toml COPY cli/Cargo.toml cli/Cargo.toml COPY backend/Cargo.toml backend/Cargo.toml COPY utils/Cargo.toml utils/Cargo.toml +COPY proc_macro/Cargo.toml proc_macro/Cargo.toml +COPY proc_macro/src/lib.rs proc_macro/src/lib.rs COPY frontend/Cargo.toml frontend/Cargo.toml COPY .cargo .cargo RUN echo "// if you see this, the build broke" > backend/src/lib.rs \ diff --git a/proc_macro/.gitignore b/proc_macro/.gitignore new file mode 100644 index 00000000..69369904 --- /dev/null +++ b/proc_macro/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +Cargo.lock diff --git a/proc_macro/Cargo.toml b/proc_macro/Cargo.toml new file mode 100644 index 00000000..55098f53 --- /dev/null +++ b/proc_macro/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "proc-macro" +description = "Bob Management GUI: proc-macro" +publish = false +keywords = [ "BOB", "Management", "GUI" ] +version.workspace = true +authors.workspace = true +license-file.workspace = true +edition.workspace = true +readme.workspace = true +repository.workspace = true + +[lib] +proc-macro = true + +[dependencies] +quote = "1" +proc-macro2 = "1.0" +syn = "2.0" + diff --git a/proc_macro/src/lib.rs b/proc_macro/src/lib.rs new file mode 100644 index 00000000..600257ff --- /dev/null +++ b/proc_macro/src/lib.rs @@ -0,0 +1,36 @@ +use proc_macro::TokenStream; + +use quote::quote; +use syn::{DeriveInput, FieldsNamed}; + +#[proc_macro_derive(Context, attributes(has))] +pub fn derive_context_attr(item: TokenStream) -> TokenStream { + let ast: DeriveInput = syn::parse(item).unwrap(); + let name = &ast.ident; + let mut func_stream = TokenStream::default(); + + if let syn::Data::Struct(s) = ast.data { + if let syn::Fields::Named(FieldsNamed { named, .. }) = s.fields { + let fields = named.iter().map(|f| &f.ident); + let ftypes = named.iter().map(|f| &f.ty); + + for (field, ftype) in fields.into_iter().zip(ftypes.into_iter()) { + func_stream.extend::( + quote! { + impl Has<#ftype> for #name { + fn get(&self) -> &#ftype { + &self.#field + } + fn get_mut(&mut self) -> &mut #ftype { + &mut self.#field + } + } + } + .into(), + ); + } + } + }; + + func_stream +}