From 241e6ca6b39b93c160532401521e47682c87a725 Mon Sep 17 00:00:00 2001 From: clabby Date: Tue, 4 Jun 2024 01:34:45 -0400 Subject: [PATCH] \## Overview Simplifies the host program by removing the race between the native client / native server. --- bin/host/src/main.rs | 114 ++++++++------------------------ bin/programs/client/src/main.rs | 2 +- crates/common-proc/src/lib.rs | 11 ++- 3 files changed, 37 insertions(+), 90 deletions(-) diff --git a/bin/host/src/main.rs b/bin/host/src/main.rs index 2e53c01c6..7d760ed2d 100644 --- a/bin/host/src/main.rs +++ b/bin/host/src/main.rs @@ -16,14 +16,7 @@ use std::{ panic::AssertUnwindSafe, sync::Arc, }; -use tokio::{ - process::Command, - sync::{ - watch::{Receiver, Sender}, - RwLock, - }, - task, -}; +use tokio::{process::Command, sync::RwLock, task}; use tracing::{error, info}; use types::NativePipeFiles; @@ -90,22 +83,12 @@ async fn start_server_and_native_client(cfg: HostCli) -> Result<()> { Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider))) }); - // Create a channel to signal the server and the client program to exit. - let (tx_server, rx_server) = tokio::sync::watch::channel(()); - let (tx_program, rx_program) = (tx_server.clone(), rx_server.clone()); - // Create the server and start it. - let server_task = task::spawn(start_native_preimage_server( - kv_store, - fetcher, - preimage_pipe, - hint_pipe, - tx_server, - rx_server, - )); + let server_task = + task::spawn(start_native_preimage_server(kv_store, fetcher, preimage_pipe, hint_pipe)); // Start the client program in a separate child process. - let program_task = task::spawn(start_native_client_program(cfg, files, tx_program, rx_program)); + let program_task = task::spawn(start_native_client_program(cfg, files)); // Execute both tasks and wait for them to complete. info!("Starting preimage server and client program."); @@ -126,8 +109,6 @@ async fn start_native_preimage_server( fetcher: Option>>>, preimage_pipe: PipeHandle, hint_pipe: PipeHandle, - tx: Sender<()>, - mut rx: Receiver<()>, ) -> Result<()> where KV: KeyValueStore + Send + Sync + ?Sized + 'static, @@ -136,34 +117,17 @@ where let hint_reader = HintReader::new(hint_pipe); let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher); - - let server_pair_task = task::spawn(async move { - AssertUnwindSafe(server.start()) - .catch_unwind() - .await - .map_err(|_| { - error!(target: "preimage_server", "Preimage server panicked"); - anyhow!("Preimage server panicked") - })? - .map_err(|e| { - error!(target: "preimage_server", "Preimage server exited with an error"); - anyhow!("Preimage server exited with an error: {:?}", e) - }) - }); - let rx_server_task = task::spawn(async move { rx.changed().await }); - - // Block the current task until either the client program exits or the server exits. - tokio::select! { - _ = rx_server_task => { - info!(target: "preimage_server", "Received shutdown signal from preimage server task.") - }, - res = util::flatten_join_result(server_pair_task) => { - res?; - } - } - - // Signal to the client program that the server has exited. - let _ = tx.send(()); + AssertUnwindSafe(server.start()) + .catch_unwind() + .await + .map_err(|_| { + error!(target: "preimage_server", "Preimage server panicked"); + anyhow!("Preimage server panicked") + })? + .map_err(|e| { + error!(target: "preimage_server", "Preimage server exited with an error"); + anyhow!("Preimage server exited with an error: {:?}", e) + })?; info!("Preimage server has exited."); Ok(()) @@ -181,12 +145,7 @@ where /// ## Returns /// - `Ok(())` if the client program exits successfully. /// - `Err(_)` if the client program exits with a non-zero status. -async fn start_native_client_program( - cfg: HostCli, - files: NativePipeFiles, - tx: Sender<()>, - mut rx: Receiver<()>, -) -> Result<()> { +async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) -> Result<()> { // Map the file descriptors to the standard streams and the preimage oracle and hint // reader's special file descriptors. let mut command = Command::new(cfg.exec); @@ -202,37 +161,20 @@ async fn start_native_client_program( ]) .expect("No errors may occur when mapping file descriptors."); - let exec_task = task::spawn(async move { - let status = command - .status() - .await - .map_err(|e| { - error!(target: "client_program", "Failed to execute client program: {:?}", e); - anyhow!("Failed to execute client program: {:?}", e) - })? - .success(); - Ok::<_, anyhow::Error>(status) - }); - let rx_program_task = task::spawn(async move { rx.changed().await }); - - // Block the current task until either the client program exits or the server exits. - tokio::select! { - _ = rx_program_task => { - info!(target: "client_program", "Received shutdown signal from preimage server task.") - }, - res = util::flatten_join_result(exec_task) => { - if !(res?) { - // Signal to the preimage server that the client program has exited. - let _ = tx.send(()); - error!(target: "client_program", "Client program exited with a non-zero status."); - return Err(anyhow!("Client program exited with a non-zero status.")); - } - } + let status = command + .status() + .await + .map_err(|e| { + error!(target: "client_program", "Failed to execute client program: {:?}", e); + anyhow!("Failed to execute client program: {:?}", e) + })? + .success(); + + if !status { + error!(target: "client_program", "Client program exited with a non-zero status."); + return Err(anyhow!("Client program exited with a non-zero status.")); } - // Signal to the preimage server that the client program has exited. - let _ = tx.send(()); - info!(target: "client_program", "Client program has exited."); Ok(()) } diff --git a/bin/programs/client/src/main.rs b/bin/programs/client/src/main.rs index 8d4ec6248..ba152ab7b 100644 --- a/bin/programs/client/src/main.rs +++ b/bin/programs/client/src/main.rs @@ -17,6 +17,6 @@ fn main() -> Result<()> { let caching_oracle = CachingOracle::<16>::new(); let boot = BootInfo::load(&caching_oracle).await?; io::print(&alloc::format!("{:?}\n", boot)); - Ok(()) + Ok::<_, anyhow::Error>(()) }) } diff --git a/crates/common-proc/src/lib.rs b/crates/common-proc/src/lib.rs index 7d23bca5b..502b42dc2 100644 --- a/crates/common-proc/src/lib.rs +++ b/crates/common-proc/src/lib.rs @@ -33,7 +33,13 @@ pub fn client_entry(attr: TokenStream, input: TokenStream) -> TokenStream { use anyhow::Result as AnyhowResult; fn #fn_name() -> AnyhowResult<()> { - #fn_body + match #fn_body { + Ok(_) => kona_common::io::exit(0), + Err(e) => { + kona_common::io::print_err(alloc::format!("Program encountered fatal error: {:?}\n", e).as_ref()); + kona_common::io::exit(1); + } + } } cfg_if::cfg_if! { @@ -43,8 +49,7 @@ pub fn client_entry(attr: TokenStream, input: TokenStream) -> TokenStream { #[no_mangle] pub extern "C" fn _start() { kona_common::alloc_heap!(HEAP_SIZE); - #fn_name().unwrap(); - kona_common::io::exit(0); + #fn_name() } #[panic_handler]