From 84b283217c8d58e9ef66523131bbaca696fa9278 Mon Sep 17 00:00:00 2001 From: yixiaojiu Date: Tue, 22 Oct 2024 22:40:59 +0800 Subject: [PATCH] feat: :sparkles: Improve server status management --- Cargo.lock | 39 +++++++++++++++ kimika-server/src/data.rs | 17 +++++-- kimika-server/src/main.rs | 2 +- kimika-server/src/service/get_metadata.rs | 13 +---- kimika-server/src/service/mod.rs | 30 ++++++++++++ kimika-server/src/service/post_download.rs | 6 ++- kimika-server/src/service/post_metadata.rs | 46 +++++++++--------- kimika-server/src/service/post_register.rs | 33 ++++++++----- .../src/service/post_select_metadata.rs | 39 ++++++--------- kimika/Cargo.toml | 1 + kimika/src/receive/mod.rs | 1 - kimika/src/receive/remote.rs | 8 +++- kimika/src/receive/remote_bark.rs | 48 ------------------- kimika/src/request/remote.rs | 15 ++---- kimika/src/send/remote.rs | 2 +- kimika/src/utils/handle.rs | 8 ++++ 16 files changed, 169 insertions(+), 139 deletions(-) delete mode 100644 kimika/src/receive/remote_bark.rs diff --git a/Cargo.lock b/Cargo.lock index da56605..d5fc6e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -278,6 +278,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.38" @@ -1005,6 +1011,7 @@ dependencies = [ "indicatif", "kimika_grpc", "kimika_shared", + "mac_address", "reqwest", "serde", "tokio", @@ -1095,6 +1102,16 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "mac_address" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8836fae9d0d4be2c8b4efcdd79e828a2faa058a90d005abf42f91cac5493a08e" +dependencies = [ + "nix", + "winapi", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1107,6 +1124,15 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1178,6 +1204,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "cfg_aliases", + "libc", + "memoffset", +] + [[package]] name = "num-traits" version = "0.2.19" diff --git a/kimika-server/src/data.rs b/kimika-server/src/data.rs index 4ba944d..6b74c21 100644 --- a/kimika-server/src/data.rs +++ b/kimika-server/src/data.rs @@ -3,7 +3,7 @@ use crate::utils::types; use hyper::Response; use serde::Serialize; use std::time; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; pub struct DataSender { pub req_body: hyper::body::Incoming, @@ -17,10 +17,9 @@ pub struct DataReceiver { pub struct Transfer { pub sender: Option, pub receiver: Option, - pub created: time::Instant, } -#[derive(Clone, Serialize)] +#[derive(Clone, Debug, Serialize)] pub struct MetadataItem { pub id: String, pub token: String, @@ -33,23 +32,32 @@ pub struct MetadataItem { pub completed: bool, } +#[derive(Debug)] pub struct Sender { pub alias: String, pub id: String, } +#[derive(Debug)] pub struct Metadata { /// sender alias pub sender: Sender, pub receiver_id: String, pub metadata_list: Vec, - pub selected_metadata_tx: mpsc::Sender>, + pub selected_metadata_tx: Option>>, pub created: time::Instant, } +#[derive(Debug)] pub struct Receiver { pub id: String, pub alias: String, + + /// Unique identifier from client, such as: mac address + /// + /// Preventing duplicate insertion + pub identifier: Option, + pub created: time::Instant, } @@ -58,7 +66,6 @@ impl Transfer { Self { sender: None, receiver: None, - created: time::Instant::now(), } } } diff --git a/kimika-server/src/main.rs b/kimika-server/src/main.rs index c1f6ae7..7752946 100644 --- a/kimika-server/src/main.rs +++ b/kimika-server/src/main.rs @@ -33,7 +33,7 @@ async fn main() -> Result<(), Box> { let server_clone = server.clone(); tokio::spawn(async move { - const DURATION_TIME: u64 = 60 * 60; + const DURATION_TIME: u64 = 2 * 60; loop { time::sleep(time::Duration::from_secs(DURATION_TIME)).await; diff --git a/kimika-server/src/service/get_metadata.rs b/kimika-server/src/service/get_metadata.rs index f4fe3ee..d4c3f29 100644 --- a/kimika-server/src/service/get_metadata.rs +++ b/kimika-server/src/service/get_metadata.rs @@ -26,7 +26,6 @@ pub struct MetadataItem { #[derive(Serialize)] struct ResponseBody { metadatas: Vec, - message: String, } impl Server { @@ -52,21 +51,13 @@ impl Server { }) .collect(); let body = hyper_utils::full(Bytes::from( - serde_json::to_string(&ResponseBody { - metadatas, - message: String::from("ok"), - }) - .unwrap(), + serde_json::to_string(&ResponseBody { metadatas }).unwrap(), )); let res = Response::new(body); Ok(res) } else { let body = hyper_utils::full(Bytes::from( - serde_json::to_string(&ResponseBody { - metadatas: vec![], - message: String::from("cannot find metadata from id"), - }) - .unwrap(), + serde_json::to_string(&ResponseBody { metadatas: vec![] }).unwrap(), )); let mut res = Response::new(body); *res.status_mut() = hyper::StatusCode::BAD_REQUEST; diff --git a/kimika-server/src/service/mod.rs b/kimika-server/src/service/mod.rs index f0e6a27..bdba84a 100644 --- a/kimika-server/src/service/mod.rs +++ b/kimika-server/src/service/mod.rs @@ -25,6 +25,8 @@ pub struct Server { transfer: Arc>>, } +const EXPIRE_TIME: u64 = 60 * 10; + impl Clone for Server { fn clone(&self) -> Self { Self { @@ -75,6 +77,34 @@ impl Server { } pub async fn clear_state(self) { + async_info!("clear state start"); + + let mut clear_receiver_count = 0; + let receiver_guard = self.receiver.lock().await; + receiver_guard.retain(|_, v| { + let result = v.created.elapsed().as_secs() < EXPIRE_TIME; + if !result { + clear_receiver_count += 1; + } + result + }); + drop(receiver_guard); + + let mut clear_metadata_count = 0; + let metadata_guard = self.metadata.lock().await; + metadata_guard.retain(|_, v| { + let result = v.created.elapsed().as_secs() < EXPIRE_TIME; + if !result { + clear_metadata_count += 1; + } + result + }); + drop(metadata_guard); + async_info!("clear state end"); + async_info!(format!( + "clear receiver count: {}, clear metadata count: {}", + clear_receiver_count, clear_metadata_count + )); } } diff --git a/kimika-server/src/service/post_download.rs b/kimika-server/src/service/post_download.rs index ed622b0..590b312 100644 --- a/kimika-server/src/service/post_download.rs +++ b/kimika-server/src/service/post_download.rs @@ -49,14 +49,16 @@ impl Server { match transfer_guard.sender.take() { Some(sender) => { - transfer( + if let Err(e) = transfer( sender, data::DataReceiver { res_sender: res_body_tx, }, ) .await - .unwrap(); + { + eprintln!("Error: {}", e); + }; } None => { transfer_guard.receiver.replace(data::DataReceiver { diff --git a/kimika-server/src/service/post_metadata.rs b/kimika-server/src/service/post_metadata.rs index 9426dde..8cd49f8 100644 --- a/kimika-server/src/service/post_metadata.rs +++ b/kimika-server/src/service/post_metadata.rs @@ -1,14 +1,13 @@ use super::Server; use crate::data; -use crate::utils::hyper_utils; -use crate::utils::types; +use crate::utils::{hyper_utils, types}; use bytes::{Buf, Bytes}; use http_body_util::BodyExt; use hyper::Response; use serde::{Deserialize, Serialize}; use std::time; -use tokio::sync::mpsc; +use tokio::sync::oneshot; use uuid::Uuid; #[derive(Deserialize, Debug)] @@ -59,7 +58,7 @@ impl Server { let receiver_id = payload.receiver_id; let metadata_guard = self.metadata.lock().await; let sender_id = Uuid::new_v4().to_string(); - let (tx, mut rx) = mpsc::channel(1); + let (tx, rx) = oneshot::channel(); let metadatas = payload .metadata .iter() @@ -83,7 +82,7 @@ impl Server { }, receiver_id: receiver_id.clone(), metadata_list: metadatas, - selected_metadata_tx: tx, + selected_metadata_tx: Some(tx), created: time::Instant::now(), }, ); @@ -91,24 +90,27 @@ impl Server { drop(metadata_guard); // TODO none handle - let selected_metadata_tokens = rx.recv().await.unwrap(); - let mut selected_metadata_list = Vec::new(); + let selected_metadata_tokens = rx.await.unwrap(); + let metadata_guard = self.metadata.lock().await; - if let Some(mut metadata) = metadata_guard.get_mut(&receiver_id) { - metadata - .metadata_list - .retain(|v| selected_metadata_tokens.contains(&v.token)); - selected_metadata_list = metadata - .metadata_list - .iter() - .map(|v| ResponseMetadata { - id: v.id.clone(), - token: v.token.clone(), - }) - .collect(); - } else { - // TODO none handle - } + let selected_metadata_list = + if let Some(mut metadata) = metadata_guard.get_mut(&receiver_id) { + metadata + .metadata_list + .retain(|v| selected_metadata_tokens.contains(&v.token)); + metadata + .metadata_list + .iter() + .map(|v| ResponseMetadata { + id: v.id.clone(), + token: v.token.clone(), + }) + .collect() + } else { + return Ok(hyper_utils::rejection_response( + "Cannot find metadata from receiver id", + )); + }; drop(metadata_guard); diff --git a/kimika-server/src/service/post_register.rs b/kimika-server/src/service/post_register.rs index eaea21e..c11e821 100644 --- a/kimika-server/src/service/post_register.rs +++ b/kimika-server/src/service/post_register.rs @@ -13,13 +13,13 @@ use uuid::Uuid; #[derive(Deserialize, Debug)] struct Payload { alias: String, + identifier: Option, } #[derive(Serialize)] struct ResponseBody { /// receiver id id: String, - message: String, } impl Server { @@ -28,24 +28,33 @@ impl Server { let body = req.collect().await?.aggregate(); let payload: Payload = serde_json::from_reader(body.reader())?; + let receiver_guard = self.receiver.lock().await; + + let mut receiver_item = receiver_guard + .iter() + .find(|item| item.identifier.eq(&payload.identifier)); + let uuid = Uuid::new_v4().to_string(); - let receiver = data::Receiver { - id: uuid.clone(), - alias: payload.alias.clone(), - created: time::Instant::now(), + let id = if let Some(receiver) = receiver_item.take() { + receiver.value().id.clone() + } else { + let receiver = data::Receiver { + id: uuid.clone(), + alias: payload.alias, + identifier: payload.identifier, + created: time::Instant::now(), + }; + + receiver_guard.insert(uuid.clone(), receiver); + uuid }; - let receiver_guard = self.receiver.lock().await; - receiver_guard.insert(uuid.clone(), receiver); + drop(receiver_item); drop(receiver_guard); let body = hyper_utils::full(Bytes::from( - serde_json::to_string(&ResponseBody { - id: uuid, - message: String::from("ok"), - }) - .unwrap(), + serde_json::to_string(&ResponseBody { id }).unwrap(), )); let res = Response::new(body); diff --git a/kimika-server/src/service/post_select_metadata.rs b/kimika-server/src/service/post_select_metadata.rs index 50ef745..1ac84ef 100644 --- a/kimika-server/src/service/post_select_metadata.rs +++ b/kimika-server/src/service/post_select_metadata.rs @@ -1,11 +1,10 @@ use super::Server; -use crate::utils::hyper_utils; -use crate::utils::types; +use crate::utils::{hyper_utils, types}; -use bytes::{Buf, Bytes}; +use bytes::Buf; use http_body_util::BodyExt; use hyper::Response; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; #[derive(Deserialize, Debug)] struct Payload { @@ -15,35 +14,25 @@ struct Payload { selected_tokens: Vec, } -#[derive(Serialize)] -struct ResponseBody { - message: String, -} - impl Server { pub async fn post_select_metadata(self, req: types::RequestType) -> types::ResponseType { let body = req.collect().await?.aggregate(); - let payload: Payload = serde_json::from_reader(body.reader())?; let metadata_guard = self.metadata.lock().await; - let metadata_entry = metadata_guard.get(&payload.id); - - if let Some(metadata) = metadata_entry { - metadata - .selected_metadata_tx - .send(payload.selected_tokens) - .await?; + let metadata_entry = metadata_guard.get_mut(&payload.id); + + if let Some(mut metadata) = metadata_entry { + if let Some(tx) = metadata.selected_metadata_tx.take() { + tx.send(payload.selected_tokens).unwrap(); + } + } else { + return Ok(hyper_utils::rejection_response( + "Cannot find metadata from id", + )); } - let body = hyper_utils::full(Bytes::from( - serde_json::to_string(&ResponseBody { - message: String::from("ok"), - }) - .unwrap(), - )); - - let res = Response::new(body); + let res = Response::new(hyper_utils::empty()); Ok(res) } } diff --git a/kimika/Cargo.toml b/kimika/Cargo.toml index 3dfcccf..d2b45f8 100644 --- a/kimika/Cargo.toml +++ b/kimika/Cargo.toml @@ -22,6 +22,7 @@ reqwest = { version = "0.12.7", features = ["stream", "json"] } uuid = {version = "1.10.0", features = ["v4", "fast-rng", "macro-diagnostics"]} tokio-util = "0.7.12" bytes = "1.7.1" +mac_address = "1.1.7" [build-dependencies] tonic-build = "0.11" diff --git a/kimika/src/receive/mod.rs b/kimika/src/receive/mod.rs index e8df34f..c7cef0f 100644 --- a/kimika/src/receive/mod.rs +++ b/kimika/src/receive/mod.rs @@ -1,7 +1,6 @@ mod local; mod local_grpc; mod remote; -mod remote_bark; mod remote_grpc; mod udp; diff --git a/kimika/src/receive/remote.rs b/kimika/src/receive/remote.rs index 12c88e6..03bcc30 100644 --- a/kimika/src/receive/remote.rs +++ b/kimika/src/receive/remote.rs @@ -2,6 +2,7 @@ use super::ReceiveArgs; use crate::request::remote as request_remote; use crate::utils; use crate::{config, utils::handle}; + use crossterm::style::Stylize; use std::path::PathBuf; use tokio::io::{AsyncWriteExt, BufWriter}; @@ -21,7 +22,12 @@ pub async fn remote_receive( let request = request_remote::RequestClient::new(&address); - let receiver_id = request.post_register(config.alias.clone()).await?.id; + let mac_address = utils::handle::get_mac_address(); + + let receiver_id = request + .post_register(config.alias.clone(), mac_address) + .await? + .id; let mut metadatas = Vec::new(); loop { diff --git a/kimika/src/receive/remote_bark.rs b/kimika/src/receive/remote_bark.rs deleted file mode 100644 index ef20678..0000000 --- a/kimika/src/receive/remote_bark.rs +++ /dev/null @@ -1,48 +0,0 @@ -use super::{remote_grpc, ReceiveArgs}; -use crate::{config, utils::handle}; -use crossterm::style::Stylize; -use kimika_grpc::remote; - -pub async fn remote_receive( - args: &ReceiveArgs, - config: &config::Config, -) -> Result<(), Box> { - let address = if let Some(addr) = handle::handle_address(args.address.clone(), config) { - addr - } else { - println!("{}", "No server address configured".red()); - return Ok(()); - }; - - let alias = config.alias.clone(); - let save_folder = std::path::PathBuf::from(config.receiver.save_folder.clone()); - - let mut client = remote_grpc::create_client(address) - .await - .expect("connect remote server failed"); - eprintln!("{} {}", "Connected to remote server: ".green(), address); - - let register_res = remote_grpc::register_receiver(&mut client, &alias) - .await - .expect("register receiver failed"); - - let receiver_id = register_res.receiver_id; - - let mut content_res = remote_grpc::get_content(&mut client, &receiver_id) - .await - .expect("get content failed"); - - let mut content = remote::get_content_response::Content::default(); - while let Some(res) = content_res.message().await? { - for item in res.content_list { - content = item; - } - break; - } - - remote_grpc::receive(&mut client, &receiver_id, &save_folder, content) - .await - .expect("receive failed"); - - Ok(()) -} diff --git a/kimika/src/request/remote.rs b/kimika/src/request/remote.rs index 1966254..7cef8c9 100644 --- a/kimika/src/request/remote.rs +++ b/kimika/src/request/remote.rs @@ -85,12 +85,12 @@ pub struct PostUploadParams { #[derive(Serialize)] pub struct PostRegisterPayload { alias: String, + identifier: Option, } #[derive(Deserialize)] pub struct PostRegisterResponse { pub id: String, - pub message: String, } /** ===================================== */ @@ -104,7 +104,6 @@ pub struct GetMetadataParams { #[derive(Deserialize)] pub struct GetMetadataResponse { pub metadatas: Vec, - pub message: String, } /** ===================================== */ @@ -117,11 +116,6 @@ pub struct PostSelectMetadataPayload { pub selected_tokens: Vec, } -#[derive(Deserialize)] -pub struct PostSelectMetadataResponse { - pub message: String, -} - /** ===================================== */ #[derive(Serialize)] @@ -230,13 +224,14 @@ impl RequestClient { pub async fn post_register( &self, alias: String, + identifier: Option, ) -> Result { let mut url = self.url.clone(); url.set_path("/register"); let result = Client::new() .post(url) - .json(&PostRegisterPayload { alias }) + .json(&PostRegisterPayload { alias, identifier }) .send() .await?; Ok(result.json().await.unwrap()) @@ -259,13 +254,13 @@ impl RequestClient { pub async fn post_select_metadata( &self, payload: &PostSelectMetadataPayload, - ) -> Result { + ) -> Result<(), reqwest::Error> { let mut url = self.url.clone(); url.set_path("/metadata/select"); let result = Client::new().post(url).json(payload).send().await?; match result.error_for_status() { - Ok(res) => Ok(res.json().await.unwrap()), + Ok(_) => Ok(()), Err(err) => Err(err), } } diff --git a/kimika/src/send/remote.rs b/kimika/src/send/remote.rs index a531c3c..868d83d 100644 --- a/kimika/src/send/remote.rs +++ b/kimika/src/send/remote.rs @@ -54,7 +54,7 @@ pub async fn remote_send( if tx.is_closed() { break; } - let res = request_clone.get_receivers().await.expect(""); + let res = request_clone.get_receivers().await.unwrap(); let receiver_iter = res.receivers.iter().map(|receiver| select::SelectItem { id: receiver.id.clone(), label: receiver.alias.clone(), diff --git a/kimika/src/utils/handle.rs b/kimika/src/utils/handle.rs index a6e7c51..eed0aad 100644 --- a/kimika/src/utils/handle.rs +++ b/kimika/src/utils/handle.rs @@ -39,3 +39,11 @@ pub fn handle_message(args: &SendArgs) -> Option { try_read_from_pipeline() } } + +pub fn get_mac_address() -> Option { + if let Ok(Some(ma)) = mac_address::get_mac_address() { + Some(ma.to_string()) + } else { + None + } +}