Skip to content

Commit

Permalink
Merge branch 'main' into release/v7-moose-no-context
Browse files Browse the repository at this point in the history
  • Loading branch information
matszczygiel committed May 23, 2024
2 parents bf87696 + b1e9f40 commit da73d27
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/analytics-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
default: true
- name: docker sdk
run: |
pip3 install docker
pip3 install -r test/requirements.txt
- name: Print docker version
run: docker version
- name: Build Docker image
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
default: true
- name: docker sdk
run: |
pip3 install docker
pip3 install -r test/requirements.txt
- name: Print docker version
run: docker version
- name: Build Docker image
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same
the mechanism is now different and requires new implementation to use properly.
* Split checksum events into finalize and verify
* Add `base_dir` field in the `RequestQueued` event files
* Fix rare issue of missing receiver's in-progress events

---
<br>
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub trait HandlerLoop {
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;

async fn finalize_success(self);
async fn finalize_failure(self);
}

pub trait Request {
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ impl RunContext<'_> {
"WS connection broke for {}: {err:?}",
xfer.id()
);
handler.finalize_failure().await;
} else {
info!(self.logger, "Sucesfully finalizing transfer loop");
handler.finalize_success().await;
Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
self.stop_task(&file_id, Status::FileRejected).await;
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
async fn finalize_success(mut self) {
debug!(self.logger, "Finalizing");
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
3 changes: 0 additions & 3 deletions go.mod

This file was deleted.

11 changes: 11 additions & 0 deletions test/drop_test/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,17 @@ def __str__(self):
return f"DrainEvents({self._count})"


class ClearEventQueue(Action):
def __init__(self):
pass

async def run(self, drop: ffi.Drop):
_ = await drop._events.gather_all(0)

def __str__(self):
return "ClearEventQueue()"


class NoEvent(Action):
def __init__(self, duration: int = 3):
self._duration = duration
Expand Down
2 changes: 2 additions & 0 deletions test/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
osxmetadata ~= 1.2; sys_platform == 'darwin'
requests < 2.32.0
docker == 7.0.0
Loading

0 comments on commit da73d27

Please sign in to comment.