From 41435c9b0eb713128bb444193402565e560c2424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Szczygie=C5=82?= Date: Wed, 10 Jul 2024 12:01:52 +0300 Subject: [PATCH] Simplify `TransferManager` logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Szczygieł --- drop-storage/src/lib.rs | 16 +++++--- drop-transfer/src/manager.rs | 71 +++++++++++++++++------------------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/drop-storage/src/lib.rs b/drop-storage/src/lib.rs index 09f98c47..6d30902a 100644 --- a/drop-storage/src/lib.rs +++ b/drop-storage/src/lib.rs @@ -395,12 +395,18 @@ impl Storage { } pub async fn stop_incoming_file(&self, transfer_id: Uuid, file_id: &str) -> Option<()> { - let task = async { - let conn = self.conn.lock().await; - sync::stop_incoming_file(&conn, transfer_id, file_id) - }; + let conn = self.conn.lock().await; + + if let Err(e) = sync::incoming_file_set_local_state( + &conn, + transfer_id, + file_id, + sync::FileState::Terminal, + ) { + error!(self.logger, "Failed to update incoming file sync states"; "error" => %e); + } - match task.await { + match sync::stop_incoming_file(&conn, transfer_id, file_id) { Ok(state) => state, Err(e) => { error!(self.logger, "Failed to stop incoming file sync state"; "error" => %e); diff --git a/drop-transfer/src/manager.rs b/drop-transfer/src/manager.rs index b4abf8d1..6d1adcb4 100644 --- a/drop-transfer/src/manager.rs +++ b/drop-transfer/src/manager.rs @@ -405,14 +405,6 @@ impl TransferManager { let sync = state.file_sync_mut(file_id)?; sync.try_terminate_local(FileTerminalState::Rejected)?; - self.storage - .update_incoming_file_sync_states( - state.xfer.id(), - file_id.as_ref(), - sync::FileState::Terminal, - ) - .await; - self.storage .stop_incoming_file(state.xfer.id(), file_id.as_ref()) .await; @@ -459,7 +451,7 @@ impl TransferManager { &self, transfer_id: Uuid, file_id: &FileId, - success: bool, + success: Result<(), String>, ) -> crate::Result<()> { let mut lock = self.incoming.lock().await; @@ -469,24 +461,31 @@ impl TransferManager { state.ensure_not_cancelled()?; - let state = state.file_sync_mut(file_id)?; - state.try_terminate_local(if success { + let fstate = state.file_sync_mut(file_id)?; + fstate.try_terminate_local(if success.is_ok() { FileTerminalState::Completed } else { FileTerminalState::Failed })?; - self.storage - .update_incoming_file_sync_states( - transfer_id, - file_id.as_ref(), - sync::FileState::Terminal, - ) - .await; self.storage .stop_incoming_file(transfer_id, file_id.as_ref()) .await; + if let Some(conn) = &state.conn { + let file = file_id.clone(); + + let (name, serv_req) = match success { + Ok(()) => ("DONE", ServerReq::Done { file }), + Err(msg) => ("FAIL", ServerReq::Fail { file, msg }), + }; + + debug!(self.logger, "Pushing file {name} message"); + if let Err(e) = conn.send(serv_req) { + warn!(self.logger, "Failed to send {name} message: {e}"); + }; + } + Ok(()) } @@ -505,13 +504,6 @@ impl TransferManager { let sync = state.file_sync_mut(file_id)?; let res = if sync.try_terminate_local(file_state).is_ok() { - self.storage - .update_incoming_file_sync_states( - transfer_id, - file_id.as_ref(), - sync::FileState::Terminal, - ) - .await; self.storage .stop_incoming_file(transfer_id, file_id.as_ref()) .await; @@ -568,18 +560,7 @@ impl TransferManager { .ok_or(crate::Error::BadTransfer)?; state.ensure_not_cancelled()?; - - self.storage - .update_transfer_sync_states(transfer_id, drop_storage::sync::TransferState::Canceled) - .await; - state.xfer_sync = sync::TransferState::Canceled; - - if let Some(conn) = state.conn.take() { - debug!(self.logger, "Pushing outgoing close request"); - if let Err(e) = conn.send(ServerReq::Close) { - warn!(self.logger, "Failed to send close request: {}", e); - } - } + state.cancel_transfer(&self.logger, &self.storage).await; for val in state.file_sync.values_mut() { if let IncomingLocalFileState::InFlight { .. } = &*val { @@ -868,6 +849,22 @@ impl IncomingState { .get_mut(file_id) .ok_or(crate::Error::BadFileId) } + + async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) { + storage + .update_transfer_sync_states(self.xfer.id(), sync::TransferState::Canceled) + .await; + + self.xfer_sync = sync::TransferState::Canceled; + + if let Some(conn) = self.conn.take() { + debug!(logger, "Pushing outgoing close request"); + + if let Err(e) = conn.send(ServerReq::Close) { + warn!(logger, "Failed to send close request: {}", e); + } + } + } } impl DirMapping {