Skip to content

Commit

Permalink
\## Overview
Browse files Browse the repository at this point in the history
Simplifies the host program by removing the race between the native
client / native server.
  • Loading branch information
clabby committed Jun 4, 2024
1 parent 8c920a2 commit 241e6ca
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 90 deletions.
114 changes: 28 additions & 86 deletions bin/host/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@ use std::{
panic::AssertUnwindSafe,
sync::Arc,
};
use tokio::{
process::Command,
sync::{
watch::{Receiver, Sender},
RwLock,
},
task,
};
use tokio::{process::Command, sync::RwLock, task};
use tracing::{error, info};
use types::NativePipeFiles;

Expand Down Expand Up @@ -90,22 +83,12 @@ async fn start_server_and_native_client(cfg: HostCli) -> Result<()> {
Arc::new(RwLock::new(Fetcher::new(kv_store.clone(), l1_provider, l2_provider)))
});

// Create a channel to signal the server and the client program to exit.
let (tx_server, rx_server) = tokio::sync::watch::channel(());
let (tx_program, rx_program) = (tx_server.clone(), rx_server.clone());

// Create the server and start it.
let server_task = task::spawn(start_native_preimage_server(
kv_store,
fetcher,
preimage_pipe,
hint_pipe,
tx_server,
rx_server,
));
let server_task =
task::spawn(start_native_preimage_server(kv_store, fetcher, preimage_pipe, hint_pipe));

// Start the client program in a separate child process.
let program_task = task::spawn(start_native_client_program(cfg, files, tx_program, rx_program));
let program_task = task::spawn(start_native_client_program(cfg, files));

// Execute both tasks and wait for them to complete.
info!("Starting preimage server and client program.");
Expand All @@ -126,8 +109,6 @@ async fn start_native_preimage_server<KV>(
fetcher: Option<Arc<RwLock<Fetcher<KV>>>>,
preimage_pipe: PipeHandle,
hint_pipe: PipeHandle,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()>
where
KV: KeyValueStore + Send + Sync + ?Sized + 'static,
Expand All @@ -136,34 +117,17 @@ where
let hint_reader = HintReader::new(hint_pipe);

let server = PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher);

let server_pair_task = task::spawn(async move {
AssertUnwindSafe(server.start())
.catch_unwind()
.await
.map_err(|_| {
error!(target: "preimage_server", "Preimage server panicked");
anyhow!("Preimage server panicked")
})?
.map_err(|e| {
error!(target: "preimage_server", "Preimage server exited with an error");
anyhow!("Preimage server exited with an error: {:?}", e)
})
});
let rx_server_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_server_task => {
info!(target: "preimage_server", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(server_pair_task) => {
res?;
}
}

// Signal to the client program that the server has exited.
let _ = tx.send(());
AssertUnwindSafe(server.start())
.catch_unwind()
.await
.map_err(|_| {
error!(target: "preimage_server", "Preimage server panicked");
anyhow!("Preimage server panicked")
})?
.map_err(|e| {
error!(target: "preimage_server", "Preimage server exited with an error");
anyhow!("Preimage server exited with an error: {:?}", e)
})?;

info!("Preimage server has exited.");
Ok(())
Expand All @@ -181,12 +145,7 @@ where
/// ## Returns
/// - `Ok(())` if the client program exits successfully.
/// - `Err(_)` if the client program exits with a non-zero status.
async fn start_native_client_program(
cfg: HostCli,
files: NativePipeFiles,
tx: Sender<()>,
mut rx: Receiver<()>,
) -> Result<()> {
async fn start_native_client_program(cfg: HostCli, files: NativePipeFiles) -> Result<()> {
// Map the file descriptors to the standard streams and the preimage oracle and hint
// reader's special file descriptors.
let mut command = Command::new(cfg.exec);
Expand All @@ -202,37 +161,20 @@ async fn start_native_client_program(
])
.expect("No errors may occur when mapping file descriptors.");

let exec_task = task::spawn(async move {
let status = command
.status()
.await
.map_err(|e| {
error!(target: "client_program", "Failed to execute client program: {:?}", e);
anyhow!("Failed to execute client program: {:?}", e)
})?
.success();
Ok::<_, anyhow::Error>(status)
});
let rx_program_task = task::spawn(async move { rx.changed().await });

// Block the current task until either the client program exits or the server exits.
tokio::select! {
_ = rx_program_task => {
info!(target: "client_program", "Received shutdown signal from preimage server task.")
},
res = util::flatten_join_result(exec_task) => {
if !(res?) {
// Signal to the preimage server that the client program has exited.
let _ = tx.send(());
error!(target: "client_program", "Client program exited with a non-zero status.");
return Err(anyhow!("Client program exited with a non-zero status."));
}
}
let status = command
.status()
.await
.map_err(|e| {
error!(target: "client_program", "Failed to execute client program: {:?}", e);
anyhow!("Failed to execute client program: {:?}", e)
})?
.success();

if !status {
error!(target: "client_program", "Client program exited with a non-zero status.");
return Err(anyhow!("Client program exited with a non-zero status."));
}

// Signal to the preimage server that the client program has exited.
let _ = tx.send(());

info!(target: "client_program", "Client program has exited.");
Ok(())
}
2 changes: 1 addition & 1 deletion bin/programs/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ fn main() -> Result<()> {
let caching_oracle = CachingOracle::<16>::new();
let boot = BootInfo::load(&caching_oracle).await?;
io::print(&alloc::format!("{:?}\n", boot));
Ok(())
Ok::<_, anyhow::Error>(())
})
}
11 changes: 8 additions & 3 deletions crates/common-proc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ pub fn client_entry(attr: TokenStream, input: TokenStream) -> TokenStream {
use anyhow::Result as AnyhowResult;

fn #fn_name() -> AnyhowResult<()> {
#fn_body
match #fn_body {
Ok(_) => kona_common::io::exit(0),
Err(e) => {
kona_common::io::print_err(alloc::format!("Program encountered fatal error: {:?}\n", e).as_ref());
kona_common::io::exit(1);
}
}
}

cfg_if::cfg_if! {
Expand All @@ -43,8 +49,7 @@ pub fn client_entry(attr: TokenStream, input: TokenStream) -> TokenStream {
#[no_mangle]
pub extern "C" fn _start() {
kona_common::alloc_heap!(HEAP_SIZE);
#fn_name().unwrap();
kona_common::io::exit(0);
#fn_name()
}

#[panic_handler]
Expand Down

0 comments on commit 241e6ca

Please sign in to comment.