Skip to content

Commit

Permalink
[pantsd] Address pantsd-runner hang on Linux and re-enable integratio…
Browse files Browse the repository at this point in the history
…n test. (#4407)

### Problem

Currently, on Linux the first thin client call to the daemon can deadlock just after the pantsd->fork->pantsd-runner workflow. Connecting to the process with `gdb` reveals a deadlock in the following stack in the `post_fork` `drop` of the `CpuPool`:

```
#0  0x00007f63f04c31bd in __lll_lock_wait () from /lib64/libpthread.so.0
No symbol table info available.
#1  0x00007f63f04c0ded in pthread_cond_signal@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
No symbol table info available.
#2  0x00007f63d3cfa438 in notify_one ()
    at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sys/unix/condvar.rs:52
No locals.
#3  notify_one () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sys_common/condvar.rs:39
No locals.
#4  notify_one () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/sync/condvar.rs:208
No locals.
#5  std::thread::{{impl}}::unpark () at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/src/libstd/thread/mod.rs:633
No locals.
#6  0x00007f63d3c583d1 in crossbeam::sync::ms_queue::{{impl}}::push<futures_cpupool::Message> (self=<optimized out>, t=...)
    at /home/kwilson/.cache/pants/rust-toolchain/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.2.10/src/sync/ms_queue.rs:178
        guard = <optimized out>
        self = <optimized out>
#7  0x00007f63d3c588ed in futures_cpupool::{{impl}}::drop (self=<optimized out>)
    at /home/kwilson/.cache/pants/rust-toolchain/git/checkouts/futures-rs-a4f11d094efefb0a/f7e6bc8/futures-cpupool/src/lib.rs:236
        self = 0x37547a0
#8  0x00007f63d3be871c in engine::fs::{{impl}}::post_fork (self=0x3754778) at /home/kwilson/dev/pants/src/rust/engine/src/fs.rs:355
        self = 0x3754778
#9  0x00007f63d3be10e4 in engine::context::{{impl}}::post_fork (self=0x37545b0)
    at /home/kwilson/dev/pants/src/rust/engine/src/context.rs:93
        self = 0x37545b0
#10 0x00007f63d3c0de5a in {{closure}} (scheduler=<optimized out>) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:275
        scheduler = 0x3740580
#11 with_scheduler<closure,()> (scheduler_ptr=<optimized out>, f=...) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:584
        scheduler = 0x3740580
        scheduler_ptr = 0x3740580
#12 engine::scheduler_post_fork (scheduler_ptr=0x3740580) at /home/kwilson/dev/pants/src/rust/engine/src/lib.rs:274
        scheduler_ptr = 0x3740580
#13 0x00007f63d3c1be8c in _cffi_f_scheduler_post_fork (self=<optimized out>, arg0=0x35798f0) at src/cffi/native_engine.c:2234
        _save = 0x34a65a0
        x0 = 0x3740580
        datasize = <optimized out>
#14 0x00007f63f07b5a62 in PyEval_EvalFrameEx () from /lib64/libpython2.7.so.1.0
```

This presents as a hang in the thin client, because the pailgun socket is left open in the pantsd-runner.

### Solution

Add pre-fork hooks and tear down the `CpuPool` instances prior to forking and rebuilding them.

### Result

Can no longer reproduce the hang.
  • Loading branch information
kwlzn authored Mar 31, 2017
1 parent 528f3a0 commit 177deb9
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 39 deletions.
5 changes: 5 additions & 0 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def run(self):
"""Fork, daemonize and invoke self.post_fork_child() (via ProcessManager)."""
self.daemonize(write_pid=False)

def pre_fork(self):
"""Pre-fork callback executed via ProcessManager.daemonize()."""
if self._graph_helper:
self._graph_helper.scheduler.pre_fork()

def post_fork_child(self):
"""Post-fork child process callback executed via ProcessManager.daemonize()."""
# Set the Exiter exception hook post-fork so as not to affect the pantsd processes exception
Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ def visualize_to_dir(self):
def to_keys(self, subjects):
return list(self._to_key(subject) for subject in subjects)

def pre_fork(self):
self._native.lib.scheduler_pre_fork(self._scheduler)

def post_fork(self):
self._native.lib.scheduler_post_fork(self._scheduler)

Expand Down Expand Up @@ -442,6 +445,9 @@ def _execution_add_roots(self, execution_request):
for subject, selector in execution_request.roots:
self._scheduler.add_root_selection(subject, selector)

def pre_fork(self):
self._scheduler.pre_fork()

def post_fork(self):
self._scheduler.post_fork()

Expand Down
1 change: 1 addition & 0 deletions src/python/pants/engine/subsystem/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
Buffer,
Buffer,
BufferBuffer);
void scheduler_pre_fork(Scheduler*);
void scheduler_post_fork(Scheduler*);
void scheduler_destroy(Scheduler*);
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/subsystem/native_engine_version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7357cc56f4a01aca2b89f98706d41d9202802129
a780f41d9addd75db310272b010ae3ad65b08045
3 changes: 3 additions & 0 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ def _run(self):

