Skip to content

Commit

Permalink
Add Jobserver::Pool class
Browse files Browse the repository at this point in the history
Add a class that implements a Jobserver pool of job slots.
This will later be used by Ninja to because a jobserver pool
itself for its own spawned sub-processes.

This CL includes a Posix and a Win32 implementation + some
unit-tests.
  • Loading branch information
digit-google committed Oct 1, 2024
1 parent f93ced2 commit a833683
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 0 deletions.
142 changes: 142 additions & 0 deletions src/jobserver-posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,145 @@ class PosixJobserverClient : public Jobserver::Client {
int read_fd_ = -1;
int write_fd_ = -1;
};

class PosixJobserverPool : public Jobserver::Pool {
public:
static std::unique_ptr<PosixJobserverPool> Create(
size_t slot_count, Jobserver::Config::Mode mode, std::string* error) {
assert(slot_count > 1 && "slot_count must be 2 or higher");
bool success;
auto pool = std::unique_ptr<PosixJobserverPool>(new PosixJobserverPool());
if (mode == Jobserver::Config::kModePipe) {
success = pool->InitWithPipe(slot_count, error);
} else if (mode == Jobserver::Config::kModePosixFifo) {
success = pool->InitWithFifo(slot_count, error);
} else {
*error = "Win32 semaphore mode not supported on Posix!";
success = false;
}
if (!success)
pool.reset(nullptr);
return pool;
}

std::string GetEnvMakeFlagsValue() const override {
std::string result;
if (!fifo_.empty()) {
result.resize(fifo_.size() + 32);
int ret = snprintf(const_cast<char*>(result.data()), result.size(),
" -j%zd --jobserver-auth=fifo:%s", job_count_,
fifo_.c_str());
if (ret < 0 || ret > static_cast<int>(result.size()))
Fatal("Could not format PosixJobserverPool MAKEFLAGS!");
result.resize(static_cast<size_t>(ret));
}
if (read_fd_ >= 0 && write_fd_ >= 0) {
result.resize(256);
// See technical note in jobserver.c for formatting justification.
int ret = snprintf(const_cast<char*>(result.data()), result.size(),
" -j%zu --jobserver-fds=%d,%d --jobserver-auth=%d,%d",
job_count_, read_fd_, write_fd_, read_fd_, write_fd_);
if (ret < 0 || ret > static_cast<int>(result.size()))
Fatal("Could not format PosixJobserverPool MAKEFLAGS!");
result.resize(static_cast<size_t>(ret));
}
return result;
}

virtual ~PosixJobserverPool() {
if (read_fd_ >= 0)
::close(read_fd_);
if (write_fd_ >= 0)
::close(write_fd_);
if (!fifo_.empty())
::unlink(fifo_.c_str());
}

private:
PosixJobserverPool() = default;

// Fill the pool to satisfy |slot_count| job slots. This
// writes |slot_count - 1| bytes to the pipe to satisfy the
// implicit job slot requirement.
bool FillSlots(size_t slot_count, std::string* error) {
job_count_ = slot_count;
for (; slot_count > 1; --slot_count) {
// Write '+' into the pipe, just like GNU Make. Note that some
// implementations write '|' instead, but so far no client or pool
// implementation cares about the exact value, though the official spec
// says this might change in the future.
const char slot_char = '+';
int ret = ::write(write_fd_, &slot_char, 1);
if (ret != 1) {
if (ret < 0 && errno == EINTR)
continue;
*error =
std::string("Could not fill job slots pool: ") + strerror(errno);
return false;
}
}
return true;
}

bool InitWithPipe(size_t slot_count, std::string* error) {
// Create anonymous pipe, then write job slot tokens into it.
int fds[2] = { -1, -1 };
int ret = pipe(fds);
if (ret < 0) {
*error =
std::string("Could not create anonymous pipe: ") + strerror(errno);
return false;
}

// The file descriptors returned by pipe() are already heritable and
// blocking, which is exactly what's needed here.
read_fd_ = fds[0];
write_fd_ = fds[1];

return FillSlots(slot_count, error);
}

bool InitWithFifo(size_t slot_count, std::string* error) {
const char* tmp_dir = getenv("TMPDIR");
if (!tmp_dir)
tmp_dir = "/tmp";

fifo_.resize(strlen(tmp_dir) + 32);
int len = snprintf(const_cast<char*>(fifo_.data()), fifo_.size(),
"%s/NinjaFIFO%d", tmp_dir, getpid());
if (len < 0) {
*error = "Cannot create fifo path!";
return false;
}
fifo_.resize(static_cast<size_t>(len));

int ret = mknod(fifo_.c_str(), S_IFIFO | 0666, 0);
if (ret < 0) {
*error = std::string("Cannot create fifo: ") + strerror(errno);
return false;
}

do {
write_fd_ = ::open(fifo_.c_str(), O_RDWR | O_CLOEXEC);
} while (write_fd_ < 0 && errno == EINTR);
if (write_fd_ < 0) {
*error = std::string("Could not open fifo: ") + strerror(errno);
// Let destructor remove the fifo.
return false;
}

return FillSlots(slot_count, error);
}

// Number of parallel job slots (including implicit one).
size_t job_count_ = 0;

// In pipe mode, these are inheritable read and write descriptors for the
// pipe. In fifo mode, read_fd_ will be -1, and write_fd_ will be a
// non-inheritable descriptor to keep the FIFO alive.
int read_fd_ = -1;
int write_fd_ = -1;

// Path to fifo, this will be empty when using an anonymous pipe.
std::string fifo_;
};
73 changes: 73 additions & 0 deletions src/jobserver-win32.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,76 @@ class Win32JobserverClient : public Jobserver::Client {
// Semaphore handle. NULL means not in use.
HANDLE handle_ = NULL;
};

