How to build a thread_local client #302

Closed
Forsworns opened this issue Nov 4, 2022 · 3 comments

@Forsworns

This issue is to share a problem I ran into while building a thread_local capnp-rpc client.

My first, intuitive attempt was to create the thread_local client like this:

thread_local! {
    pub static CLIENT: RefCell<HelloServiceClientImpl> = {
        let client = HelloServiceClientImpl::new(ADDRESS);
        RefCell::new(client)
    };
}

With this definition I get the error 'cannot access a Thread Local Storage value during or after destruction: AccessError' (see tokio-rs/tokio#5162).
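
For anyone who wants to detect this condition instead of panicking, the standard library offers `LocalKey::try_with`, which returns an `AccessError` when the value is being or has been destroyed. The following is only a minimal sketch: the `String` payload and the `client_name` helper are hypothetical stand-ins, not part of the real client.

```rust
use std::cell::RefCell;

thread_local! {
    // Placeholder payload; the real code would hold the RPC client here.
    static CLIENT: RefCell<String> = RefCell::new(String::from("client"));
}

/// Returns a copy of the stored value, or `None` if the thread-local
/// is being destroyed or has already been destroyed on this thread.
fn client_name() -> Option<String> {
    // `try_with` reports an `AccessError` where `with` would panic.
    CLIENT.try_with(|c| c.borrow().clone()).ok()
}
```

On a live thread `client_name()` returns `Some(..)`. Code that may run from another thread-local's destructor can fall back gracefully when it returns `None`.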

After some struggling I found a workaround that works well. The thread_local variable is now defined as:

thread_local! {
    pub static CLIENT: Cell<Option<HelloServiceClientImpl>> = Cell::new(None);
}

pub fn build_capn_client() {
    CLIENT.with(|f| {
        // A temporary current-thread runtime is used only to drive construction.
        let client = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(HelloServiceClientImpl::new(ADDRESS))
            .unwrap();
        f.set(Some(client));
    });
}


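pub fn destroy_capn_client() {
    CLIENT.with(|f| {
        // Take the client out of the slot and drop it right away.
        // Panics if the client was never built.
        f.take().unwrap();
    });
}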
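
The idea behind the workaround can be sketched with the standard library alone: the slot starts empty, the client is created on demand, and it is dropped explicitly while the thread is still alive, so its destructor never runs during thread-local teardown. `FakeClient`, `DROPS`, and the helper functions below are hypothetical names used purely for illustration.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts drops of `FakeClient`; exists only to make the behavior observable.
static DROPS: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for the real RPC client.
struct FakeClient {
    id: u32,
}

impl Drop for FakeClient {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

thread_local! {
    // Starts empty, so no client exists until `build_client` is called.
    static CLIENT: Cell<Option<FakeClient>> = Cell::new(None);
}

fn build_client(id: u32) {
    CLIENT.with(|slot| slot.set(Some(FakeClient { id })));
}

// Takes the client out of the slot, reads from it, and puts it back.
fn client_id() -> Option<u32> {
    CLIENT.with(|slot| {
        let client = slot.take();
        let id = client.as_ref().map(|c| c.id);
        slot.set(client);
        id
    })
}

// Drops the client explicitly while the thread is still fully alive.
fn destroy_client() {
    CLIENT.with(|slot| drop(slot.take()));
}

// Runs build / use / destroy on a fresh thread and returns:
// (id while alive, id after destroy, drops caused by `destroy_client`).
fn run_lifecycle() -> (Option<u32>, Option<u32>, usize) {
    thread::spawn(|| {
        build_client(7);
        let alive = client_id();
        let before = DROPS.load(Ordering::SeqCst);
        destroy_client();
        let after = DROPS.load(Ordering::SeqCst);
        (alive, client_id(), after - before)
    })
    .join()
    .unwrap()
}
```

Unlike the `unwrap()` version, this sketch tolerates `destroy_client` being called when no client was built; whether that is desirable depends on how strictly the caller wants to enforce the build/destroy pairing.
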
@Forsworns
Author

In case someone is interested in the detailed implementation of HelloServiceClientImpl:

I split connecting and requesting into separate methods so that requests can reuse the same connection. Because the RpcSystem is spawned with tokio::task::spawn_local, every request has to run in the same tokio::task::LocalSet as that task. The client is therefore defined as follows:

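use std::net::ToSocketAddrs;
use std::rc::Rc;

use capnp::capability::Response;
use capnp::message::ReaderOptions;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use futures::AsyncReadExt; // provides `.split()` on the compat stream
use tokio::runtime::Runtime;
use tokio::task::LocalSet;

// `Result<T>` is assumed to be a one-parameter alias defined elsewhere in the crate.
use crate::hello_capnp::hello_service::{hello_results::Owned as RespOwned, Client};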

pub struct HelloServiceClientImpl {
    path: String,
    rt: Rc<Runtime>,
    client: Option<Rc<Client>>,
    local: Rc<LocalSet>,
}

impl HelloServiceClientImpl {
    pub fn new(path: &str) -> Result<HelloServiceClientImpl> {
        Ok(HelloServiceClientImpl {
            path: path.to_string(),
            rt: Rc::new(
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap(),
            ),
            local: Rc::new(LocalSet::new()),
            client: None,
        })
    }

    async fn connect(&self, path: impl ToSocketAddrs) -> Client {
        self.local
            .run_until(async move {
                let addr = path
                    .to_socket_addrs()
                    .unwrap()
                    .next()
                    .expect("could not parse address");
                let stream = tokio::net::TcpStream::connect(&addr)
                    .await
                    .expect("Cannot connect to the addr");
                stream
                    .set_nodelay(crate::TCP_NO_DELAY)
                    .expect("Cannot configure Nagle.");
                let (reader, writer) =
                    tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
                let rpc_network = Box::new(twoparty::VatNetwork::new(
                    reader,
                    writer,
                    rpc_twoparty_capnp::Side::Client,
                    ReaderOptions {
                        traversal_limit_in_words: Some(super::MAX_MESSAGE_SIZE),
                        ..Default::default()
                    },
                ));
                let mut rpc_system = RpcSystem::new(rpc_network, None);
                let client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
                tokio::task::spawn_local(rpc_system);
                client
            })
            .await
    }

    async fn request_async(
        client: &Client,
        req: &[u8],
    ) -> Result<Response<RespOwned>, capnp::Error> {
        // real service request here
    }

    fn request(&mut self, req: Vec<u8>) -> Result<Vec<u8>> {
        if self.client.is_none() {
            self.client = Some(Rc::new(self.rt.block_on(self.connect(self.path.clone()))));
        }
        let client = self.client.clone();
        let local = self.local.clone();
        let resp = self.rt.block_on(async move {
            local
                .run_until(async move {
                    let resp =
                        HelloServiceClientImpl::request_async(client.as_ref().unwrap(), &req)
                            .await
                            .unwrap();
                    resp
                })
                .await
        });
        let resp = resp.get()?.get_resp()?.get_resp_data()?;
        Ok(Vec::from(resp))
    }
}

@dwrensha
Member

dwrensha commented Nov 7, 2022

Is there a specific question that you are asking here? I don't recommend using thread local storage like that, but it looks like you managed to get it to work.

@Forsworns
Author

No, not really. I just wanted to share a problem I encountered with thread-local storage (TLS) :)
