From 086f085398746d1f8009ac5d87c4288178a4619f Mon Sep 17 00:00:00 2001 From: Sergei Gureev Date: Fri, 24 Mar 2023 09:53:23 +0200 Subject: [PATCH] Pin future in RequestSender + Bump deps + Remove deprecated feature --- Cargo.toml | 13 +++++-------- examples/client_worker.rs | 4 ++-- src/socket_types/request_reply.rs | 12 ++++++++---- tests/utils.rs | 2 +- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d354130..45dcb2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,21 +8,18 @@ repository = "https://github.com/cetra3/tmq" readme = "README.md" edition = "2018" -[features] -zmq-vendored = ["zmq/vendored"] - [dependencies] futures = { version = "0.3", default-features = false, features = ["alloc"] } -tokio = { version = "1.0", features = ["net"] } -zmq = ">=0.9.2" +tokio = { version = "1.26", features = ["net"] } +zmq = "0.10" log = "0.4" thiserror = "1" [dev-dependencies] -tokio = { version = "1.0", features = ["full"] } +tokio = { version = "1", features = ["full"] } pretty_env_logger = "0.4" -rand = "0.7" -criterion = "0.3" +rand = "0.8" +criterion = "0.4" [[bench]] name = "poll" diff --git a/examples/client_worker.rs b/examples/client_worker.rs index 20b39db..ed99ff6 100644 --- a/examples/client_worker.rs +++ b/examples/client_worker.rs @@ -37,7 +37,7 @@ async fn client(ctx: Rc, client_id: u64, frontend: String) -> tmq::Resu vec![client_id.as_bytes(), request_id_str.as_bytes(), b"response"].into(); assert_eq!(expected, response); - let sleep_time = rng.gen_range(200, 1000); + let sleep_time = rng.gen_range(200..1000); sleep(Duration::from_millis(sleep_time)).await; request_id += 1; } @@ -62,7 +62,7 @@ async fn worker(ctx: Rc, worker_id: u64, backend: String) -> Result<(), ); // simulate work - let sleep_time = rng.gen_range(100, 3000); + let sleep_time = rng.gen_range(100..3000); sleep(Duration::from_millis(sleep_time)).await; let response = vec![identity, client_id, request_id, "response".into()]; diff --git a/src/socket_types/request_reply.rs b/src/socket_types/request_reply.rs index 4436411..e47e03d 100644 --- a/src/socket_types/request_reply.rs +++ b/src/socket_types/request_reply.rs @@ -1,3 +1,5 @@ +use std::pin::Pin; + use crate::{poll::ZmqPoller, FromZmqSocket, Multipart, SocketBuilder}; use zmq::{self, Context as ZmqContext}; @@ -23,8 +25,9 @@ impl_as_socket!(RequestSender, inner); impl RequestSender { /// Send a multipart message and return a `RequestReceiver` - pub async fn send(self, mut msg: Multipart) -> crate::Result { - futures::future::poll_fn(|cx| self.inner.multipart_flush(cx, &mut msg)).await?; + pub async fn send(mut self, mut msg: Multipart) -> crate::Result { + futures::future::poll_fn(|cx| Pin::new(&mut self.inner).multipart_flush(cx, &mut msg)) + .await?; Ok(RequestReceiver { inner: self.inner }) } } @@ -50,8 +53,9 @@ impl_as_socket!(RequestReceiver, inner); impl RequestReceiver { /// Receive a multipart message and return a `RequestSender` - pub async fn recv(self) -> crate::Result<(Multipart, RequestSender)> { - let msg = futures::future::poll_fn(|cx| self.inner.multipart_recv(cx)).await?; + pub async fn recv(mut self) -> crate::Result<(Multipart, RequestSender)> { + let msg = + futures::future::poll_fn(|cx| Pin::new(&mut self.inner).multipart_recv(cx)).await?; Ok((msg, RequestSender { inner: self.inner })) } } diff --git a/tests/utils.rs b/tests/utils.rs index 67298b4..f402b99 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -225,7 +225,7 @@ pub async fn hammer_receive> + Unpin>( /// Helper functions pub fn generate_tcp_address() -> String { let mut rng = rand::thread_rng(); - let port = rng.gen_range(2000, 65000); + let port = rng.gen_range(2000..65000); format!("tcp://127.0.0.1:{}", port) }