From a8619c80df64856350320b8fc5b93fb827c7a4ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89mile=20Gr=C3=A9goire?= Date: Mon, 1 Nov 2021 16:17:54 -0400 Subject: [PATCH] Fix leak of tracing::Span due to missing impl. Fix blocking async commands. --- CHANGELOG.md | 4 ++++ ffi/rodbus-bindings/Cargo.toml | 2 +- ffi/rodbus-ffi/Cargo.toml | 3 ++- ffi/rodbus-ffi/src/client.rs | 22 ++++++++++++---------- ffi/rodbus-ffi/src/logging.rs | 12 ++++++++++++ ffi/rodbus-ffi/src/runtime.rs | 9 ++++----- ffi/rodbus-ffi/src/server.rs | 4 ++-- ffi/rodbus-schema/Cargo.toml | 2 +- guide/docs/api/client/tcp_client.mdx | 3 +-- rodbus/src/client/task.rs | 2 +- rodbus/src/server/mod.rs | 11 +++++------ 11 files changed, 45 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ae4ca9..b51cccf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ ### Next ### +* Client callbacks are now not blocking. + See [#53](https://github.com/stepfunc/rodbus/pull/53). +* Fix leak of `tracing::Span` in bindings. + See [#53](https://github.com/stepfunc/rodbus/pull/53). * Add Linux AArch64 support in Java and .NET. See [#51](https://github.com/stepfunc/rodbus/pull/51). diff --git a/ffi/rodbus-bindings/Cargo.toml b/ffi/rodbus-bindings/Cargo.toml index b1024f5b..f074a0c5 100644 --- a/ffi/rodbus-bindings/Cargo.toml +++ b/ffi/rodbus-bindings/Cargo.toml @@ -11,6 +11,6 @@ repository = "https://github.com/stepfunc/rodbus" readme = "../README.md" [dependencies] -ci-script = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" } +ci-script = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" } rodbus-schema = { path = "../rodbus-schema" } rodbus-ffi = { path = "../rodbus-ffi" } diff --git a/ffi/rodbus-ffi/Cargo.toml b/ffi/rodbus-ffi/Cargo.toml index 390d6ead..1dbf963b 100644 --- a/ffi/rodbus-ffi/Cargo.toml +++ b/ffi/rodbus-ffi/Cargo.toml @@ -14,6 +14,7 @@ crate-type = ["rlib", "staticlib", "cdylib"] [dependencies] tracing = "0.1" +tracing-core = "0.1" tracing-subscriber = "0.2" rodbus = { path = "../../rodbus" } tokio = { version = "1.5", features = ["rt-multi-thread"]} @@ -21,4 +22,4 @@ num_cpus = "1" [build-dependencies] rodbus-schema = { path = "../rodbus-schema" } -rust-oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" } +rust-oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" } diff --git a/ffi/rodbus-ffi/src/client.rs b/ffi/rodbus-ffi/src/client.rs index eaffb82e..95cbfc95 100644 --- a/ffi/rodbus-ffi/src/client.rs +++ b/ffi/rodbus-ffi/src/client.rs @@ -51,7 +51,7 @@ pub(crate) unsafe fn channel_read_coils( let mut session = param.build_session(channel); channel .runtime - .block_on(session.read_coils(range, callback))?; + .spawn(async move { session.read_coils(range, callback).await })?; Ok(()) } @@ -69,7 +69,7 @@ pub(crate) unsafe fn channel_read_discrete_inputs( let mut session = param.build_session(channel); channel .runtime - .block_on(session.read_discrete_inputs(range, callback))?; + .spawn(async move { session.read_discrete_inputs(range, callback).await })?; Ok(()) } @@ -87,7 +87,7 @@ pub(crate) unsafe fn channel_read_holding_registers( let mut session = param.build_session(channel); channel .runtime - .block_on(session.read_holding_registers(range, callback))?; + .spawn(async move { session.read_holding_registers(range, callback).await })?; Ok(()) } @@ -105,7 +105,7 @@ pub(crate) unsafe fn channel_read_input_registers( let mut session = param.build_session(channel); channel .runtime - .block_on(session.read_input_registers(range, callback))?; + .spawn(async move { session.read_input_registers(range, callback).await })?; Ok(()) } @@ -122,7 +122,7 @@ pub(crate) unsafe fn channel_write_single_coil( let mut session = param.build_session(channel); channel .runtime - .block_on(session.write_single_coil(bit.into(), callback))?; + .spawn(async move { session.write_single_coil(bit.into(), callback).await })?; Ok(()) } @@ -137,9 +137,11 @@ pub(crate) unsafe fn channel_write_single_register( let callback = callback.convert_to_fn_once(); let mut session = param.build_session(channel); - channel - .runtime - .block_on(session.write_single_register(register.into(), callback))?; + channel.runtime.spawn(async move { + session + .write_single_register(register.into(), callback) + .await + })?; Ok(()) } @@ -159,7 +161,7 @@ pub(crate) unsafe fn channel_write_multiple_coils( let mut session = param.build_session(channel); channel .runtime - .block_on(session.write_multiple_coils(args, callback))?; + .spawn(async move { session.write_multiple_coils(args, callback).await })?; Ok(()) } @@ -179,7 +181,7 @@ pub(crate) unsafe fn channel_write_multiple_registers( let mut session = param.build_session(channel); channel .runtime - .block_on(session.write_multiple_registers(args, callback))?; + .spawn(async move { session.write_multiple_registers(args, callback).await })?; Ok(()) } diff --git a/ffi/rodbus-ffi/src/logging.rs b/ffi/rodbus-ffi/src/logging.rs index cb3a232a..ee9eacae 100644 --- a/ffi/rodbus-ffi/src/logging.rs +++ b/ffi/rodbus-ffi/src/logging.rs @@ -129,6 +129,18 @@ impl tracing::Subscriber for Adapter { fn exit(&self, span: &Id) { self.inner.exit(span) } + + fn clone_span(&self, span: &Id) -> Id { + self.inner.clone_span(span) + } + + fn try_close(&self, span: Id) -> bool { + self.inner.try_close(span) + } + + fn current_span(&self) -> tracing_core::span::Current { + self.inner.current_span() + } } impl From for ffi::LogLevel { diff --git a/ffi/rodbus-ffi/src/runtime.rs b/ffi/rodbus-ffi/src/runtime.rs index 0effc88e..40e25838 100644 --- a/ffi/rodbus-ffi/src/runtime.rs +++ b/ffi/rodbus-ffi/src/runtime.rs @@ -1,7 +1,6 @@ use std::future::Future; use crate::ffi; -use tokio::runtime::Handle; pub struct Runtime { pub(crate) inner: std::sync::Arc, @@ -27,7 +26,7 @@ pub(crate) struct RuntimeHandle { } impl RuntimeHandle { - pub(crate) fn block_on(&self, future: F) -> Result { + /*pub(crate) fn block_on(&self, future: F) -> Result { let inner = self .inner .upgrade() @@ -36,9 +35,9 @@ impl RuntimeHandle { return Err(ffi::ParamError::RuntimeCannotBlockWithinAsync); } Ok(inner.block_on(future)) - } + }*/ - /*pub(crate) fn spawn(&self, future: F) -> Result<(), ffi::ParamError> + pub(crate) fn spawn(&self, future: F) -> Result<(), ffi::ParamError> where F: Future + Send + 'static, F::Output: Send + 'static, @@ -49,7 +48,7 @@ impl RuntimeHandle { .ok_or(ffi::ParamError::RuntimeDestroyed)?; inner.spawn(future); Ok(()) - }*/ + } } fn build_runtime(f: F) -> std::result::Result diff --git a/ffi/rodbus-ffi/src/server.rs b/ffi/rodbus-ffi/src/server.rs index 0711d9d3..11afe65e 100644 --- a/ffi/rodbus-ffi/src/server.rs +++ b/ffi/rodbus-ffi/src/server.rs @@ -184,10 +184,10 @@ pub(crate) unsafe fn create_tcp_server( decode_level.into(), )) .map_err(|_| ffi::ParamError::ServerBindError)?; - let join_handle = runtime.inner.spawn(task); + runtime.inner.spawn(task); let server_handle = Server { - _server: ServerHandle::new(tx, join_handle), + _server: ServerHandle::new(tx), map: handler_map, }; diff --git a/ffi/rodbus-schema/Cargo.toml b/ffi/rodbus-schema/Cargo.toml index 9d5642aa..98aeede1 100644 --- a/ffi/rodbus-schema/Cargo.toml +++ b/ffi/rodbus-schema/Cargo.toml @@ -11,5 +11,5 @@ repository = "https://github.com/stepfunc/rodbus" readme = "../README.md" [dependencies] -oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" } +oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" } rodbus = { path = "../../rodbus" } diff --git a/guide/docs/api/client/tcp_client.mdx b/guide/docs/api/client/tcp_client.mdx index 3e9b9905..77f486ac 100644 --- a/guide/docs/api/client/tcp_client.mdx +++ b/guide/docs/api/client/tcp_client.mdx @@ -60,8 +60,7 @@ values={[ ## Maximum Queued Requests -Each channel sends one request at a time and has a fixed-length buffer of requests to send. If the queue is full when demanding -a request, this call **will block** until the queue has enough space. +Each channel sends one request at a time and has a fixed-length buffer of requests to send. ## Endpoint Configuration diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index 64f00523..14bcb72c 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -91,7 +91,7 @@ where tracing::warn!("error occurred making request: {}", e); } - result.as_ref().err().and_then(|e| SessionError::from(e)) + result.as_ref().err().and_then(SessionError::from) } async fn execute_request( diff --git a/rodbus/src/server/mod.rs b/rodbus/src/server/mod.rs index dad9331a..df9d38ff 100644 --- a/rodbus/src/server/mod.rs +++ b/rodbus/src/server/mod.rs @@ -20,16 +20,15 @@ pub use types::*; /// A handle to the server async task. The task is shutdown when the handle is dropped. #[derive(Debug)] pub struct ServerHandle { - tx: tokio::sync::mpsc::Sender<()>, - handle: tokio::task::JoinHandle<()>, + _tx: tokio::sync::mpsc::Sender<()>, } impl ServerHandle { /// Construct a [ServerHandle] from its fields /// /// This function is only required for the C bindings - pub fn new(tx: tokio::sync::mpsc::Sender<()>, handle: tokio::task::JoinHandle<()>) -> Self { - ServerHandle { tx, handle } + pub fn new(tx: tokio::sync::mpsc::Sender<()>) -> Self { + ServerHandle { _tx: tx } } } @@ -52,7 +51,7 @@ pub async fn spawn_tcp_server_task( let listener = crate::tokio::net::TcpListener::bind(addr).await?; let (tx, rx) = tokio::sync::mpsc::channel(1); - let handle = tokio::spawn(create_tcp_server_task_impl( + tokio::spawn(create_tcp_server_task_impl( rx, max_sessions, addr, @@ -61,7 +60,7 @@ pub async fn spawn_tcp_server_task( decode, )); - Ok(ServerHandle::new(tx, handle)) + Ok(ServerHandle::new(tx)) } /// Creates a TCP server task that can then be spawned onto the runtime manually.