Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FILE-601 Receiver does not reports events #332

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same
the mechanism is now different and requires new implementation to use properly.
* Split checksum events into finalize and verify
* Add `base_dir` field in the `RequestQueued` event files
* Fix rare issue of missing receiver's in-progress events

---
<br>
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub trait HandlerLoop {
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;

async fn finalize_success(self);
async fn finalize_failure(self);
}

pub trait Request {
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ impl RunContext<'_> {
"WS connection broke for {}: {err:?}",
xfer.id()
);
handler.finalize_failure().await;
} else {
info!(self.logger, "Sucesfully finalizing transfer loop");
handler.finalize_success().await;
Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
self.stop_task(&file_id, Status::FileRejected).await;
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
async fn finalize_success(mut self) {
debug!(self.logger, "Finalizing");
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
11 changes: 11 additions & 0 deletions test/drop_test/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,17 @@ def __str__(self):
return f"DrainEvents({self._count})"


class ClearEventQueue(Action):
def __init__(self):
pass

async def run(self, drop: ffi.Drop):
_ = await drop._events.gather_all(0)

def __str__(self):
return "ClearEventQueue()"


class NoEvent(Action):
def __init__(self, duration: int = 3):
self._duration = duration
Expand Down
196 changes: 196 additions & 0 deletions test/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,202 @@
),
},
),
Scenario(
"scenario11-4",
"Initate a transfers with multiple files. Wait for them to start and then stop the sender. Then restart sender and expect transfer to finish successfuly",
{
"DROP_PEER_REN": ActionList(
[
action.WaitForAnotherPeer("DROP_PEER_STIMPY"),
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
action.NewTransfer(
"DROP_PEER_STIMPY",
[
"/tmp/testfile-bulk-01",
"/tmp/testfile-bulk-02",
"/tmp/testfile-bulk-03",
"/tmp/testfile-bulk-04",
"/tmp/testfile-bulk-05",
"/tmp/testfile-bulk-06",
"/tmp/testfile-bulk-07",
"/tmp/testfile-bulk-08",
"/tmp/testfile-bulk-09",
"/tmp/testfile-bulk-10",
],
),
# fmt: off
action.Wait(
event.Queued(0, "DROP_PEER_STIMPY", [
norddrop.QueuedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760, "/tmp"),
]),
),
# Wait for some of the files and the stop
action.Repeated([action.WaitForOneOf([
event.Start(0, FILES["testfile-bulk-01"].id),
event.Start(0, FILES["testfile-bulk-02"].id),
event.Start(0, FILES["testfile-bulk-03"].id),
event.Start(0, FILES["testfile-bulk-04"].id),
event.Start(0, FILES["testfile-bulk-05"].id),
event.Start(0, FILES["testfile-bulk-06"].id),
event.Start(0, FILES["testfile-bulk-07"].id),
event.Start(0, FILES["testfile-bulk-08"].id),
event.Start(0, FILES["testfile-bulk-09"].id),
event.Start(0, FILES["testfile-bulk-10"].id),
])], 4),
# fmt: on
action.Stop(),
# fmt: off
matszczygiel marked this conversation as resolved.
Show resolved Hide resolved
# Wait of paused events
action.WaitAndIgnoreExcept([
event.Paused(0, FILES["testfile-bulk-01"].id),
event.Paused(0, FILES["testfile-bulk-02"].id),
event.Paused(0, FILES["testfile-bulk-03"].id),
event.Paused(0, FILES["testfile-bulk-04"].id),
event.Paused(0, FILES["testfile-bulk-05"].id),
event.Paused(0, FILES["testfile-bulk-06"].id),
event.Paused(0, FILES["testfile-bulk-07"].id),
event.Paused(0, FILES["testfile-bulk-08"].id),
event.Paused(0, FILES["testfile-bulk-09"].id),
event.Paused(0, FILES["testfile-bulk-10"].id),
]),
# fmt: on
action.Sleep(2),
action.ClearEventQueue(),
LukasPukenis marked this conversation as resolved.
Show resolved Hide resolved
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
# fmt: off
# Wait for some of the files and the stop
action.WaitRacy([
event.Start(0, FILES["testfile-bulk-01"].id, None),
event.Start(0, FILES["testfile-bulk-02"].id, None),
event.Start(0, FILES["testfile-bulk-03"].id, None),
event.Start(0, FILES["testfile-bulk-04"].id, None),
event.Start(0, FILES["testfile-bulk-05"].id, None),
event.Start(0, FILES["testfile-bulk-06"].id, None),
event.Start(0, FILES["testfile-bulk-07"].id, None),
event.Start(0, FILES["testfile-bulk-08"].id, None),
event.Start(0, FILES["testfile-bulk-09"].id, None),
event.Start(0, FILES["testfile-bulk-10"].id, None),
event.FinishFileUploaded(0, FILES["testfile-bulk-01"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-02"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-03"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-04"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-05"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-06"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-07"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-08"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-09"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-10"].id),
]),
# fmt: on
action.ExpectCancel([0], True),
]
),
"DROP_PEER_STIMPY": ActionList(
[
action.Start("DROP_PEER_STIMPY"),
# fmt: off
action.Wait(
event.Receive(0, "DROP_PEER_REN", [
norddrop.ReceivedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760),
]),
),
# fmt: on
# fmt: off
action.Download(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4"),
# Wait for all 10 pending events
action.Repeated([action.WaitForOneOf([
event.Pending(0, FILES["testfile-bulk-01"].id),
event.Pending(0, FILES["testfile-bulk-02"].id),
event.Pending(0, FILES["testfile-bulk-03"].id),
event.Pending(0, FILES["testfile-bulk-04"].id),
event.Pending(0, FILES["testfile-bulk-05"].id),
event.Pending(0, FILES["testfile-bulk-06"].id),
event.Pending(0, FILES["testfile-bulk-07"].id),
event.Pending(0, FILES["testfile-bulk-08"].id),
event.Pending(0, FILES["testfile-bulk-09"].id),
event.Pending(0, FILES["testfile-bulk-10"].id),
event.Start(0, FILES["testfile-bulk-01"].id),
event.Start(0, FILES["testfile-bulk-02"].id),
event.Start(0, FILES["testfile-bulk-03"].id),
event.Start(0, FILES["testfile-bulk-04"].id),
event.Start(0, FILES["testfile-bulk-05"].id),
event.Start(0, FILES["testfile-bulk-06"].id),
event.Start(0, FILES["testfile-bulk-07"].id),
event.Start(0, FILES["testfile-bulk-08"].id),
event.Start(0, FILES["testfile-bulk-09"].id),
event.Start(0, FILES["testfile-bulk-10"].id),
]),
], 10),
action.WaitAndIgnoreExcept([
event.Paused(0, FILES["testfile-bulk-01"].id),
event.Paused(0, FILES["testfile-bulk-02"].id),
event.Paused(0, FILES["testfile-bulk-03"].id),
event.Paused(0, FILES["testfile-bulk-04"].id),
event.Paused(0, FILES["testfile-bulk-05"].id),
event.Paused(0, FILES["testfile-bulk-06"].id),
event.Paused(0, FILES["testfile-bulk-07"].id),
event.Paused(0, FILES["testfile-bulk-08"].id),
event.Paused(0, FILES["testfile-bulk-09"].id),
event.Paused(0, FILES["testfile-bulk-10"].id),
]),
# The sender is stopped
action.WaitRacy([
event.Start(0, FILES["testfile-bulk-01"].id, None),
event.Start(0, FILES["testfile-bulk-02"].id, None),
event.Start(0, FILES["testfile-bulk-03"].id, None),
event.Start(0, FILES["testfile-bulk-04"].id, None),
event.Start(0, FILES["testfile-bulk-05"].id, None),
event.Start(0, FILES["testfile-bulk-06"].id, None),
event.Start(0, FILES["testfile-bulk-07"].id, None),
event.Start(0, FILES["testfile-bulk-08"].id, None),
event.Start(0, FILES["testfile-bulk-09"].id, None),
event.Start(0, FILES["testfile-bulk-10"].id, None),
event.FinishFileDownloaded(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4/testfile-bulk-01"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4/testfile-bulk-02"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4/testfile-bulk-03"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4/testfile-bulk-04"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4/testfile-bulk-05"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4/testfile-bulk-06"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4/testfile-bulk-07"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4/testfile-bulk-08"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4/testfile-bulk-09"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4/testfile-bulk-10"),
]),
# fmt: on
action.CancelTransferRequest([0]),
action.ExpectCancel([0], False),
action.Stop(),
]
),
},
),
Scenario(
"scenario12-1",
"Transfer file to two clients simultaneously",
Expand Down
Loading