Skip to content

Commit

Permalink
feat: ✨ impl post upload for local server
Browse files Browse the repository at this point in the history
  • Loading branch information
yixiaojiu committed Nov 2, 2024
1 parent a38c76d commit bd2d34f
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions kimika/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ hyper = {version = "1.5.0", features = ["server", "http1"]}
http-body-util = "0.1.2"
hyper-util = {version = "0.1.7", features = ["tokio"]}
once_cell = "1.20.2"
serde_qs = "0.13.0"

13 changes: 7 additions & 6 deletions kimika/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ enum Commands {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() {
let cli = Cli::parse();
let mut config = config::Config::new();

match cli.command {
Commands::Send(args) => send::send(args, &mut config).await?,
Commands::Receive(args) => receive::receive(args, &mut config).await?,
let result = match cli.command {
Commands::Send(args) => send::send(args, &mut config).await,
Commands::Receive(args) => receive::receive(args, &mut config).await,
// Commands::Test => utils::multiselect::metadata_select().await?,
};
if let Err(e) = result {
eprintln!("Error: {}", e);
}

Ok(())
}
14 changes: 4 additions & 10 deletions kimika/src/receive/local.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
use super::ReceiveArgs;
use crate::config;
use crate::server::receiver::start_server;
use crate::server::udp::listen_boardcast;

use tokio::sync::oneshot;

pub async fn local_receive(
_args: &ReceiveArgs,
config: &config::Config,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn local_receive() -> Result<(), Box<dyn std::error::Error>> {
let (close_udp_tx, close_udp_rx) = oneshot::channel::<()>();

let alias = config.alias.clone();
let port = config.receiver.port;

tokio::spawn(async move {
listen_boardcast(close_udp_rx, alias, port).await.unwrap();
listen_boardcast(close_udp_rx).await.unwrap();
});

start_server(close_udp_tx).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion kimika/src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn receive(
if args.server {
remote::remote_receive(&args, &config).await?;
} else {
local::local_receive(&args, &config).await?;
local::local_receive().await?;
}

Ok(())
Expand Down
14 changes: 13 additions & 1 deletion kimika/src/request/local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::server::sender;
use crate::server::{receiver, sender};
use reqwest::{Client, Url};
use std::net::SocketAddr;

Expand Down Expand Up @@ -29,4 +29,16 @@ impl RequestClient {

Ok(String::from_utf8_lossy(&bytes).to_string())
}

pub async fn post_metadata(
&self,
payload: &receiver::PostRegisterPayload,
) -> Result<receiver::PostMetadataResponse, reqwest::Error> {
let mut url = self.url.clone();
url.set_path("/register");

let result = Client::new().post(url).json(payload).send().await?;
let bytes = result.bytes().await?;
Ok(serde_json::from_slice(&bytes).unwrap())
}
}
143 changes: 117 additions & 26 deletions kimika/src/server/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use super::{full, rejection_response, RequestType, ResponseType};
use crate::{utils::select, CONFIG};
use crate::{utils, CONFIG};

use bytes::Buf;
use http_body_util::BodyExt;
use hyper::{server::conn::http1, service::service_fn, Response};
use hyper_util::rt::TokioIo;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{net::SocketAddr, path::PathBuf};
use tokio::fs;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::{net::TcpListener, sync::oneshot, sync::Mutex};
use tokio_stream::StreamExt;
use uuid::Uuid;

#[derive(Clone, Debug)]
Expand All @@ -20,18 +23,13 @@ pub struct MetadataItem {
pub file_name: Option<String>,
pub file_type: Option<String>,
pub size: Option<u64>,
}

pub struct ReceiverServer {
sender_alias: Arc<Mutex<Option<String>>>,
metadata_list: Arc<Mutex<Vec<MetadataItem>>>,
close_server_tx: Arc<oneshot::Sender<()>>,
pub completed: bool,
}

/** ====================================== */

#[derive(Deserialize)]
struct PayloadMetadataItem {
#[derive(Deserialize, Serialize)]
pub struct PayloadMetadataItem {
id: String,
/// file or message
metadata_type: String,
Expand All @@ -40,41 +38,57 @@ struct PayloadMetadataItem {
size: Option<u64>,
}

#[derive(Deserialize)]
struct PostRegisterPayload {
#[derive(Deserialize, Serialize)]
pub struct PostRegisterPayload {
alias: String,
metadata_list: Vec<PayloadMetadataItem>,
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
struct ResponseMetadata {
id: String,
token: String,
pub id: String,
pub token: String,
}

#[derive(Serialize, Deserialize)]
pub struct PostMetadataResponse {
pub selected_metadata_list: Vec<ResponseMetadata>,
}

#[derive(Serialize)]
struct ResponseBody {
selected_metadata_list: Vec<ResponseMetadata>,
/** ====================================== */

#[derive(Deserialize)]
struct PostUploadParams {
token: String,
}

/** ====================================== */

pub struct ReceiverServer {
sender_alias: Arc<Mutex<Option<String>>>,
metadata_list: Arc<Mutex<Vec<MetadataItem>>>,
close_server_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
close_udp_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}

impl Clone for ReceiverServer {
fn clone(&self) -> Self {
Self {
sender_alias: Arc::clone(&self.sender_alias),
metadata_list: Arc::clone(&self.metadata_list),
close_server_tx: Arc::clone(&self.close_server_tx),
close_udp_tx: Arc::clone(&self.close_udp_tx),
}
}
}

impl ReceiverServer {
fn new(close_server_tx: Arc<oneshot::Sender<()>>) -> Self {
fn new(close_server_tx: oneshot::Sender<()>, close_udp_tx: oneshot::Sender<()>) -> Self {
Self {
metadata_list: Arc::new(Mutex::new(vec![])),
sender_alias: Arc::new(Mutex::new(None)),
close_server_tx,
close_server_tx: Arc::new(Mutex::new(Some(close_server_tx))),
close_udp_tx: Arc::new(Mutex::new(Some(close_udp_tx))),
}
}

Expand All @@ -98,7 +112,7 @@ impl ReceiverServer {
res
}

pub async fn post_metadata(self, req: RequestType) -> ResponseType {
async fn post_metadata(self, req: RequestType) -> ResponseType {
let mut sender_alias_guard = self.sender_alias.lock().await;
if sender_alias_guard.is_some() {
return Ok(rejection_response("Receiver already being connected"));
Expand All @@ -122,6 +136,7 @@ impl ReceiverServer {
file_name: v.file_name.clone(),
file_type: v.file_type.clone(),
size: v.size.clone(),
completed: false,
})
.collect();

Expand All @@ -136,18 +151,94 @@ impl ReceiverServer {
let mut metadata_list_guard = self.metadata_list.lock().await;
metadata_list_guard.extend(metadata_list);

Ok(Response::new(full(serde_json::to_string(&ResponseBody {
selected_metadata_list,
})?)))
if let Some(close_udp_tx) = self.close_udp_tx.lock().await.take() {
if let Err(e) = close_udp_tx.send(()) {
eprintln!("Error: {:?}", e);
};
}

Ok(Response::new(full(serde_json::to_string(
&PostMetadataResponse {
selected_metadata_list,
},
)?)))
}

async fn post_upload(self, req: RequestType) -> ResponseType {
let (parts, req_body) = req.into_parts();
// TODO none hander
let query = parts.uri.query().unwrap();
let params: PostUploadParams = serde_qs::from_str(query)?;

let metadata_list_guard = self.metadata_list.lock().await;
let metadata = match metadata_list_guard.iter().find(|v| v.token == params.token) {
Some(metadata) => {
if metadata.completed {
return Ok(rejection_response("Metadata already completed"));
}
metadata.clone()
}
None => {
return Ok(rejection_response("Metadata check failed"));
}
};
drop(metadata_list_guard);

if metadata.metadata_type == "file" {
let mut stream = req_body.into_data_stream();
let mut pathbuf = PathBuf::from(CONFIG.receiver.save_folder.clone());
let filename = metadata.file_name.clone().unwrap();
pathbuf.push(&filename);
let mut rename_num = 1;
loop {
if !pathbuf.exists() {
break;
}
pathbuf.set_file_name(format!("{}({})", &filename, rename_num));
rename_num += 1;
}
let total_size = metadata.size.unwrap();
let progreebar = utils::handle::create_progress_bar(total_size, &filename);
let mut buffer_writer = BufWriter::new(fs::File::create(pathbuf).await?);
let mut downloaded_size = 0;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer_writer.write(&chunk).await?;
downloaded_size += chunk.len() as u64;
progreebar.set_position(std::cmp::min(downloaded_size, total_size));
}
buffer_writer.flush().await?;
progreebar.finish_with_message(filename);
} else {
let body = req_body.collect().await?;
println!("{}", String::from_utf8_lossy(&body.to_bytes()));
}

let mut metadata_list_guard = self.metadata_list.lock().await;
metadata_list_guard.iter_mut().for_each(|v| {
if v.token == metadata.token {
v.completed = true
}
});
let all_completed = metadata_list_guard.iter().all(|v| v.completed == true);
if all_completed {
if let Some(close_server_tx) = self.close_server_tx.lock().await.take() {
if let Err(e) = close_server_tx.send(()) {
eprintln!("Error: {:?}", e);
};
}
}

Ok(Response::new(full("ok")))
}
}

pub async fn start_server() -> Result<(), std::io::Error> {
pub async fn start_server(close_udp_tx: oneshot::Sender<()>) -> Result<(), std::io::Error> {
let address: SocketAddr = ([0, 0, 0, 0], CONFIG.receiver.port).into();
let (close_server_tx, mut close_server_rx) = oneshot::channel::<()>();

let listener = TcpListener::bind(address).await?;
let server = ReceiverServer::new(Arc::new(close_server_tx));
let server = ReceiverServer::new(close_server_tx, close_udp_tx);

let server_service = service_fn(move |req| server.clone().handle(req));

Expand Down
11 changes: 4 additions & 7 deletions kimika/src/server/udp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::request::local as request_local;
use crate::CONFIG;

use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
Expand All @@ -13,12 +14,8 @@ pub struct UDPPacket {
pub port: u16,
}

pub async fn listen_boardcast(
mut close_rx: oneshot::Receiver<()>,
alias: String,
port: u16,
) -> Result<(), std::io::Error> {
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
pub async fn listen_boardcast(mut close_rx: oneshot::Receiver<()>) -> Result<(), std::io::Error> {
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), CONFIG.receiver.port);
let socket = UdpSocket::bind(address).await?;
socket.set_broadcast(true)?;
let mut buffer = vec![0u8; BUFFER_SIZE];
Expand All @@ -34,7 +31,7 @@ pub async fn listen_boardcast(
if let Ok(packet) = serde_json::from_slice::<UDPPacket>(&buffer[..num_bytes]) {
let request =
request_local::RequestClient::new(&SocketAddr::new(address.ip(), packet.port));
let result = request.register(alias.clone(), port).await;
let result = request.register(CONFIG.alias.clone(), CONFIG.receiver.port).await;
match result {
Ok(result) => {
println!("sucess {}", result);
Expand Down

0 comments on commit bd2d34f

Please sign in to comment.