class Win32JobserverPool : public Jobserver::Pool {
public:
static std::unique_ptr<Win32JobserverPool> Create(
size_t slot_count, Jobserver::Config::Mode mode, std::string* error) {
assert(slot_count > 1 && "slot_count must be 2 or higher");
bool success;
auto pool = std::unique_ptr<Win32JobserverPool>(new Win32JobserverPool());
if (mode == Jobserver::Config::kModeWin32Semaphore) {
success = pool->InitWithSemaphore(slot_count, error);
} else {
*error = "Unsupported jobserver mode";
success = false;
}
if (!success)
pool.reset(nullptr);
return pool;
}

std::string GetEnvMakeFlagsValue() const override {
std::string result;
result.resize(sem_name_.size() + 32);
int ret =
snprintf(const_cast<char*>(result.data()), result.size(),
" -j%zd --jobserver-auth=%s", job_count_, sem_name_.c_str());
if (ret < 0 || ret > static_cast<int>(result.size()))
Fatal("Could not format Win32JobserverPool MAKEFLAGS!");

return result;
}

virtual ~Win32JobserverPool() {
if (IsValid())
::CloseHandle(handle_);
}

private:
Win32JobserverPool() = default;

// CreateSemaphore returns NULL on failure.
bool IsValid() const { return handle_ != NULL; }

// Compute semaphore name for new instance.
static std::string GetSemaphoreName() {
static int counter = 0;
counter += 1;
char name[64];
snprintf(name, sizeof(name), "ninja_jobserver_pool_%d_%d",
GetCurrentProcessId(), counter);
return std::string(name);
}

bool InitWithSemaphore(size_t slot_count, std::string* error) {
job_count_ = slot_count;
sem_name_ = GetSemaphoreName();
LONG count = static_cast<LONG>(slot_count - 1);
handle_ = ::CreateSemaphoreA(NULL, count, count, sem_name_.c_str());
if (!IsValid()) {
*error = "Could not create semaphore: " + GetLastErrorString();
return false;
}
return true;
}

// Semaphore handle.
HANDLE handle_ = NULL;

// Saved slot count.
size_t job_count_ = 0;

// Semaphore name.
std::string sem_name_;
};
15 changes: 15 additions & 0 deletions src/jobserver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,18 @@ std::unique_ptr<Jobserver::Client> Jobserver::Client::Create(
*error = "Unsupported jobserver mode";
return nullptr;
}

// static
std::unique_ptr<Jobserver::Pool> Jobserver::Pool::Create(
size_t num_job_slots, Jobserver::Config::Mode mode, std::string* error) {
if (num_job_slots < 2) {
*error = "At least 2 job slots needed";
return nullptr;
}

#ifdef _WIN32
return Win32JobserverPool::Create(num_job_slots, mode, error);
#else // !_WIN32
return PosixJobserverPool::Create(num_job_slots, mode, error);
#endif // !_WIN32
}
32 changes: 32 additions & 0 deletions src/jobserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,36 @@ struct Jobserver {
protected:
Client() = default;
};

