Skip to content

Commit

Permalink
refactor: simplify UDP server receiver
Browse files Browse the repository at this point in the history
It only gets new UDP requests, whitout spwaning tasks to handle them.
  • Loading branch information
josecelano committed Jun 25, 2024
1 parent a5e2baf commit 35b6c84
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,22 +290,20 @@ impl Debug for BoundSocket {

struct Receiver {
bound_socket: Arc<BoundSocket>,
tracker: Arc<Tracker>,
data: RefCell<[u8; MAX_PACKET_SIZE]>,
}

impl Receiver {
pub fn new(bound_socket: Arc<BoundSocket>, tracker: Arc<Tracker>) -> Self {
pub fn new(bound_socket: Arc<BoundSocket>) -> Self {
Receiver {
bound_socket,
tracker,
data: RefCell::new([0; MAX_PACKET_SIZE]),
}
}
}

impl Stream for Receiver {
type Item = std::io::Result<AbortHandle>;
type Item = std::io::Result<UdpRequest>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = *self.data.borrow_mut();
Expand All @@ -319,13 +317,7 @@ impl Stream for Receiver {
Ok(from) => {
let payload = buf.filled().to_vec();
let request = UdpRequest { payload, from };

Some(Ok(tokio::task::spawn(Udp::process_request(
request,
self.tracker.clone(),
self.bound_socket.clone(),
))
.abort_handle()))
Some(Ok(request))
}
Err(err) => Some(Err(err)),
};
Expand Down Expand Up @@ -375,15 +367,15 @@ impl Udp {

tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}");

let receiver = Receiver::new(bound_socket.into(), tracker);
let receiver = Receiver::new(bound_socket.into());

tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)");

let running = {
let local_addr = local_udp_url.clone();
tokio::task::spawn(async move {
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)");
let () = Self::run_udp_server_main(receiver).await;
let () = Self::run_udp_server_main(receiver, tracker.clone()).await;
})
};

Expand All @@ -404,7 +396,7 @@ impl Udp {
tokio::task::yield_now().await; // lets allow the other threads to complete.
}

async fn run_udp_server_main(mut receiver: Receiver) {
async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
let reqs = &mut ActiveRequests::default();

let addr = receiver.bound_socket.local_addr();
Expand All @@ -429,12 +421,15 @@ impl Udp {
}
};

if req.is_finished() {
let abort_handle =
tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle();

if abort_handle.is_finished() {
continue;
}

// fill buffer with requests
let Err(req) = reqs.rb.try_push(req) else {
let Err(req) = reqs.rb.try_push(abort_handle) else {
continue;
};

Expand Down

0 comments on commit 35b6c84

Please sign in to comment.