Skip to content

Commit

Permalink
feat: ✨ server clear state
Browse files Browse the repository at this point in the history
  • Loading branch information
yixiaojiu committed Oct 21, 2024
1 parent ecc7f9e commit 446a0b4
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 34 deletions.
9 changes: 8 additions & 1 deletion kimika-server/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::utils::types;

use hyper::Response;
use serde::Serialize;
use std::time;
use tokio::sync::{mpsc, oneshot};

pub struct DataSender {
Expand All @@ -15,6 +17,7 @@ pub struct DataReceiver {
pub struct Transfer {
pub sender: Option<DataSender>,
pub receiver: Option<DataReceiver>,
pub created: time::Instant,
}

#[derive(Clone, Serialize)]
Expand All @@ -26,6 +29,8 @@ pub struct MetadataItem {
pub file_name: Option<String>,
pub file_type: Option<String>,
pub size: Option<u64>,
/// whether completed
pub completed: bool,
}

pub struct Sender {
Expand All @@ -39,19 +44,21 @@ pub struct Metadata {
pub receiver_id: String,
pub metadata_list: Vec<MetadataItem>,
pub selected_metadata_tx: mpsc::Sender<Vec<String>>,
pub created: time::Instant,
}

#[derive(Clone, Serialize)]
pub struct Receiver {
pub id: String,
pub alias: String,
pub created: time::Instant,
}

impl Transfer {
pub fn new() -> Self {
Self {
sender: None,
receiver: None,
created: time::Instant::now(),
}
}
}
11 changes: 11 additions & 0 deletions kimika-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use tklog::{async_error, ASYNC_LOG, LEVEL, MODE};
use tokio::net::TcpListener;
use tokio::time;

async fn async_log_init() {
ASYNC_LOG
Expand All @@ -29,6 +30,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(addr).await?;

let server = service::Server::new();
let server_clone = server.clone();

tokio::spawn(async move {
const DURATION_TIME: u64 = 60 * 60;

loop {
time::sleep(time::Duration::from_secs(DURATION_TIME)).await;
server_clone.clone().clear_state().await;
}
});

let server_service = service_fn(move |req| server.clone().handle(req));

Expand Down
29 changes: 25 additions & 4 deletions kimika-server/src/service/get_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::Server;
use crate::data;
use crate::utils::hyper_utils;
use crate::utils::types;

Expand All @@ -13,9 +12,20 @@ struct Params {
id: String,
}

#[derive(Clone, Serialize)]
pub struct MetadataItem {
pub id: String,
pub token: String,
/// file or message
pub metadata_type: String,
pub file_name: Option<String>,
pub file_type: Option<String>,
pub size: Option<u64>,
}

#[derive(Serialize)]
struct ResponseBody {
metadatas: Vec<data::MetadataItem>,
metadatas: Vec<MetadataItem>,
message: String,
}

Expand All @@ -29,10 +39,21 @@ impl Server {
let metadata_entry = metadata_guard.get(&params.id);

if let Some(metadata) = metadata_entry {
let metadatas = metadata.metadata_list.clone();
let metadatas = metadata
.metadata_list
.iter()
.map(|v| MetadataItem {
id: v.id.clone(),
token: v.token.clone(),
metadata_type: v.metadata_type.clone(),
file_name: v.file_name.clone(),
file_type: v.file_type.clone(),
size: v.size.clone(),
})
.collect();
let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
metadatas: metadatas,
metadatas,
message: String::from("ok"),
})
.unwrap(),
Expand Down
19 changes: 14 additions & 5 deletions kimika-server/src/service/get_receivers.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
use super::Server;
use crate::data;
use crate::utils::hyper_utils;
use crate::utils::types;

use bytes::Bytes;
use hyper::Response;
use serde::Serialize;

#[derive(Clone, Serialize)]
pub struct Receiver {
pub id: String,
pub alias: String,
}

#[derive(Serialize)]
struct ResponseBody {
receivers: Vec<data::Receiver>,
receivers: Vec<Receiver>,
message: String,
}

impl Server {
pub async fn get_receivers(self, _req: types::RequestType) -> types::ResponseType {
let mut receivers: Vec<data::Receiver> = vec![];
let mut receivers: Vec<Receiver> = vec![];

let receiver_guard = self.receiver.lock().await;

for item in receiver_guard.iter() {
receivers.push(item.value().clone());
let receiver = item.value();
receivers.push(Receiver {
id: receiver.id.clone(),
alias: receiver.alias.clone(),
});
}
drop(receiver_guard);

let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
receivers: receivers,
receivers,
message: String::from("ok"),
})
.unwrap(),
Expand Down
28 changes: 17 additions & 11 deletions kimika-server/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ use crate::data;
use crate::utils::hyper_utils;
use crate::utils::types;

use bytes::Bytes;
use hyper::Response;
use std::sync::Arc;
use tklog::async_info;
use tklog::{async_error, async_info};
use tokio::sync::Mutex;

pub struct Server {
Expand Down Expand Up @@ -45,18 +44,13 @@ impl Server {
}
}

pub async fn handle(
self,
req: types::RequestType,
) -> Result<
Response<impl http_body::Body<Data = Bytes, Error = hyper::Error>>,
Box<dyn std::error::Error + Send + Sync>,
> {
pub async fn handle(self, req: types::RequestType) -> types::ResponseType {
let method = req.method();
let path = req.uri().path();
async_info!(format!("[{}] [{}]", method, path));
let log_flag = format!("[{}] [{}]", method, path);
async_info!(log_flag);

match (method, path) {
let res = match (method, path) {
(&hyper::Method::POST, "/register") => self.post_register(req).await,
(&hyper::Method::POST, "/upload") => self.post_upload(req).await,
(&hyper::Method::POST, "/download") => self.post_download(req).await,
Expand All @@ -69,6 +63,18 @@ impl Server {
*res.status_mut() = hyper::StatusCode::NOT_FOUND;
Ok(res)
}
};

if let Err(e) = res {
let error_message = e.to_string();
async_error!(format!("{} {}", log_flag, error_message));
return Ok(hyper_utils::rejection_response(error_message));
}

res
}

pub async fn clear_state(self) {

}
}
18 changes: 14 additions & 4 deletions kimika-server/src/service/post_download.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::transfer::transfer;
use super::Server;
use crate::data;
use crate::utils::types;
use crate::utils::{hyper_utils, types};

use hyper::Response;
use serde::Deserialize;
Expand All @@ -21,15 +21,19 @@ impl Server {
let params: Params = serde_qs::from_str(query)?;

let metadata_guard = self.metadata.lock().await;
// TODO none hander
let metadata_entry = metadata_guard.get(&params.id);
// TODO check
if let Some(ref metadata) = metadata_entry {
let metadata_check = metadata
.metadata_list
.iter()
.any(|v| v.token == params.token);
if !metadata_check {
return Ok(hyper_utils::rejection_response("Metadata check failed"));
}
} else {
return Ok(hyper_utils::rejection_response(
"Cannot find metadata from receiver id",
));
}
drop(metadata_entry);
drop(metadata_guard);
Expand Down Expand Up @@ -62,6 +66,12 @@ impl Server {
}
drop(transfer_guard);
drop(transfer_mutex);
Ok(res_body_rx.await.unwrap())

let res = res_body_rx.await?;

// clear server state
self.transfer.remove(&params.token);

Ok(res)
}
}
17 changes: 14 additions & 3 deletions kimika-server/src/service/post_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bytes::{Buf, Bytes};
use http_body_util::BodyExt;
use hyper::Response;
use serde::{Deserialize, Serialize};
use std::time;
use tokio::sync::mpsc;
use uuid::Uuid;

Expand Down Expand Up @@ -47,9 +48,17 @@ impl Server {
let body = req.collect().await?.aggregate();
let payload: Payload = serde_json::from_reader(body.reader())?;

let receiver_guard = self.receiver.lock().await;
if receiver_guard.contains_key(&payload.receiver_id) {
receiver_guard.remove(&payload.receiver_id);
} else {
return Ok(hyper_utils::rejection_response("Cannot find receiver"));
}
drop(receiver_guard);

let receiver_id = payload.receiver_id;
let metadata_guard = self.metadata.lock().await;
let uuid = Uuid::new_v4().to_string();
let sender_id = Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel(1);
let metadatas = payload
.metadata
Expand All @@ -61,19 +70,21 @@ impl Server {
file_name: v.file_name.clone(),
file_type: v.file_type.clone(),
size: v.size.clone(),
completed: false,
})
.collect::<Vec<data::MetadataItem>>();

metadata_guard.insert(
receiver_id.clone(),
data::Metadata {
sender: data::Sender {
id: uuid.clone(),
id: sender_id.clone(),
alias: payload.alias,
},
receiver_id: receiver_id.clone(),
metadata_list: metadatas,
selected_metadata_tx: tx,
created: time::Instant::now(),
},
);

Expand Down Expand Up @@ -103,7 +114,7 @@ impl Server {

let body = hyper_utils::full(Bytes::from(
serde_json::to_string(&ResponseBody {
id: uuid,
id: sender_id,
selected_metadata_list,
message: String::from("ok"),
})
Expand Down
2 changes: 2 additions & 0 deletions kimika-server/src/service/post_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bytes::{Buf, Bytes};
use http_body_util::BodyExt;
use hyper::Response;
use serde::{Deserialize, Serialize};
use std::time;
use uuid::Uuid;

#[derive(Deserialize, Debug)]
Expand All @@ -32,6 +33,7 @@ impl Server {
let receiver = data::Receiver {
id: uuid.clone(),
alias: payload.alias.clone(),
created: time::Instant::now(),
};

let receiver_guard = self.receiver.lock().await;
Expand Down
Loading

0 comments on commit 446a0b4

Please sign in to comment.