Skip to content

Commit

Permalink
Pin future in RequestSender
Browse files Browse the repository at this point in the history
+ Bump deps
+ Remove deprecated feature
  • Loading branch information
bemyak committed Mar 24, 2023
1 parent d839039 commit 086f085
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
13 changes: 5 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ repository = "https://github.com/cetra3/tmq"
readme = "README.md"
edition = "2018"

[features]
zmq-vendored = ["zmq/vendored"]

[dependencies]
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "1.0", features = ["net"] }
zmq = ">=0.9.2"
tokio = { version = "1.26", features = ["net"] }
zmq = "0.10"
log = "0.4"
thiserror = "1"

[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }
tokio = { version = "1", features = ["full"] }
pretty_env_logger = "0.4"
rand = "0.7"
criterion = "0.3"
rand = "0.8"
criterion = "0.4"

[[bench]]
name = "poll"
Expand Down
4 changes: 2 additions & 2 deletions examples/client_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn client(ctx: Rc<Context>, client_id: u64, frontend: String) -> tmq::Resu
vec![client_id.as_bytes(), request_id_str.as_bytes(), b"response"].into();
assert_eq!(expected, response);

let sleep_time = rng.gen_range(200, 1000);
let sleep_time = rng.gen_range(200..1000);
sleep(Duration::from_millis(sleep_time)).await;
request_id += 1;
}
Expand All @@ -62,7 +62,7 @@ async fn worker(ctx: Rc<Context>, worker_id: u64, backend: String) -> Result<(),
);

// simulate work
let sleep_time = rng.gen_range(100, 3000);
let sleep_time = rng.gen_range(100..3000);
sleep(Duration::from_millis(sleep_time)).await;

let response = vec![identity, client_id, request_id, "response".into()];
Expand Down
12 changes: 8 additions & 4 deletions src/socket_types/request_reply.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::pin::Pin;

use crate::{poll::ZmqPoller, FromZmqSocket, Multipart, SocketBuilder};
use zmq::{self, Context as ZmqContext};

Expand All @@ -23,8 +25,9 @@ impl_as_socket!(RequestSender, inner);

impl RequestSender {
/// Send a multipart message and return a `RequestReceiver`
pub async fn send(self, mut msg: Multipart) -> crate::Result<RequestReceiver> {
futures::future::poll_fn(|cx| self.inner.multipart_flush(cx, &mut msg)).await?;
pub async fn send(mut self, mut msg: Multipart) -> crate::Result<RequestReceiver> {
futures::future::poll_fn(|cx| Pin::new(&mut self.inner).multipart_flush(cx, &mut msg))
.await?;
Ok(RequestReceiver { inner: self.inner })
}
}
Expand All @@ -50,8 +53,9 @@ impl_as_socket!(RequestReceiver, inner);

impl RequestReceiver {
/// Receive a multipart message and return a `RequestSender`
pub async fn recv(self) -> crate::Result<(Multipart, RequestSender)> {
let msg = futures::future::poll_fn(|cx| self.inner.multipart_recv(cx)).await?;
pub async fn recv(mut self) -> crate::Result<(Multipart, RequestSender)> {
let msg =
futures::future::poll_fn(|cx| Pin::new(&mut self.inner).multipart_recv(cx)).await?;
Ok((msg, RequestSender { inner: self.inner }))
}
}
2 changes: 1 addition & 1 deletion tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub async fn hammer_receive<S: Stream<Item = Result<Multipart>> + Unpin>(
/// Helper functions
pub fn generate_tcp_address() -> String {
let mut rng = rand::thread_rng();
let port = rng.gen_range(2000, 65000);
let port = rng.gen_range(2000..65000);
format!("tcp://127.0.0.1:{}", port)
}

Expand Down

0 comments on commit 086f085

Please sign in to comment.