Skip to content

Commit

Permalink
proc_macro: use crossbeam channels for the proc_macro cross-thread br…
Browse files Browse the repository at this point in the history
…idge

This is done by having the crossbeam dependency inserted into the
proc_macro server code from the server side, to avoid adding a
dependency to proc_macro.

In addition, this introduces a -Z command-line option which will switch
rustc to run proc-macros using this cross-thread executor. With the
changes to the bridge in rust-lang#98186, rust-lang#98187, rust-lang#98188 and rust-lang#98189, the
performance of the executor should be much closer to same-thread
execution.

In local testing, the crossbeam executor was substantially more
performant than either of the two existing CrossThread strategies, so
they have been removed to keep things simple.
mystor committed Jul 29, 2022
1 parent 2f847b8 commit 6d1650f
Showing 8 changed files with 133 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
@@ -3885,6 +3885,7 @@ dependencies = [
name = "rustc_expand"
version = "0.0.0"
dependencies = [
"crossbeam-channel",
"rustc_ast",
"rustc_ast_passes",
"rustc_ast_pretty",
1 change: 1 addition & 0 deletions compiler/rustc_expand/Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,3 +24,4 @@ rustc_parse = { path = "../rustc_parse" }
rustc_session = { path = "../rustc_session" }
smallvec = { version = "1.8.1", features = ["union", "may_dangle"] }
rustc_ast = { path = "../rustc_ast" }
crossbeam-channel = "0.5.0"
44 changes: 37 additions & 7 deletions compiler/rustc_expand/src/proc_macro.rs
Original file line number Diff line number Diff line change
@@ -8,10 +8,37 @@ use rustc_ast::tokenstream::{TokenStream, TokenTree};
use rustc_data_structures::sync::Lrc;
use rustc_errors::ErrorGuaranteed;
use rustc_parse::parser::ForceCollect;
use rustc_session::config::ProcMacroExecutionStrategy;
use rustc_span::profiling::SpannedEventArgRecorder;
use rustc_span::{Span, DUMMY_SP};

const EXEC_STRATEGY: pm::bridge::server::SameThread = pm::bridge::server::SameThread;
struct CrossbeamMessagePipe<T> {
tx: crossbeam_channel::Sender<T>,
rx: crossbeam_channel::Receiver<T>,
}

impl<T> pm::bridge::server::MessagePipe<T> for CrossbeamMessagePipe<T> {
fn new() -> (Self, Self) {
let (tx1, rx1) = crossbeam_channel::bounded(1);
let (tx2, rx2) = crossbeam_channel::bounded(1);
(CrossbeamMessagePipe { tx: tx1, rx: rx2 }, CrossbeamMessagePipe { tx: tx2, rx: rx1 })
}

fn send(&mut self, value: T) {
self.tx.send(value).unwrap();
}

fn recv(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}

fn exec_strategy(ecx: &ExtCtxt<'_>) -> impl pm::bridge::server::ExecutionStrategy {
pm::bridge::server::MaybeCrossThread::<CrossbeamMessagePipe<_>>::new(
ecx.sess.opts.unstable_opts.proc_macro_execution_strategy
== ProcMacroExecutionStrategy::CrossThread,
)
}

pub struct BangProcMacro {
pub client: pm::bridge::client::Client<pm::TokenStream, pm::TokenStream>,
@@ -30,8 +57,9 @@ impl base::BangProcMacro for BangProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace).map_err(|e| {
self.client.run(&strategy, server, input, proc_macro_backtrace).map_err(|e| {
let mut err = ecx.struct_span_err(span, "proc macro panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
@@ -59,16 +87,17 @@ impl base::AttrProcMacro for AttrProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client
.run(&EXEC_STRATEGY, server, annotation, annotated, proc_macro_backtrace)
.map_err(|e| {
self.client.run(&strategy, server, annotation, annotated, proc_macro_backtrace).map_err(
|e| {
let mut err = ecx.struct_span_err(span, "custom attribute panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
}
err.emit()
})
},
)
}
}

@@ -105,8 +134,9 @@ impl MultiItemModifier for DeriveProcMacro {
recorder.record_arg_with_span(ecx.expansion_descr(), span);
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
match self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace) {
match self.client.run(&strategy, server, input, proc_macro_backtrace) {
Ok(stream) => stream,
Err(e) => {
let mut err = ecx.struct_span_err(span, "proc-macro derive panicked");
3 changes: 2 additions & 1 deletion compiler/rustc_interface/src/tests.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use rustc_session::config::{
};
use rustc_session::config::{
BranchProtection, Externs, OomStrategy, OutputType, OutputTypes, PAuthKey, PacRet,
SymbolManglingVersion, WasiExecModel,
ProcMacroExecutionStrategy, SymbolManglingVersion, WasiExecModel,
};
use rustc_session::config::{CFGuard, ExternEntry, LinkerPluginLto, LtoCli, SwitchWithOptPath};
use rustc_session::lint::Level;
@@ -685,6 +685,7 @@ fn test_unstable_options_tracking_hash() {
untracked!(print_mono_items, Some(String::from("abc")));
untracked!(print_type_sizes, true);
untracked!(proc_macro_backtrace, true);
untracked!(proc_macro_execution_strategy, ProcMacroExecutionStrategy::CrossThread);
untracked!(query_dep_graph, true);
untracked!(save_analysis, true);
untracked!(self_profile, SwitchWithOptPath::Enabled(None));
10 changes: 10 additions & 0 deletions compiler/rustc_session/src/config.rs
Original file line number Diff line number Diff line change
@@ -2959,3 +2959,13 @@ impl OomStrategy {
}
}
}

/// How to run proc-macro code when building this crate
#[derive(Clone, Copy, PartialEq, Hash, Debug)]
pub enum ProcMacroExecutionStrategy {
/// Run the proc-macro code on the same thread as the server.
SameThread,

/// Run the proc-macro code on a different thread.
CrossThread,
}
17 changes: 17 additions & 0 deletions compiler/rustc_session/src/options.rs
Original file line number Diff line number Diff line change
@@ -415,6 +415,8 @@ mod desc {
"one of (`none` (default), `basic`, `strong`, or `all`)";
pub const parse_branch_protection: &str =
"a `,` separated combination of `bti`, `b-key`, `pac-ret`, or `leaf`";
pub const parse_proc_macro_execution_strategy: &str =
"one of supported execution strategies (`same-thread`, or `cross-thread`)";
}

mod parse {
@@ -1062,6 +1064,18 @@ mod parse {
}
true
}

pub(crate) fn parse_proc_macro_execution_strategy(
slot: &mut ProcMacroExecutionStrategy,
v: Option<&str>,
) -> bool {
*slot = match v {
Some("same-thread") => ProcMacroExecutionStrategy::SameThread,
Some("cross-thread") => ProcMacroExecutionStrategy::CrossThread,
_ => return false,
};
true
}
}

options! {
@@ -1457,6 +1471,9 @@ options! {
"print layout information for each type encountered (default: no)"),
proc_macro_backtrace: bool = (false, parse_bool, [UNTRACKED],
"show backtraces for panics during proc-macro execution (default: no)"),
proc_macro_execution_strategy: ProcMacroExecutionStrategy = (ProcMacroExecutionStrategy::SameThread,
parse_proc_macro_execution_strategy, [UNTRACKED],
"how to run proc-macro code (default: same-thread)"),
profile: bool = (false, parse_bool, [TRACKED],
"insert profiling code (default: no)"),
profile_closures: bool = (false, parse_no_flag, [UNTRACKED],
136 changes: 64 additions & 72 deletions library/proc_macro/src/bridge/server.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@
use super::*;

use std::marker::PhantomData;

// FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`.
use super::client::HandleStore;

@@ -143,6 +145,41 @@ pub trait ExecutionStrategy {
) -> Buffer;
}

pub struct MaybeCrossThread<P> {
cross_thread: bool,
marker: PhantomData<P>,
}

impl<P> MaybeCrossThread<P> {
pub const fn new(cross_thread: bool) -> Self {
MaybeCrossThread { cross_thread, marker: PhantomData }
}
}

impl<P> ExecutionStrategy for MaybeCrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
if self.cross_thread {
<CrossThread<P>>::new().run_bridge_and_client(
dispatcher,
input,
run_client,
force_show_panics,
)
} else {
SameThread.run_bridge_and_client(dispatcher, input, run_client, force_show_panics)
}
}
}

pub struct SameThread;

impl ExecutionStrategy for SameThread {
@@ -164,28 +201,31 @@ impl ExecutionStrategy for SameThread {
}
}

// NOTE(eddyb) Two implementations are provided, the second one is a bit
// faster but neither is anywhere near as fast as same-thread execution.
pub struct CrossThread<P>(PhantomData<P>);

pub struct CrossThread1;
impl<P> CrossThread<P> {
pub const fn new() -> Self {
CrossThread(PhantomData)
}
}

impl ExecutionStrategy for CrossThread1 {
impl<P> ExecutionStrategy for CrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::mpsc::channel;

let (req_tx, req_rx) = channel();
let (res_tx, res_rx) = channel();
let (mut server, mut client) = P::new();

let join_handle = thread::spawn(move || {
let mut dispatch = |buf| {
req_tx.send(buf).unwrap();
res_rx.recv().unwrap()
let mut dispatch = |b: Buffer| -> Buffer {
client.send(b);
client.recv().expect("server died while client waiting for reply")
};

run_client(BridgeConfig {
@@ -196,75 +236,27 @@ impl ExecutionStrategy for CrossThread1 {
})
});

for b in req_rx {
res_tx.send(dispatcher.dispatch(b)).unwrap();
while let Some(b) = server.recv() {
server.send(dispatcher.dispatch(b));
}

join_handle.join().unwrap()
}
}

pub struct CrossThread2;
/// A message pipe used for communicating between server and client threads.
pub trait MessagePipe<T>: Sized {
/// Create a new pair of endpoints for the message pipe.
fn new() -> (Self, Self);

impl ExecutionStrategy for CrossThread2 {
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::{Arc, Mutex};

enum State<T> {
Req(T),
Res(T),
}

let mut state = Arc::new(Mutex::new(State::Res(Buffer::new())));

let server_thread = thread::current();
let state2 = state.clone();
let join_handle = thread::spawn(move || {
let mut dispatch = |b| {
*state2.lock().unwrap() = State::Req(b);
server_thread.unpark();
loop {
thread::park();
if let State::Res(b) = &mut *state2.lock().unwrap() {
break b.take();
}
}
};

let r = run_client(BridgeConfig {
input,
dispatch: (&mut dispatch).into(),
force_show_panics,
_marker: marker::PhantomData,
});

// Wake up the server so it can exit the dispatch loop.
drop(state2);
server_thread.unpark();

r
});

// Check whether `state2` was dropped, to know when to stop.
while Arc::get_mut(&mut state).is_none() {
thread::park();
let mut b = match &mut *state.lock().unwrap() {
State::Req(b) => b.take(),
_ => continue,
};
b = dispatcher.dispatch(b.take());
*state.lock().unwrap() = State::Res(b);
join_handle.thread().unpark();
}
/// Send a message to the other endpoint of this pipe.
fn send(&mut self, value: T);

join_handle.join().unwrap()
}
/// Receive a message from the other endpoint of this pipe.
///
/// Returns `None` if the other end of the pipe has been destroyed, and no
/// message was received.
fn recv(&mut self) -> Option<T>;
}

fn run_server<
1 change: 1 addition & 0 deletions src/test/rustdoc-ui/z-help.stdout
Original file line number Diff line number Diff line change
@@ -114,6 +114,7 @@
-Z print-mono-items=val -- print the result of the monomorphization collection pass
-Z print-type-sizes=val -- print layout information for each type encountered (default: no)
-Z proc-macro-backtrace=val -- show backtraces for panics during proc-macro execution (default: no)
-Z proc-macro-execution-strategy=val -- how to run proc-macro code (default: same-thread)
-Z profile=val -- insert profiling code (default: no)
-Z profile-closures=val -- profile size of closures
-Z profile-emit=val -- file path to emit profiling data at runtime when using 'profile' (default based on relative source path)

0 comments on commit 6d1650f

Please sign in to comment.