Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WsClient::on_disconnect is not cancel-safe/footgun #996

Closed
xlc opened this issue Jan 31, 2023 · 2 comments
Closed

WsClient::on_disconnect is not cancel-safe/footgun #996

xlc opened this issue Jan 31, 2023 · 2 comments

Comments

@xlc
Copy link
Contributor

xlc commented Jan 31, 2023

/// Completes when the client is disconnected or the client's background task encountered an error.
/// If the client is already disconnected, the future produced by this method will complete immediately.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub async fn on_disconnect(&self) {
// Wait until the `background_task` exits.
let mut notify_lock = self.notify.lock().await;
if let Some(notify) = notify_lock.take() {
let _ = notify.await;
}
}

on_disconnect can only be called once which makes this code incorrect https://github.com/AcalaNetwork/subway/blob/1026a5110837f96564732922ada96c9800dd495d/src/client/mod.rs#L114

We need to either improve the docs & API design to ensure the repeated call either works as expected or fail with some error indicate repeated call is not supported.

@niklasad1
Copy link
Member

niklasad1 commented Jan 31, 2023

EDIT: The documentation is wrong the method is not cancel-safe because after notify.lock() has been called and then Option::take is called and each call after that point will be regarded as disconnected.

So for now you need use futures::future::select and handle the future which isn't resolved and poll it again to workaround that when using tokio::select!.

If you do this instead it should work on the latest release:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
	let filter = tracing_subscriber::EnvFilter::try_from_default_env()?
		.add_directive("jsonrpsee[method_call{name = \"say_hello\"}]=trace".parse()?);

	tracing_subscriber::FmtSubscriber::builder().with_env_filter(filter).finish().try_init()?;

	let interval = interval(Duration::from_millis(200));
	let stream = IntervalStream::new(interval).map(move |_| 1);

	let addr = run_server().await?;
	let url = format!("ws://{}", addr);

	let client = WsClientBuilder::default().build(&url).await?;

	let disconnect = client.on_disconnect();

	tokio::pin!(disconnect, stream);

	let mut next_event = stream.next();

	loop {
		match futures::future::select(disconnect, next_event).await {
			Either::Left((_, _)) => {
				// drop the stream here because the client is disconnected.
				break;
			}
			Either::Right((item, disconnect_fut)) => {
				println!("stream item: {:?}", item);
				next_event = stream.next();
				disconnect = disconnect_fut;
			}
		}
	}

	Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
	let server = ServerBuilder::default().build("127.0.0.1:0").await?;
	let mut module = RpcModule::new(());
	module.register_method("say_hello", |_, _| Ok("lo"))?;
	let addr = server.local_addr()?;
	let handle = server.start(module)?;

	// In this example we don't care about doing shutdown so let's it run forever.
	// You may use the `ServerHandle` to shut it down or manage it yourself.
	tokio::spawn(async move {
		tokio::time::sleep(std::time::Duration::from_secs(20)).await;
		println!("server stopped");
		handle.stop().unwrap();
	});

	Ok(addr)
}

@niklasad1 niklasad1 changed the title on_disconnect is a footgun WsClient::on_disconnect is not cancel-safe/footgun Jan 31, 2023
@niklasad1
Copy link
Member

Closed by #999

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants