From 177deb9038e5e128c57f89af2e17840d025a3fd3 Mon Sep 17 00:00:00 2001 From: Kris Wilson Date: Fri, 31 Mar 2017 12:46:19 -0700 Subject: [PATCH] [pantsd] Address pantsd-runner hang on Linux and re-enable integration test. (#4407) ### Problem Currently, on Linux the first thin client call to the daemon can deadlock just after the pantsd->fork->pantsd-runner workflow. Connecting to the process with `gdb` reveals a deadlock in the following stack in the `post_fork` `drop` of the `CpuPool`: ``` #0 0x00007f63f04c31bd in __lll_lock_wait () from /lib64/libpthread.so.0 No symbol table info available. #1 0x00007f63f04c0ded in pthread_cond_signal@@GLIBC_2.3.2 () from /lib64/libpthread.so.0 No symbol table info available. #2 0x00007f63d3cfa438 in notify_one () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sys/unix/condvar.rs:52 No locals. #3 notify_one () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sys_common/condvar.rs:39 No locals. #4 notify_one () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sync/condvar.rs:208 No locals. #5 std::thread::{{impl}}::unpark () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/thread/mod.rs:633 No locals. #6 0x00007f63d3c583d1 in crossbeam::sync::ms_queue::{{impl}}::push (self=, t=...) at /home/kwilson/.cache/pants/rust-toolchain/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.2.10/src/sync/ms_queue.rs:178 guard = self = #7 0x00007f63d3c588ed in futures_cpupool::{{impl}}::drop (self=) at /home/kwilson/.cache/pants/rust-toolchain/git/checkouts/futures-rs-a4f11d094efefb0a/f7e6bc8/futures-cpupool/src/lib.rs:236 self = 0x37547a0 #8 0x00007f63d3be871c in engine::fs::{{impl}}::post_fork (self=0x3754778) at /home/kwilson/dev/pants/src/rust/engine/src/fs.rs:355 self = 0x3754778 #9 0x00007f63d3be10e4 in engine::context::{{impl}}::post_fork (self=0x37545b0) at /home/kwilson/dev/pants/src/rust/engine/src/context.rs:93 self = 0x37545b0 #10 0x00007f63d3c0de5a in {{closure}} (scheduler=) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:275 scheduler = 0x3740580 #11 with_scheduler (scheduler_ptr=, f=...) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:584 scheduler = 0x3740580 scheduler_ptr = 0x3740580 #12 engine::scheduler_post_fork (scheduler_ptr=0x3740580) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:274 scheduler_ptr = 0x3740580 #13 0x00007f63d3c1be8c in _cffi_f_scheduler_post_fork (self=, arg0=0x35798f0) at src/cffi/native_engine.c:2234 _save = 0x34a65a0 x0 = 0x3740580 datasize = #14 0x00007f63f07b5a62 in PyEval_EvalFrameEx () from /lib64/libpython2.7.so.1.0 ``` This presents as a hang in the thin client, because the pailgun socket is left open in the pantsd-runner. ### Solution Add pre-fork hooks and tear down the `CpuPool` instances prior to forking and rebuilding them. ### Result Can no longer reproduce the hang. --- src/python/pants/bin/daemon_pants_runner.py | 5 ++ src/python/pants/engine/scheduler.py | 6 ++ src/python/pants/engine/subsystem/native.py | 1 + .../engine/subsystem/native_engine_version | 2 +- src/python/pants/pantsd/pants_daemon.py | 3 + .../pants/pantsd/service/pants_service.py | 3 + .../pants/pantsd/service/scheduler_service.py | 4 ++ src/rust/engine/src/context.rs | 18 +++-- src/rust/engine/src/fs.rs | 67 +++++++++++-------- src/rust/engine/src/graph.rs | 3 +- src/rust/engine/src/lib.rs | 7 ++ src/rust/engine/src/scheduler.rs | 2 +- .../pantsd/test_pantsd_integration.py | 2 - 13 files changed, 84 insertions(+), 39 deletions(-) diff --git a/src/python/pants/bin/daemon_pants_runner.py b/src/python/pants/bin/daemon_pants_runner.py index e563e326aa3..13801a314af 100644 --- a/src/python/pants/bin/daemon_pants_runner.py +++ b/src/python/pants/bin/daemon_pants_runner.py @@ -130,6 +130,11 @@ def run(self): """Fork, daemonize and invoke self.post_fork_child() (via ProcessManager).""" self.daemonize(write_pid=False) + def pre_fork(self): + """Pre-fork callback executed via ProcessManager.daemonize().""" + if self._graph_helper: + self._graph_helper.scheduler.pre_fork() + def post_fork_child(self): """Post-fork child process callback executed via ProcessManager.daemonize().""" # Set the Exiter exception hook post-fork so as not to affect the pantsd processes exception diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 9999bccd834..e1f1e1557b0 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -258,6 +258,9 @@ def visualize_to_dir(self): def to_keys(self, subjects): return list(self._to_key(subject) for subject in subjects) + def pre_fork(self): + self._native.lib.scheduler_pre_fork(self._scheduler) + def post_fork(self): self._native.lib.scheduler_post_fork(self._scheduler) @@ -442,6 +445,9 @@ def _execution_add_roots(self, execution_request): for subject, selector in execution_request.roots: self._scheduler.add_root_selection(subject, selector) + def pre_fork(self): + self._scheduler.pre_fork() + def post_fork(self): self._scheduler.post_fork() diff --git a/src/python/pants/engine/subsystem/native.py b/src/python/pants/engine/subsystem/native.py index 2023f1f9552..584fb6f8b1c 100644 --- a/src/python/pants/engine/subsystem/native.py +++ b/src/python/pants/engine/subsystem/native.py @@ -183,6 +183,7 @@ Buffer, Buffer, BufferBuffer); +void scheduler_pre_fork(Scheduler*); void scheduler_post_fork(Scheduler*); void scheduler_destroy(Scheduler*); diff --git a/src/python/pants/engine/subsystem/native_engine_version b/src/python/pants/engine/subsystem/native_engine_version index 1b2e45cf17e..d6081358ec8 100644 --- a/src/python/pants/engine/subsystem/native_engine_version +++ b/src/python/pants/engine/subsystem/native_engine_version @@ -1 +1 @@ -7357cc56f4a01aca2b89f98706d41d9202802129 +a780f41d9addd75db310272b010ae3ad65b08045 diff --git a/src/python/pants/pantsd/pants_daemon.py b/src/python/pants/pantsd/pants_daemon.py index 4caf69cb8ea..b63eecfa101 100644 --- a/src/python/pants/pantsd/pants_daemon.py +++ b/src/python/pants/pantsd/pants_daemon.py @@ -182,6 +182,9 @@ def _run(self): def pre_fork(self): """Pre-fork() callback for ProcessManager.daemonize().""" + for service in self._services: + service.pre_fork() + # Teardown the RunTracker's SubprocPool pre-fork. RunTracker.global_instance().shutdown_worker_pool() # TODO(kwlzn): This currently aborts tracking of the remainder of the pants run that launched diff --git a/src/python/pants/pantsd/service/pants_service.py b/src/python/pants/pantsd/service/pants_service.py index fa251d7bc25..7ac98e5ebae 100644 --- a/src/python/pants/pantsd/service/pants_service.py +++ b/src/python/pants/pantsd/service/pants_service.py @@ -31,6 +31,9 @@ def is_killed(self): """ return self._kill_switch.is_set() + def pre_fork(self): + """Called pre-fork, before `run` to allow for service->service or other side-effecting setup.""" + def setup(self): """Called before `run` to allow for service->service or other side-effecting setup.""" diff --git a/src/python/pants/pantsd/service/scheduler_service.py b/src/python/pants/pantsd/service/scheduler_service.py index c3422605eaa..891e62e7ba0 100644 --- a/src/python/pants/pantsd/service/scheduler_service.py +++ b/src/python/pants/pantsd/service/scheduler_service.py @@ -45,6 +45,10 @@ def change_calculator(self): """Surfaces the change calculator.""" return self._graph_helper.change_calculator + def pre_fork(self): + """Pre-fork controls.""" + self._scheduler.pre_fork() + def setup(self): """Service setup.""" # Register filesystem event handlers on an FSEventService instance. diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 0a88cf99189..e3098718ce9 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -26,7 +26,7 @@ pub struct Core { pub vfs: PosixFS, // TODO: This is a second pool (relative to the VFS pool), upon which all work is // submitted. See https://github.com/pantsbuild/pants/issues/4298 - pool: RwLock, + pool: RwLock>, } impl Core { @@ -66,11 +66,11 @@ impl Core { .unwrap_or_else(|e| { panic!("Could not initialize VFS: {:?}", e); }), - pool: RwLock::new(Core::create_pool()), + pool: RwLock::new(Some(Core::create_pool())), } } - pub fn pool(&self) -> RwLockReadGuard { + pub fn pool(&self) -> RwLockReadGuard> { self.pool.read().unwrap() } @@ -80,6 +80,12 @@ impl Core { .create() } + pub fn pre_fork(&self) { + self.vfs.pre_fork(); + let mut pool = self.pool.write().unwrap(); + *pool = None; + } + /** * Reinitializes a Core in a new process (basically, recreates its CpuPool). */ @@ -88,7 +94,7 @@ impl Core { self.vfs.post_fork(); // And our own. let mut pool = self.pool.write().unwrap(); - *pool = Core::create_pool(); + *pool = Some(Core::create_pool()); } } @@ -109,7 +115,7 @@ impl Context { pub trait ContextFactory { fn create(&self, entry_id: EntryId) -> Context; - fn pool(&self) -> RwLockReadGuard; + fn pool(&self) -> RwLockReadGuard>; } impl ContextFactory for Context { @@ -124,7 +130,7 @@ impl ContextFactory for Context { } } - fn pool(&self) -> RwLockReadGuard { + fn pool(&self) -> RwLockReadGuard> { self.core.pool() } } diff --git a/src/rust/engine/src/fs.rs b/src/rust/engine/src/fs.rs index 0b4f1b13d36..d5d412380d5 100644 --- a/src/rust/engine/src/fs.rs +++ b/src/rust/engine/src/fs.rs @@ -276,7 +276,7 @@ struct PathGlobsExpansion { pub struct PosixFS { build_root: Dir, // The pool needs to be reinitialized after a fork, so it is protected by a lock. - pool: RwLock, + pool: RwLock>, ignore: Gitignore, } @@ -285,7 +285,7 @@ impl PosixFS { build_root: PathBuf, ignore_patterns: Vec, ) -> Result { - let pool = RwLock::new(PosixFS::create_pool()); + let pool = RwLock::new(Some(PosixFS::create_pool())); let canonical_build_root = build_root.canonicalize().and_then(|canonical| canonical.metadata().and_then(|metadata| @@ -346,13 +346,18 @@ impl PosixFS { Ok(stats) } - fn pool(&self) -> RwLockReadGuard { + fn pool(&self) -> RwLockReadGuard> { self.pool.read().unwrap() } + pub fn pre_fork(&self) { + let mut pool = self.pool.write().unwrap(); + *pool = None; + } + pub fn post_fork(&self) { let mut pool = self.pool.write().unwrap(); - *pool = PosixFS::create_pool(); + *pool = Some(PosixFS::create_pool()); } pub fn ignore>(&self, path: P, is_dir: bool) -> bool { @@ -365,7 +370,8 @@ impl PosixFS { pub fn read_link(&self, link: &Link) -> BoxFuture { let link_parent = link.0.parent().map(|p| p.to_owned()); let link_abs = self.build_root.0.join(link.0.as_path()).to_owned(); - self.pool() + let pool = self.pool(); + pool.as_ref().expect("Uninitialized CpuPool!") .spawn_fn(move || { link_abs .read_link() @@ -393,7 +399,8 @@ impl PosixFS { pub fn scandir(&self, dir: &Dir) -> BoxFuture, io::Error> { let dir = dir.to_owned(); let dir_abs = self.build_root.0.join(dir.0.as_path()); - self.pool() + let pool = self.pool(); + pool.as_ref().expect("Uninitialized CpuPool!") .spawn_fn(move || { PosixFS::scandir_sync(dir, dir_abs) }) @@ -839,24 +846,26 @@ impl Snapshots { let build_root = fs.build_root.clone(); let temp_path = self.next_temp_path().expect("Couldn't get the next temp path."); - fs.pool().spawn_fn(move || { - // Write the tar deterministically to a temporary file while fingerprinting. - let fingerprint = - Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?; - - // Rename to the final path if it does not already exist. - Snapshots::finalize( - temp_path.as_path(), - Snapshots::path_under_for(&dest_dir, &fingerprint).as_path() - )?; + let pool = fs.pool(); + pool.as_ref().expect("Uninitialized CpuPool!") + .spawn_fn(move || { + // Write the tar deterministically to a temporary file while fingerprinting. + let fingerprint = + Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?; + + // Rename to the final path if it does not already exist. + Snapshots::finalize( + temp_path.as_path(), + Snapshots::path_under_for(&dest_dir, &fingerprint).as_path() + )?; - Ok( - Snapshot { - fingerprint: fingerprint, - path_stats: paths, - } - ) - }) + Ok( + Snapshot { + fingerprint: fingerprint, + path_stats: paths, + } + ) + }) } fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result, io::Error> { @@ -887,10 +896,12 @@ impl Snapshots { pub fn contents_for(&self, fs: &PosixFS, snapshot: Snapshot) -> CpuFuture, String> { let archive_path = self.path_for(&snapshot.fingerprint); - fs.pool().spawn_fn(move || { - let snapshot_str = format!("{:?}", snapshot); - Snapshots::contents_for_sync(snapshot, archive_path) - .map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e)) - }) + let pool = fs.pool(); + pool.as_ref().expect("Uninitialized CpuPool!") + .spawn_fn(move || { + let snapshot_str = format!("{:?}", snapshot); + Snapshots::contents_for_sync(snapshot, archive_path) + .map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e)) + }) } } diff --git a/src/rust/engine/src/graph.rs b/src/rust/engine/src/graph.rs index f02127416ca..d7f69a911d6 100644 --- a/src/rust/engine/src/graph.rs +++ b/src/rust/engine/src/graph.rs @@ -112,7 +112,8 @@ impl Entry { let state = match self.node.clone() { EntryKey::Valid(n) => { - let pool = context_factory.pool(); + let pool_opt = context_factory.pool(); + let pool = pool_opt.as_ref().expect("Uninitialized CpuPool!"); let context = context_factory.create(entry_id); pool .spawn_fn(move || { diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index d41a54b9949..31093697d18 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -262,6 +262,13 @@ pub extern fn scheduler_create( ) } +#[no_mangle] +pub extern fn scheduler_pre_fork(scheduler_ptr: *mut Scheduler) { + with_scheduler(scheduler_ptr, |scheduler| { + scheduler.core.pre_fork(); + }) +} + #[no_mangle] pub extern fn scheduler_post_fork(scheduler_ptr: *mut Scheduler) { with_scheduler(scheduler_ptr, |scheduler| { diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index ea5f79ac9f6..ac792fd7a12 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -151,7 +151,7 @@ impl ContextFactory for Arc { Context::new(entry_id, self.clone()) } - fn pool(&self) -> RwLockReadGuard { + fn pool(&self) -> RwLockReadGuard> { Core::pool(self) } } diff --git a/tests/python/pants_test/pantsd/test_pantsd_integration.py b/tests/python/pants_test/pantsd/test_pantsd_integration.py index c8a83483dc3..3a41e9d5ee2 100644 --- a/tests/python/pants_test/pantsd/test_pantsd_integration.py +++ b/tests/python/pants_test/pantsd/test_pantsd_integration.py @@ -6,7 +6,6 @@ unicode_literals, with_statement) import os -import unittest from contextlib import contextmanager from pants.pantsd.process_manager import ProcessManager @@ -78,7 +77,6 @@ def test_pantsd_run(self): for line in read_pantsd_log(workdir): print(line) - @unittest.skip('TODO: See https://github.com/pantsbuild/pants/issues/4301') def test_pantsd_run_with_watchman(self): config = {'pantsd': {'fs_event_detection': True}, # The absolute paths in CI can exceed the UNIX socket path limitation