Skip to content

Commit

Permalink
Simplify TransferManager logic
Browse files Browse the repository at this point in the history
Signed-off-by: Mateusz Szczygieł <mateusz.szczygiel@nordsec.com>
  • Loading branch information
matszczygiel committed Jul 10, 2024
1 parent 3a25262 commit 41435c9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 42 deletions.
16 changes: 11 additions & 5 deletions drop-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,18 @@ impl Storage {
}

pub async fn stop_incoming_file(&self, transfer_id: Uuid, file_id: &str) -> Option<()> {
let task = async {
let conn = self.conn.lock().await;
sync::stop_incoming_file(&conn, transfer_id, file_id)
};
let conn = self.conn.lock().await;

if let Err(e) = sync::incoming_file_set_local_state(
&conn,
transfer_id,
file_id,
sync::FileState::Terminal,
) {
error!(self.logger, "Failed to update incoming file sync states"; "error" => %e);
}

match task.await {
match sync::stop_incoming_file(&conn, transfer_id, file_id) {
Ok(state) => state,
Err(e) => {
error!(self.logger, "Failed to stop incoming file sync state"; "error" => %e);
Expand Down
71 changes: 34 additions & 37 deletions drop-transfer/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,6 @@ impl TransferManager {
let sync = state.file_sync_mut(file_id)?;
sync.try_terminate_local(FileTerminalState::Rejected)?;

self.storage
.update_incoming_file_sync_states(
state.xfer.id(),
file_id.as_ref(),
sync::FileState::Terminal,
)
.await;

self.storage
.stop_incoming_file(state.xfer.id(), file_id.as_ref())
.await;
Expand Down Expand Up @@ -459,7 +451,7 @@ impl TransferManager {
&self,
transfer_id: Uuid,
file_id: &FileId,
success: bool,
success: Result<(), String>,
) -> crate::Result<()> {
let mut lock = self.incoming.lock().await;

Expand All @@ -469,24 +461,31 @@ impl TransferManager {

state.ensure_not_cancelled()?;

let state = state.file_sync_mut(file_id)?;
state.try_terminate_local(if success {
let fstate = state.file_sync_mut(file_id)?;
fstate.try_terminate_local(if success.is_ok() {
FileTerminalState::Completed
} else {
FileTerminalState::Failed
})?;

self.storage
.update_incoming_file_sync_states(
transfer_id,
file_id.as_ref(),
sync::FileState::Terminal,
)
.await;
self.storage
.stop_incoming_file(transfer_id, file_id.as_ref())
.await;

if let Some(conn) = &state.conn {
let file = file_id.clone();

let (name, serv_req) = match success {
Ok(()) => ("DONE", ServerReq::Done { file }),
Err(msg) => ("FAIL", ServerReq::Fail { file, msg }),
};

debug!(self.logger, "Pushing file {name} message");
if let Err(e) = conn.send(serv_req) {
warn!(self.logger, "Failed to send {name} message: {e}");
};
}

Ok(())
}

Expand All @@ -505,13 +504,6 @@ impl TransferManager {
let sync = state.file_sync_mut(file_id)?;

let res = if sync.try_terminate_local(file_state).is_ok() {
self.storage
.update_incoming_file_sync_states(
transfer_id,
file_id.as_ref(),
sync::FileState::Terminal,
)
.await;
self.storage
.stop_incoming_file(transfer_id, file_id.as_ref())
.await;
Expand Down Expand Up @@ -568,18 +560,7 @@ impl TransferManager {
.ok_or(crate::Error::BadTransfer)?;

state.ensure_not_cancelled()?;

self.storage
.update_transfer_sync_states(transfer_id, drop_storage::sync::TransferState::Canceled)
.await;
state.xfer_sync = sync::TransferState::Canceled;

if let Some(conn) = state.conn.take() {
debug!(self.logger, "Pushing outgoing close request");
if let Err(e) = conn.send(ServerReq::Close) {
warn!(self.logger, "Failed to send close request: {}", e);
}
}
state.cancel_transfer(&self.logger, &self.storage).await;

for val in state.file_sync.values_mut() {
if let IncomingLocalFileState::InFlight { .. } = &*val {
Expand Down Expand Up @@ -868,6 +849,22 @@ impl IncomingState {
.get_mut(file_id)
.ok_or(crate::Error::BadFileId)
}

async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) {
storage
.update_transfer_sync_states(self.xfer.id(), sync::TransferState::Canceled)
.await;

self.xfer_sync = sync::TransferState::Canceled;

if let Some(conn) = self.conn.take() {
debug!(logger, "Pushing outgoing close request");

if let Err(e) = conn.send(ServerReq::Close) {
warn!(logger, "Failed to send close request: {}", e);
}
}
}
}

impl DirMapping {
Expand Down

0 comments on commit 41435c9

Please sign in to comment.