def pre_fork(self):
"""Pre-fork() callback for ProcessManager.daemonize()."""
for service in self._services:
service.pre_fork()

# Teardown the RunTracker's SubprocPool pre-fork.
RunTracker.global_instance().shutdown_worker_pool()
# TODO(kwlzn): This currently aborts tracking of the remainder of the pants run that launched
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/pantsd/service/pants_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def is_killed(self):
"""
return self._kill_switch.is_set()

def pre_fork(self):
"""Called pre-fork, before `run` to allow for service->service or other side-effecting setup."""

def setup(self):
"""Called before `run` to allow for service->service or other side-effecting setup."""

Expand Down
4 changes: 4 additions & 0 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def change_calculator(self):
"""Surfaces the change calculator."""
return self._graph_helper.change_calculator

def pre_fork(self):
"""Pre-fork controls."""
self._scheduler.pre_fork()

def setup(self):
"""Service setup."""
# Register filesystem event handlers on an FSEventService instance.
Expand Down
18 changes: 12 additions & 6 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Core {
pub vfs: PosixFS,
// TODO: This is a second pool (relative to the VFS pool), upon which all work is
// submitted. See https://github.com/pantsbuild/pants/issues/4298
pool: RwLock<CpuPool>,
pool: RwLock<Option<CpuPool>>,
}

impl Core {
Expand Down Expand Up @@ -66,11 +66,11 @@ impl Core {
.unwrap_or_else(|e| {
panic!("Could not initialize VFS: {:?}", e);
}),
pool: RwLock::new(Core::create_pool()),
pool: RwLock::new(Some(Core::create_pool())),
}
}

pub fn pool(&self) -> RwLockReadGuard<CpuPool> {
pub fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.pool.read().unwrap()
}

Expand All @@ -80,6 +80,12 @@ impl Core {
.create()
}

pub fn pre_fork(&self) {
self.vfs.pre_fork();
let mut pool = self.pool.write().unwrap();
*pool = None;
}

/**
* Reinitializes a Core in a new process (basically, recreates its CpuPool).
*/
Expand All @@ -88,7 +94,7 @@ impl Core {
self.vfs.post_fork();
// And our own.
let mut pool = self.pool.write().unwrap();
*pool = Core::create_pool();
*pool = Some(Core::create_pool());
}
}

Expand All @@ -109,7 +115,7 @@ impl Context {

pub trait ContextFactory {
fn create(&self, entry_id: EntryId) -> Context;
fn pool(&self) -> RwLockReadGuard<CpuPool>;
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>>;
}

impl ContextFactory for Context {
Expand All @@ -124,7 +130,7 @@ impl ContextFactory for Context {
}
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.core.pool()
}
}
67 changes: 39 additions & 28 deletions src/rust/engine/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ struct PathGlobsExpansion<T: Sized> {
pub struct PosixFS {
build_root: Dir,
// The pool needs to be reinitialized after a fork, so it is protected by a lock.
pool: RwLock<CpuPool>,
pool: RwLock<Option<CpuPool>>,
ignore: Gitignore,
}

Expand All @@ -285,7 +285,7 @@ impl PosixFS {
build_root: PathBuf,
ignore_patterns: Vec<String>,
) -> Result<PosixFS, String> {
let pool = RwLock::new(PosixFS::create_pool());
let pool = RwLock::new(Some(PosixFS::create_pool()));
let canonical_build_root =
build_root.canonicalize().and_then(|canonical|
canonical.metadata().and_then(|metadata|
Expand Down Expand Up @@ -346,13 +346,18 @@ impl PosixFS {
Ok(stats)
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.pool.read().unwrap()
}

pub fn pre_fork(&self) {
let mut pool = self.pool.write().unwrap();
*pool = None;
}

pub fn post_fork(&self) {
let mut pool = self.pool.write().unwrap();
*pool = PosixFS::create_pool();
*pool = Some(PosixFS::create_pool());
}

pub fn ignore<P: AsRef<Path>>(&self, path: P, is_dir: bool) -> bool {
Expand All @@ -365,7 +370,8 @@ impl PosixFS {
pub fn read_link(&self, link: &Link) -> BoxFuture<PathBuf, io::Error> {
let link_parent = link.0.parent().map(|p| p.to_owned());
let link_abs = self.build_root.0.join(link.0.as_path()).to_owned();
self.pool()
let pool = self.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
link_abs
.read_link()
Expand Down Expand Up @@ -393,7 +399,8 @@ impl PosixFS {
pub fn scandir(&self, dir: &Dir) -> BoxFuture<Vec<Stat>, io::Error> {
let dir = dir.to_owned();
let dir_abs = self.build_root.0.join(dir.0.as_path());
self.pool()
let pool = self.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
PosixFS::scandir_sync(dir, dir_abs)
})
Expand Down Expand Up @@ -839,24 +846,26 @@ impl Snapshots {
let build_root = fs.build_root.clone();
let temp_path = self.next_temp_path().expect("Couldn't get the next temp path.");

fs.pool().spawn_fn(move || {
// Write the tar deterministically to a temporary file while fingerprinting.
let fingerprint =
Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?;

// Rename to the final path if it does not already exist.
Snapshots::finalize(
temp_path.as_path(),
Snapshots::path_under_for(&dest_dir, &fingerprint).as_path()
)?;
let pool = fs.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
// Write the tar deterministically to a temporary file while fingerprinting.
let fingerprint =
Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?;

// Rename to the final path if it does not already exist.
Snapshots::finalize(
temp_path.as_path(),
Snapshots::path_under_for(&dest_dir, &fingerprint).as_path()
)?;

Ok(
Snapshot {
fingerprint: fingerprint,
path_stats: paths,
}
)
})
Ok(
Snapshot {
fingerprint: fingerprint,
path_stats: paths,
}
)
})
}

fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result<Vec<FileContent>, io::Error> {
Expand Down Expand Up @@ -887,10 +896,12 @@ impl Snapshots {

pub fn contents_for(&self, fs: &PosixFS, snapshot: Snapshot) -> CpuFuture<Vec<FileContent>, String> {
let archive_path = self.path_for(&snapshot.fingerprint);
fs.pool().spawn_fn(move || {
let snapshot_str = format!("{:?}", snapshot);
Snapshots::contents_for_sync(snapshot, archive_path)
.map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e))
})
let pool = fs.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
let snapshot_str = format!("{:?}", snapshot);
Snapshots::contents_for_sync(snapshot, archive_path)
.map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e))
})
}
}
3 changes: 2 additions & 1 deletion src/rust/engine/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ impl Entry {
let state =
match self.node.clone() {
EntryKey::Valid(n) => {
let pool = context_factory.pool();
let pool_opt = context_factory.pool();
let pool = pool_opt.as_ref().expect("Uninitialized CpuPool!");
let context = context_factory.create(entry_id);
pool
.spawn_fn(move || {
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ pub extern fn scheduler_create(
)
}

#[no_mangle]
pub extern fn scheduler_pre_fork(scheduler_ptr: *mut Scheduler) {
with_scheduler(scheduler_ptr, |scheduler| {
scheduler.core.pre_fork();
})
}

#[no_mangle]
pub extern fn scheduler_post_fork(scheduler_ptr: *mut Scheduler) {
with_scheduler(scheduler_ptr, |scheduler| {
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl ContextFactory for Arc<Core> {
Context::new(entry_id, self.clone())
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
Core::pool(self)
}
}
Expand Down
2 changes: 0 additions & 2 deletions tests/python/pants_test/pantsd/test_pantsd_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
unicode_literals, with_statement)

import os
import unittest
from contextlib import contextmanager

from pants.pantsd.process_manager import ProcessManager
Expand Down Expand Up @@ -78,7 +77,6 @@ def test_pantsd_run(self):
for line in read_pantsd_log(workdir):
print(line)

@unittest.skip('TODO: See https://github.com/pantsbuild/pants/issues/4301')
def test_pantsd_run_with_watchman(self):
config = {'pantsd': {'fs_event_detection': True},
# The absolute paths in CI can exceed the UNIX socket path limitation
Expand Down

0 comments on commit 177deb9

Please sign in to comment.