/// Jobserver::Pool implements a jobserver pool of job slots according
/// to the GNU Make protocol. Usage is the following:
///
/// - Use Create() method to create new instances.
///
/// - Retrieve the value of the MAKEFLAGS environment variable, and
/// ensure it is passed to each client.
///
class Pool {
public:
/// Destructor.
virtual ~Pool() {}

/// Default implementation mode for the current platform.
#ifdef _WIN32
static constexpr Config::Mode kDefaultMode = Config::kModeWin32Semaphore;
#else // !_WIN32
static constexpr Config::Mode kDefaultMode = Config::kModePipe;
#endif // !_WIN32

/// Create new instance to use |num_slots| job slots, using a specific
/// implementation mode. On failure, set |*error| and return null.
///
/// Note that it is an error to use a value of |num_slots| that is <= 1.
static std::unique_ptr<Pool> Create(size_t num_job_slots, Config::Mode mode,
std::string* error);

/// Return the value of the MAKEFLAGS variable, corresponding to this
/// instance, to pass to sub-processes.
virtual std::string GetEnvMakeFlagsValue() const = 0;
};
};
89 changes: 89 additions & 0 deletions src/jobserver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,93 @@ TEST(Jobserver, PosixFifoClientWithWrongPath) {
EXPECT_FALSE(error.empty());
EXPECT_EQ("Empty fifo path", error);
}
#endif // _WIN32

TEST(Jobserver, DefaultPool) {
const size_t kSlotCount = 10;
std::string error;
auto pool = Jobserver::Pool::Create(kSlotCount,
Jobserver::Config::kModeDefault, &error);
ASSERT_TRUE(pool.get()) << error;
EXPECT_TRUE(error.empty());

std::string makeflags = pool->GetEnvMakeFlagsValue();
#ifdef _WIN32
std::string auth_prefix = " -j10 --jobserver-auth=";
#else // !_WIN32
std::string auth_prefix = " -j10 --jobserver-fds=";
#endif // !_WIN32
ASSERT_EQ(auth_prefix, makeflags.substr(0, auth_prefix.size()));

// Parse the MAKEFLAGS value to create a JobServer::Config
Jobserver::Config config;
ASSERT_TRUE(
Jobserver::ParseMakeFlagsValue(makeflags.c_str(), &config, &error));
EXPECT_EQ(config.mode, Jobserver::Config::kModeDefault);

// Create a client from the Config, and try to read all slots.
std::unique_ptr<Jobserver::Client> client =
Jobserver::Client::Create(config, &error);
EXPECT_TRUE(client.get());
EXPECT_TRUE(error.empty()) << error;

// First slot is always implicit.
Jobserver::Slot slot = client->TryAcquire();
EXPECT_TRUE(slot.IsValid());
EXPECT_TRUE(slot.IsImplicit());

// Then read kSlotCount - 1 slots from the pipe.
for (size_t n = 1; n < kSlotCount; ++n) {
Jobserver::Slot slot = client->TryAcquire();
EXPECT_TRUE(slot.IsValid()) << "Slot #" << n + 1;
EXPECT_TRUE(slot.IsExplicit()) << "Slot #" << n + 1;
}

// Pool should be empty now, so next TryAcquire() will fail.
slot = client->TryAcquire();
EXPECT_FALSE(slot.IsValid());
}

#ifndef _WIN32
TEST(Jobserver, PosixFifoPool) {
const size_t kSlotCount = 10;
std::string error;
auto pool = Jobserver::Pool::Create(
kSlotCount, Jobserver::Config::kModePosixFifo, &error);
ASSERT_TRUE(pool.get()) << error;
EXPECT_TRUE(error.empty());

std::string makeflags = pool->GetEnvMakeFlagsValue();

std::string auth_prefix = " -j10 --jobserver-auth=fifo:";
ASSERT_EQ(auth_prefix, makeflags.substr(0, auth_prefix.size()));

// Parse the MAKEFLAGS value to create a JobServer::Config
Jobserver::Config config;
ASSERT_TRUE(
Jobserver::ParseMakeFlagsValue(makeflags.c_str(), &config, &error));
EXPECT_EQ(config.mode, Jobserver::Config::kModePosixFifo);

// Create a client from the Config, and try to read all slots.
std::unique_ptr<Jobserver::Client> client =
Jobserver::Client::Create(config, &error);
EXPECT_TRUE(client.get());
EXPECT_TRUE(error.empty()) << error;

// First slot is always implicit.
Jobserver::Slot slot = client->TryAcquire();
EXPECT_TRUE(slot.IsValid());
EXPECT_TRUE(slot.IsImplicit());

// Then read kSlotCount - 1 slots from the pipe.
for (size_t n = 1; n < kSlotCount; ++n) {
Jobserver::Slot slot = client->TryAcquire();
EXPECT_TRUE(slot.IsValid()) << "Slot #" << n + 1;
EXPECT_TRUE(slot.IsExplicit()) << "Slot #" << n + 1;
}

// Pool should be empty now, so next TryAcquire() will fail.
slot = client->TryAcquire();
EXPECT_FALSE(slot.IsValid());
}
#endif // !_WIN32

0 comments on commit a833683

Please sign in to comment.