Skip to content

Commit

Permalink
feat: ✨ Improve server status management
Browse files Browse the repository at this point in the history
  • Loading branch information
yixiaojiu committed Oct 22, 2024
1 parent 446a0b4 commit 84b2832
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 139 deletions.
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions kimika-server/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utils::types;
use hyper::Response;
use serde::Serialize;
use std::time;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

pub struct DataSender {
pub req_body: hyper::body::Incoming,
Expand All @@ -17,10 +17,9 @@ pub struct DataReceiver {
pub struct Transfer {
pub sender: Option<DataSender>,
pub receiver: Option<DataReceiver>,
pub created: time::Instant,
}

#[derive(Clone, Serialize)]
#[derive(Clone, Debug, Serialize)]
pub struct MetadataItem {
pub id: String,
pub token: String,
Expand All @@ -33,23 +32,32 @@ pub struct MetadataItem {
pub completed: bool,
}

#[derive(Debug)]
pub struct Sender {
pub alias: String,
pub id: String,
}

#[derive(Debug)]
pub struct Metadata {
/// sender alias
pub sender: Sender,
pub receiver_id: String,
pub metadata_list: Vec<MetadataItem>,
pub selected_metadata_tx: mpsc::Sender<Vec<String>>,
pub selected_metadata_tx: Option<oneshot::Sender<Vec<String>>>,
pub created: time::Instant,
}

#[derive(Debug)]
pub struct Receiver {
pub id: String,
pub alias: String,

/// Unique identifier from client, such as: mac address
///
/// Preventing duplicate insertion
pub identifier: Option<String>,

pub created: time::Instant,
}

Expand All @@ -58,7 +66,6 @@ impl Transfer {
Self {
sender: None,
receiver: None,
created: time::Instant::now(),
}
}
}
2 changes: 1 addition & 1 deletion kimika-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server_clone = server.clone();

tokio::spawn(async move {
const DURATION_TIME: u64 = 60 * 60;
const DURATION_TIME: u64 = 2 * 60;

loop {
time::sleep(time::Duration::from_secs(DURATION_TIME)).await;
Expand Down
13 changes: 2 additions & 11 deletions kimika-server/src/service/get_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub struct MetadataItem {
#[derive(Serialize)]
struct ResponseBody {
metadatas: Vec<MetadataItem>,
message: String,
}

impl Server {
Expand All @@ -52,21 +51,13 @@ impl Server {
})
.collect();
let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
metadatas,
message: String::from("ok"),
})
.unwrap(),
serde_json::to_string(&ResponseBody { metadatas }).unwrap(),
));
let res = Response::new(body);
Ok(res)
} else {
let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
metadatas: vec![],
message: String::from("cannot find metadata from id"),
})
.unwrap(),
serde_json::to_string(&ResponseBody { metadatas: vec![] }).unwrap(),
));
let mut res = Response::new(body);
*res.status_mut() = hyper::StatusCode::BAD_REQUEST;
Expand Down
30 changes: 30 additions & 0 deletions kimika-server/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct Server {
transfer: Arc<dashmap::DashMap<String, Mutex<data::Transfer>>>,
}

const EXPIRE_TIME: u64 = 60 * 10;

