diff --git a/src/python/pants/engine/internals/native_engine.pyi b/src/python/pants/engine/internals/native_engine.pyi
index 913a880c8af..555243f64eb 100644
--- a/src/python/pants/engine/internals/native_engine.pyi
+++ b/src/python/pants/engine/internals/native_engine.pyi
@@ -120,6 +120,7 @@ def session_get_observation_histograms(scheduler: PyScheduler, session: PySessio
 def session_record_test_observation(
     scheduler: PyScheduler, session: PySession, value: int
 ) -> None: ...
+def session_isolated_shallow_clone(session: PySession) -> PySession: ...
 def all_counter_names() -> list[str]: ...
 def graph_len(scheduler: PyScheduler) -> int: ...
 def graph_visualize(scheduler: PyScheduler, session: PySession, path: str) -> None: ...
diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py
index 7f7167b5e5b..68c2e588f2a 100644
--- a/src/python/pants/engine/internals/scheduler.py
+++ b/src/python/pants/engine/internals/scheduler.py
@@ -130,6 +130,7 @@ def __init__(
         """
         self.include_trace_on_error = include_trace_on_error
         self._visualize_to_dir = visualize_to_dir
+        self._visualize_run_count = 0
         # Validate and register all provided and intrinsic tasks.
         rule_index = RuleIndex.create(rules)
         tasks = register_rules(rule_index, union_membership)
@@ -311,7 +312,6 @@ class SchedulerSession:
     def __init__(self, scheduler: Scheduler, session: PySession) -> None:
         self._scheduler = scheduler
         self._py_session = session
-        self._run_count = 0

     @property
     def scheduler(self) -> Scheduler:
@@ -325,6 +325,11 @@ def py_scheduler(self) -> PyScheduler:
     def py_session(self) -> PySession:
         return self._py_session

+    def isolated_shallow_clone(self) -> SchedulerSession:
+        return SchedulerSession(
+            self._scheduler, native_engine.session_isolated_shallow_clone(self._py_session)
+        )
+
     def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
         result = native_engine.session_poll_workunits(
             self.py_scheduler, self.py_session, max_log_verbosity.level
@@ -401,8 +406,9 @@ def metrics(self) -> dict[str, int]:

     def _maybe_visualize(self) -> None:
         if self._scheduler.visualize_to_dir is not None:
-            name = f"graph.{self._run_count:03d}.dot"
-            self._run_count += 1
+            # TODO: This increment-and-get is racey.
+            name = f"graph.{self._scheduler._visualize_run_count:03d}.dot"
+            self._scheduler._visualize_run_count += 1
             self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))

     def teardown_dynamic_ui(self) -> None:
diff --git a/src/python/pants/engine/streaming_workunit_handler.py b/src/python/pants/engine/streaming_workunit_handler.py
index 21bec46deba..72ba2f12e2d 100644
--- a/src/python/pants/engine/streaming_workunit_handler.py
+++ b/src/python/pants/engine/streaming_workunit_handler.py
@@ -185,6 +185,7 @@ def __init__(
         pantsd: bool,
         max_workunit_verbosity: LogLevel = LogLevel.TRACE,
     ) -> None:
+        scheduler = scheduler.isolated_shallow_clone()
        self.callbacks = callbacks
         self.context = StreamingWorkunitContext(
             _scheduler=scheduler,
diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs
index 2aa2d4da81f..90c5f3c74e6 100644
--- a/src/rust/engine/src/externs/interface.rs
+++ b/src/rust/engine/src/externs/interface.rs
@@ -318,6 +318,12 @@ py_module_initializer!(native_engine, |py, m| {
       session_record_test_observation(a: PyScheduler, b: PySession, c: u64)
     ),
   )?;
+  m.add(
+    py,
+    "session_isolated_shallow_clone",
+    py_fn!(py, session_isolated_shallow_clone(a: PySession)),
+  )?;
+
   m.add(py, "all_counter_names", py_fn!(py, all_counter_names()))?;

   m.add(
@@ -1389,6 +1395,12 @@ fn session_record_test_observation(
   })
 }

+fn session_isolated_shallow_clone(py: Python, session_ptr: PySession) -> CPyResult<PySession> {
+  with_session(py, session_ptr, |session| {
+    PySession::create_instance(py, session.isolated_shallow_clone())
+  })
+}
+
 fn validate_reachability(py: Python, scheduler_ptr: PyScheduler) -> PyUnitResult {
   with_scheduler(py, scheduler_ptr, |scheduler| {
     scheduler
diff --git a/src/rust/engine/src/session.rs b/src/rust/engine/src/session.rs
index ddcd7c03d96..92e74c9a0ca 100644
--- a/src/rust/engine/src/session.rs
+++ b/src/rust/engine/src/session.rs
@@ -46,17 +46,30 @@ enum SessionDisplay {
   },
 }

+impl SessionDisplay {
+  fn new(
+    workunit_store: &WorkunitStore,
+    parallelism: usize,
+    should_render_ui: bool,
+  ) -> SessionDisplay {
+    if should_render_ui {
+      SessionDisplay::ConsoleUI(ConsoleUI::new(workunit_store.clone(), parallelism))
+    } else {
+      SessionDisplay::Logging {
+        // TODO: This threshold should likely be configurable, but the interval we render at
+        // probably does not need to be.
+        straggler_threshold: Duration::from_secs(60),
+        straggler_deadline: None,
+      }
+    }
+  }
+}
+
 ///
-/// A Session represents a related series of requests (generally: one run of the pants CLI) on an
-/// underlying Scheduler, and is a useful scope for metrics.
-///
-/// Both Scheduler and Session are exposed to python and expected to be used by multiple threads, so
-/// they use internal mutability in order to avoid exposing locks to callers.
+/// The portion of a Session that uniquely identifies it and holds metrics and the history of
+/// requests made on it.
 ///
-struct InnerSession {
-  // Whether or not this Session has been cancelled. If a Session has been cancelled, all work that
-  // it started should attempt to exit in an orderly fashion.
-  cancelled: AsyncLatch,
+struct SessionState {
   // The Core that this Session is running on.
   core: Arc<Core>,
   // The total size of the graph at Session-creation time.
@@ -64,8 +77,6 @@ struct InnerSession {
   // The set of roots that have been requested within this session, with associated LastObserved
   // times if they were polled.
   roots: Mutex<HashMap<Root, Option<LastObserved>>>,
-  // The display mechanism to use in this Session.
-  display: Mutex<SessionDisplay>,
   // A place to store info about workunits in rust part
   workunit_store: WorkunitStore,
   // The unique id for this Session: used for metrics gathering purposes.
@@ -76,14 +87,25 @@
   // entire Session, but in some cases (in particular, a `--loop`) the caller wants to retain the
   // same Session while still observing new values for uncacheable rules like Goals.
   //
-  // TODO: Figure out how the `--loop` interplays with metrics. It's possible that for metrics
+  // TODO: Figure out how the `--loop` flag interplays with metrics. It's possible that for metrics
   // purposes, each iteration of a loop should be considered to be a new Session, but for now the
   // Session/build_id would be stable.
   run_id: Mutex<Uuid>,
   workunit_metadata_map: RwLock<HashMap<SpanId, WorkunitMetadata>>,
 }

-impl InnerSession {
+///
+/// A cancellable handle to a Session, with an optional associated UI.
+///
+struct SessionHandle {
+  // Whether or not this Session has been cancelled. If a Session has been cancelled, all work that
+  // it started should attempt to exit in an orderly fashion.
+  cancelled: AsyncLatch,
+  // The display mechanism to use in this Session.
+  display: Mutex<SessionDisplay>,
+}
+
+impl SessionHandle {
   ///
   /// Cancels this Session.
   ///
@@ -92,8 +114,22 @@ impl InnerSession {
   }
 }

+///
+/// A Session represents a related series of requests (generally: one run of the pants CLI) on an
+/// underlying Scheduler, and is a useful scope for metrics.
+///
+/// Both Scheduler and Session are exposed to python and expected to be used by multiple threads, so
+/// they use internal mutability in order to avoid exposing locks to callers.
+///
+/// NB: The `SessionState` and `SessionHandle` structs are independent in order to allow for a
+/// shallow clone of a Session with independent cancellation and display properties, but which
+/// shares the same metrics and identity.
+///
 #[derive(Clone)]
-pub struct Session(Arc<InnerSession>);
+pub struct Session {
+  handle: Arc<SessionHandle>,
+  state: Arc<SessionState>,
+}

 impl Session {
   pub fn new(
@@ -104,68 +140,85 @@ impl Session {
     cancelled: AsyncLatch,
   ) -> Session {
     let workunit_store = WorkunitStore::new(!should_render_ui);
-    let display = Mutex::new(if should_render_ui {
-      SessionDisplay::ConsoleUI(ConsoleUI::new(
-        workunit_store.clone(),
-        scheduler.core.local_parallelism,
-      ))
-    } else {
-      SessionDisplay::Logging {
-        // TODO: This threshold should likely be configurable, but the interval we render at
-        // probably does not need to be.
-        straggler_threshold: Duration::from_secs(60),
-        straggler_deadline: None,
-      }
-    });
+    let display = Mutex::new(SessionDisplay::new(
+      &workunit_store,
+      scheduler.core.local_parallelism,
+      should_render_ui,
+    ));
+
+    let handle = Arc::new(SessionHandle { cancelled, display });
+    scheduler.core.sessions.add(&handle);
+    Session {
+      handle,
+      state: Arc::new(SessionState {
+        core: scheduler.core.clone(),
+        preceding_graph_size: scheduler.core.graph.len(),
+        roots: Mutex::new(HashMap::new()),
+        workunit_store,
+        build_id,
+        session_values: Mutex::new(session_values),
+        run_id: Mutex::new(Uuid::new_v4()),
+        workunit_metadata_map: RwLock::new(HashMap::new()),
+      }),
+    }
+  }

-    let inner_session = Arc::new(InnerSession {
-      cancelled,
-      core: scheduler.core.clone(),
-      preceding_graph_size: scheduler.core.graph.len(),
-      roots: Mutex::new(HashMap::new()),
+  ///
+  /// Creates a shallow clone of this Session which is independently cancellable, but which shares
+  /// metrics, identity, and state with the original.
+  ///
+  /// Useful when executing background work "on behalf of a Session" which should not be torn down
+  /// when a client disconnects.
+  ///
+  pub fn isolated_shallow_clone(&self) -> Session {
+    let display = Mutex::new(SessionDisplay::new(
+      &self.state.workunit_store,
+      self.state.core.local_parallelism,
+      false,
+    ));
+    let handle = Arc::new(SessionHandle {
+      cancelled: AsyncLatch::new(),
       display,
-      workunit_store,
-      build_id,
-      session_values: Mutex::new(session_values),
-      run_id: Mutex::new(Uuid::new_v4()),
-      workunit_metadata_map: RwLock::new(HashMap::new()),
     });
-    scheduler.core.sessions.add(&inner_session);
-    Session(inner_session)
+    self.state.core.sessions.add(&handle);
+    Session {
+      handle,
+      state: self.state.clone(),
+    }
   }

   pub fn core(&self) -> &Arc<Core> {
-    &self.0.core
+    &self.state.core
   }

   ///
   /// Cancels this Session.
   ///
   pub fn cancel(&self) {
-    self.0.cancel();
+    self.handle.cancel();
   }

   ///
   /// Returns only if this Session has been cancelled.
   ///
   pub async fn cancelled(&self) {
-    self.0.cancelled.triggered().await;
+    self.handle.cancelled.triggered().await;
   }

   pub fn with_metadata_map<F, T>(&self, f: F) -> T
   where
     F: Fn(&mut HashMap<SpanId, WorkunitMetadata>) -> T,
   {
-    f(&mut self.0.workunit_metadata_map.write())
+    f(&mut self.state.workunit_metadata_map.write())
   }

   pub fn roots_extend(&self, new_roots: Vec<(Root, Option<LastObserved>)>) {
-    let mut roots = self.0.roots.lock();
+    let mut roots = self.state.roots.lock();
     roots.extend(new_roots);
   }

   pub fn roots_zip_last_observed(&self, inputs: &[Root]) -> Vec<(Root, Option<LastObserved>)> {
-    let roots = self.0.roots.lock();
+    let roots = self.state.roots.lock();
     inputs
       .iter()
       .map(|root| {
@@ -176,45 +229,45 @@ impl Session {
   }

   pub fn roots_nodes(&self) -> Vec<NodeKey> {
-    let roots = self.0.roots.lock();
+    let roots = self.state.roots.lock();
     roots.keys().map(|r| r.clone().into()).collect()
   }

   pub fn session_values(&self) -> Value {
-    self.0.session_values.lock().clone()
+    self.state.session_values.lock().clone()
   }

   pub fn preceding_graph_size(&self) -> usize {
-    self.0.preceding_graph_size
+    self.state.preceding_graph_size
   }

   pub fn workunit_store(&self) -> WorkunitStore {
-    self.0.workunit_store.clone()
+    self.state.workunit_store.clone()
   }

   pub fn build_id(&self) -> &String {
-    &self.0.build_id
+    &self.state.build_id
   }

   pub fn run_id(&self) -> Uuid {
-    let run_id = self.0.run_id.lock();
+    let run_id = self.state.run_id.lock();
     *run_id
   }

   pub fn new_run_id(&self) {
-    let mut run_id = self.0.run_id.lock();
+    let mut run_id = self.state.run_id.lock();
     *run_id = Uuid::new_v4();
   }

   pub async fn with_console_ui_disabled<T>(&self, f: impl Future<Output = T>) -> T {
-    match *self.0.display.lock() {
+    match *self.handle.display.lock() {
       SessionDisplay::ConsoleUI(ref mut ui) => ui.with_console_ui_disabled(f).await,
       SessionDisplay::Logging { .. } => f.await,
     }
   }

   pub fn maybe_display_initialize(&self, executor: &Executor) {
-    let result = match *self.0.display.lock() {
+    let result = match *self.handle.display.lock() {
       SessionDisplay::ConsoleUI(ref mut ui) => ui.initialize(executor.clone()),
       SessionDisplay::Logging {
         ref mut straggler_deadline,
@@ -230,7 +283,7 @@ impl Session {
   }

   pub async fn maybe_display_teardown(&self) {
-    let teardown = match *self.0.display.lock() {
+    let teardown = match *self.handle.display.lock() {
       SessionDisplay::ConsoleUI(ref mut ui) => ui.teardown().boxed(),
       SessionDisplay::Logging {
         ref mut straggler_deadline,
@@ -246,7 +299,7 @@ impl Session {
   }

   pub fn maybe_display_render(&self) {
-    match *self.0.display.lock() {
+    match *self.handle.display.lock() {
       SessionDisplay::ConsoleUI(ref mut ui) => ui.render(),
       SessionDisplay::Logging {
         straggler_threshold,
@@ -258,7 +311,7 @@ impl Session {
         {
           *straggler_deadline = Some(Instant::now() + STRAGGLER_LOGGING_INTERVAL);
           self
-            .0
+            .state
             .workunit_store
             .log_straggling_workunits(straggler_threshold);
         }
@@ -276,14 +329,14 @@ impl Session {
 pub struct Sessions {
   /// Live sessions. Completed Sessions (i.e., those for which the Weak reference is dead) are
   /// removed from this collection on a best effort when new Sessions are created.
-  sessions: Arc<Mutex<Vec<Weak<InnerSession>>>>,
+  sessions: Arc<Mutex<Vec<Weak<SessionHandle>>>>,
   /// Handle to kill the signal monitoring task when this object is killed.
   signal_task_abort_handle: AbortHandle,
 }

 impl Sessions {
   pub fn new(executor: &Executor) -> Result<Sessions, String> {
-    let sessions: Arc<Mutex<Vec<Weak<InnerSession>>>> = Arc::default();
+    let sessions: Arc<Mutex<Vec<Weak<SessionHandle>>>> = Arc::default();
     let signal_task_abort_handle = {
       let mut signal_stream = signal(SignalKind::interrupt())
         .map_err(|err| format!("Failed to install interrupt handler: {}", err))?;
@@ -310,7 +363,7 @@ impl Sessions {
     })
   }

-  fn add(&self, session: &Arc<InnerSession>) {
+  fn add(&self, session: &Arc<SessionHandle>) {
     let mut sessions = self.sessions.lock();
     sessions.retain(|weak_session| weak_session.upgrade().is_some());
     sessions.push(Arc::downgrade(session));
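
A note for reviewers on the shape of the `session.rs` change: the old `InnerSession` is split into a per-clone `SessionHandle` (cancellation and display) and a shared `SessionState` (metrics, identity, and request history), which is what makes `isolated_shallow_clone` possible. Below is a minimal standalone sketch of that pattern with toy types (`Handle`, `State`, an `AtomicBool` standing in for `AsyncLatch`, and a counter standing in for the workunit machinery; none of these are the engine's real types):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

// Per-clone: cancellation (and, in the real engine, the display).
struct Handle {
    cancelled: AtomicBool,
}

// Shared across clones: identity and metrics.
struct State {
    build_id: String,
    requests_run: AtomicUsize,
}

#[derive(Clone)]
struct Session {
    handle: Arc<Handle>,
    state: Arc<State>,
}

impl Session {
    fn new(build_id: &str) -> Session {
        Session {
            handle: Arc::new(Handle { cancelled: AtomicBool::new(false) }),
            state: Arc::new(State {
                build_id: build_id.to_owned(),
                requests_run: AtomicUsize::new(0),
            }),
        }
    }

    // Fresh handle (independent cancellation), shared state (same metrics and identity).
    fn isolated_shallow_clone(&self) -> Session {
        Session {
            handle: Arc::new(Handle { cancelled: AtomicBool::new(false) }),
            state: Arc::clone(&self.state),
        }
    }

    fn cancel(&self) {
        self.handle.cancelled.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let client = Session::new("build-1");
    let background = client.isolated_shallow_clone();

    client.cancel();
    // Cancelling the client-facing session does not cancel the background clone...
    assert!(client.handle.cancelled.load(Ordering::SeqCst));
    assert!(!background.handle.cancelled.load(Ordering::SeqCst));

    // ...but both clones still feed the same shared state.
    background.state.requests_run.fetch_add(1, Ordering::SeqCst);
    assert_eq!(client.state.requests_run.load(Ordering::SeqCst), 1);
    assert_eq!(client.state.build_id, background.state.build_id);
}
```

This is also why `StreamingWorkunitHandler` now calls `scheduler.isolated_shallow_clone()`: its completion callbacks run on behalf of the client's session but survive that session's cancellation, while their workunit data still lands in the shared store. Note that the clone passes `should_render_ui = false` to `SessionDisplay::new`, so a background clone logs stragglers rather than owning the console UI.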
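
The `Sessions` registry follows from the same split: it now tracks `Weak<SessionHandle>` rather than `Weak<InnerSession>`, so each isolated clone registers its own independently cancellable handle, and completed sessions fall out of the list. A simplified sketch of that weak-registry pattern (again with stand-in types, using `std::sync::Mutex` in place of the engine's locks, and assuming, per the signal-monitoring task above, that an interrupt should cancel every live handle):

```rust
use std::sync::{Arc, Mutex, Weak};

struct SessionHandle {
    cancelled: Mutex<bool>, // stand-in for AsyncLatch
}

#[derive(Default)]
struct Sessions {
    sessions: Mutex<Vec<Weak<SessionHandle>>>,
}

impl Sessions {
    fn add(&self, handle: &Arc<SessionHandle>) {
        let mut sessions = self.sessions.lock().unwrap();
        // Best-effort cleanup: drop entries whose Sessions have completed.
        sessions.retain(|weak| weak.upgrade().is_some());
        sessions.push(Arc::downgrade(handle));
    }

    // Roughly what a SIGINT task would do: cancel every handle still alive.
    fn cancel_all(&self) {
        for weak in self.sessions.lock().unwrap().iter() {
            if let Some(handle) = weak.upgrade() {
                *handle.cancelled.lock().unwrap() = true;
            }
        }
    }
}

fn main() {
    let registry = Sessions::default();
    let live = Arc::new(SessionHandle { cancelled: Mutex::new(false) });
    registry.add(&live);
    {
        let dropped = Arc::new(SessionHandle { cancelled: Mutex::new(false) });
        registry.add(&dropped);
    } // `dropped` completes here; its Weak entry is now dead.

    registry.cancel_all();
    assert!(*live.cancelled.lock().unwrap());
    assert_eq!(registry.sessions.lock().unwrap().len(), 2); // swept on the next add()
}
```

Holding `Weak` references keeps the registry from extending a Session's lifetime: once the last `Arc<SessionHandle>` is dropped, `upgrade()` returns `None`, and the next `add` sweeps the entry away.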