impl Clone for Server {
fn clone(&self) -> Self {
Self {
Expand Down Expand Up @@ -75,6 +77,34 @@ impl Server {
}

pub async fn clear_state(self) {
async_info!("clear state start");

let mut clear_receiver_count = 0;
let receiver_guard = self.receiver.lock().await;
receiver_guard.retain(|_, v| {
let result = v.created.elapsed().as_secs() < EXPIRE_TIME;
if !result {
clear_receiver_count += 1;
}
result
});
drop(receiver_guard);

let mut clear_metadata_count = 0;
let metadata_guard = self.metadata.lock().await;
metadata_guard.retain(|_, v| {
let result = v.created.elapsed().as_secs() < EXPIRE_TIME;
if !result {
clear_metadata_count += 1;
}
result
});
drop(metadata_guard);

async_info!("clear state end");
async_info!(format!(
"clear receiver count: {}, clear metadata count: {}",
clear_receiver_count, clear_metadata_count
));
}
}
6 changes: 4 additions & 2 deletions kimika-server/src/service/post_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ impl Server {

match transfer_guard.sender.take() {
Some(sender) => {
transfer(
if let Err(e) = transfer(
sender,
data::DataReceiver {
res_sender: res_body_tx,
},
)
.await
.unwrap();
{
eprintln!("Error: {}", e);
};
}
None => {
transfer_guard.receiver.replace(data::DataReceiver {
Expand Down
46 changes: 24 additions & 22 deletions kimika-server/src/service/post_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::Server;
use crate::data;
use crate::utils::hyper_utils;
use crate::utils::types;
use crate::utils::{hyper_utils, types};

use bytes::{Buf, Bytes};
use http_body_util::BodyExt;
use hyper::Response;
use serde::{Deserialize, Serialize};
use std::time;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use uuid::Uuid;

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -59,7 +58,7 @@ impl Server {
let receiver_id = payload.receiver_id;
let metadata_guard = self.metadata.lock().await;
let sender_id = Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel(1);
let (tx, rx) = oneshot::channel();
let metadatas = payload
.metadata
.iter()
Expand All @@ -83,32 +82,35 @@ impl Server {
},
receiver_id: receiver_id.clone(),
metadata_list: metadatas,
selected_metadata_tx: tx,
selected_metadata_tx: Some(tx),
created: time::Instant::now(),
},
);

drop(metadata_guard);

// TODO none handle
let selected_metadata_tokens = rx.recv().await.unwrap();
let mut selected_metadata_list = Vec::new();
let selected_metadata_tokens = rx.await.unwrap();

let metadata_guard = self.metadata.lock().await;
if let Some(mut metadata) = metadata_guard.get_mut(&receiver_id) {
metadata
.metadata_list
.retain(|v| selected_metadata_tokens.contains(&v.token));
selected_metadata_list = metadata
.metadata_list
.iter()
.map(|v| ResponseMetadata {
id: v.id.clone(),
token: v.token.clone(),
})
.collect();
} else {
// TODO none handle
}
let selected_metadata_list =
if let Some(mut metadata) = metadata_guard.get_mut(&receiver_id) {
metadata
.metadata_list
.retain(|v| selected_metadata_tokens.contains(&v.token));
metadata
.metadata_list
.iter()
.map(|v| ResponseMetadata {
id: v.id.clone(),
token: v.token.clone(),
})
.collect()
} else {
return Ok(hyper_utils::rejection_response(
"Cannot find metadata from receiver id",
));
};

drop(metadata_guard);

Expand Down
33 changes: 21 additions & 12 deletions kimika-server/src/service/post_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use uuid::Uuid;
#[derive(Deserialize, Debug)]
struct Payload {
alias: String,
identifier: Option<String>,
}

#[derive(Serialize)]
struct ResponseBody {
/// receiver id
id: String,
message: String,
}

impl Server {
Expand All @@ -28,24 +28,33 @@ impl Server {
let body = req.collect().await?.aggregate();
let payload: Payload = serde_json::from_reader(body.reader())?;

let receiver_guard = self.receiver.lock().await;

let mut receiver_item = receiver_guard
.iter()
.find(|item| item.identifier.eq(&payload.identifier));

let uuid = Uuid::new_v4().to_string();

let receiver = data::Receiver {
id: uuid.clone(),
alias: payload.alias.clone(),
created: time::Instant::now(),
let id = if let Some(receiver) = receiver_item.take() {
receiver.value().id.clone()
} else {
let receiver = data::Receiver {
id: uuid.clone(),
alias: payload.alias,
identifier: payload.identifier,
created: time::Instant::now(),
};

receiver_guard.insert(uuid.clone(), receiver);
uuid
};

let receiver_guard = self.receiver.lock().await;
receiver_guard.insert(uuid.clone(), receiver);
drop(receiver_item);
drop(receiver_guard);

let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
id: uuid,
message: String::from("ok"),
})
.unwrap(),
serde_json::to_string(&ResponseBody { id }).unwrap(),
));

let res = Response::new(body);
Expand Down
Loading

0 comments on commit 84b2832

Please sign in to comment.