Skip to content

Commit

Permalink
[8.0.0] Extract methods to acquire and release a filesystem lock. (#2…
Browse files Browse the repository at this point in the history
…4223)

They are currently only used to acquire a lock on the output base, but a
future change will use them to lock the install base as well.

Human-readable output is also amended to refer to the "output base lock"
instead of the "client lock", as the latter term becomes ambiguous once
multiple locks exist.

Progress on #2109.

PiperOrigin-RevId: 693354279
Change-Id: I2b39e6f5ddb83bbc2be15a31d7de9655358776c5
  • Loading branch information
tjgq authored Nov 6, 2024
1 parent 54e7b4d commit a01a1fa
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 89 deletions.
47 changes: 34 additions & 13 deletions src/main/cpp/blaze.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
#include <grpcpp/security/credentials.h>
#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <chrono> // NOLINT (gRPC requires this)
#include <cinttypes>
#include <map>
#include <memory>
#include <mutex> // NOLINT
#include <optional>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -195,8 +195,8 @@ class BlazeServer final {
public:
explicit BlazeServer(const StartupOptions &startup_options);

// Acquire a lock for the server running in this output base. Returns the
// number of milliseconds spent waiting for the lock.
// Acquire a lock for the output base this server is running in.
// Returns the number of milliseconds spent waiting for the lock.
uint64_t AcquireLock();

// Whether there is an active connection to a server.
Expand Down Expand Up @@ -232,7 +232,7 @@ class BlazeServer final {
const ServerProcessInfo &ProcessInfo() const { return process_info_; }

private:
BlazeLock blaze_lock_;
std::optional<LockHandle> output_base_lock_;

enum CancelThreadAction { NOTHING, JOIN, CANCEL, COMMAND_ID_RECEIVED };

Expand All @@ -250,6 +250,7 @@ class BlazeServer final {
// actions from.
std::unique_ptr<blaze_util::IPipe> pipe_;

void ReleaseLock();
bool TryConnect(CommandServer::Stub *client);
void CancelThread();
void SendAction(CancelThreadAction action);
Expand All @@ -274,8 +275,28 @@ static BlazeServer *blaze_server;
// objects before those.

uint64_t BlazeServer::AcquireLock() {
return blaze::AcquireLock(output_base_, batch_, block_for_lock_,
&blaze_lock_);
if (output_base_lock_.has_value()) {
BAZEL_DIE(blaze_exit_code::INTERNAL_ERROR)
<< "AcquireLock() called but the lock is already held.";
}
// Take an exclusive lock on the output base, because two simultaneous
// commands may not run against the same output base. Note that this lock will
// be released by ReleaseLock() once the server is running, as it can handle
// concurrent clients on its own.
uint64_t wait_time;
output_base_lock_ = blaze::AcquireLock(
"output base", output_base_.GetRelative("lock"), LockMode::kExclusive,
batch_, block_for_lock_, &wait_time);
return wait_time;
}

void BlazeServer::ReleaseLock() {
if (!output_base_lock_.has_value()) {
BAZEL_DIE(blaze_exit_code::INTERNAL_ERROR)
<< "ReleaseLock() called without a lock to release.";
}
blaze::ReleaseLock(*output_base_lock_);
output_base_lock_ = std::nullopt;
}

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1857,12 +1878,12 @@ unsigned int BlazeServer::Communicate(
std::unique_ptr<grpc::ClientReader<command_server::RunResponse>> reader(
client_->Run(context.get(), request));

// Release the server lock because the gRPC handles concurrent clients just
// fine. Note that this may result in two "waiting for other client" messages
// (one during server startup and one emitted by the server)
BAZEL_LOG(INFO)
<< "Releasing client lock, let the server manage concurrent requests.";
blaze::ReleaseLock(&blaze_lock_);
// Release the client lock because the gRPC server handles concurrent clients
// just fine. Note that this may result in two "waiting for other client"
// messages (one during server startup and one emitted by the server).
BAZEL_LOG(INFO) << "Releasing the client lock, as the server can manage "
"concurrent clients on its own.";
ReleaseLock();

std::thread cancel_thread(&BlazeServer::CancelThread, this);
bool command_id_set = false;
Expand Down
32 changes: 17 additions & 15 deletions src/main/cpp/blaze_util_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,23 +198,25 @@ extern const char kListSeparator;
bool SymlinkDirectories(const std::string& target,
const blaze_util::Path& link);

struct BlazeLock {
#if defined(_WIN32) || defined(__CYGWIN__)
/* HANDLE */ void* handle;
#else
int lockfd;
#endif
};
typedef uintptr_t LockHandle;

// Acquires a lock on the output base. Exits if the lock cannot be acquired.
// Sets `blaze_lock` to a value that can be later passed to ReleaseLock().
// Returns the number of milliseconds spent with waiting for the lock.
uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock);
enum class LockMode {
kShared,
kExclusive,
};

// Releases the lock on the output base. In case of an error, continues as
// usual.
void ReleaseLock(BlazeLock* blaze_lock);
// Acquires a `mode` lock on `path`, busy-waiting until it becomes available if
// `block` is true, and releasing it on exec if `batch_mode` is false.
// Crashes if the lock cannot be acquired. Returns a handle that can be
// subsequently passed to ReleaseLock. Sets `wait_time` to the number of
// milliseconds spent waiting for the lock. The `name` argument is used to
// distinguish it from other locks in human-readable error messages.
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time);

// Releases a lock previously obtained from AcquireLock.
void ReleaseLock(LockHandle lock_handle);

// Verifies whether the server process still exists. Returns true if it does.
bool VerifyServerProcess(int pid, const blaze_util::Path& output_base);
Expand Down
87 changes: 49 additions & 38 deletions src/main/cpp/blaze_util_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ void SigPrintf(const char *format, ...) {
}
}

static int setlk(int fd, struct flock *lock) {
static int setlk(int fd, struct flock* lock) {
#ifdef __linux__
// If we're building with glibc <2.20, or another libc which predates
// OFD locks, define the constant ourselves. This assumes that the libc
Expand Down Expand Up @@ -634,34 +634,41 @@ static int setlk(int fd, struct flock *lock) {
return -1;
}

uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock) {
blaze_util::Path lockfile = output_base.GetRelative("lock");

int flags = O_CREAT | O_RDWR;
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time) {
int flags = O_CREAT;
switch (mode) {
case LockMode::kShared:
flags |= O_RDONLY;
break;
case LockMode::kExclusive:
flags |= O_RDWR;
break;
}
// Keep server from inheriting a useless fd if we are not in batch mode.
if (!batch_mode) {
flags |= O_CLOEXEC;
}

int lockfd = open(lockfile.AsNativePath().c_str(), flags, 0644);
if (lockfd < 0) {
int fd = open(path.AsNativePath().c_str(), flags, 0644);
if (fd < 0) {
string err = GetLastErrorString();
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "cannot open lockfile '" << lockfile.AsPrintablePath()
<< "' for writing: " << err;
<< "open failed for " << name << " lock: " << err;
}

struct flock lock = {};
lock.l_type = F_WRLCK;
lock.l_type = static_cast<short>( // NOLINT (short is the right type)
mode == LockMode::kShared ? F_RDLCK : F_WRLCK);
lock.l_whence = SEEK_SET;
lock.l_start = 0;
// This doesn't really matter now, but allows us to subdivide the lock
// later if that becomes meaningful. (Ranges beyond EOF can be locked.)
lock.l_len = 4096;

// Take the exclusive server lock. If we fail, we busy-wait until the lock
// becomes available.
// Take the lock. If it fails, busy-wait until it becomes available unless
// --noblock_for_lock was set.
//
// We used to rely on fcntl(F_SETLKW) to lazy-wait for the lock to become
// available, which is theoretically fine, but doing so prevents us from
Expand All @@ -673,18 +680,19 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool multiple_attempts = false;
string owner;
const uint64_t start_time = GetMillisecondsMonotonic();
while (setlk(lockfd, &lock) == -1) {
while (setlk(fd, &lock) == -1) {
string buffer(4096, 0);
ssize_t r = pread(lockfd, &buffer[0], buffer.size(), 0);
ssize_t r = pread(fd, &buffer[0], buffer.size(), 0);
if (r < 0) {
BAZEL_LOG(WARNING) << "pread() lock file: " << strerror(errno);
BAZEL_LOG(WARNING) << "pread() " << name << " lock: " << strerror(errno);
r = 0;
}
buffer.resize(r);
if (owner != buffer) {
// Each time we learn a new lock owner, print it out.
owner = buffer;
BAZEL_LOG(USER) << "Another command holds the client lock: \n" << owner;
BAZEL_LOG(USER) << "Another command holds the " << name << " lock: \n"
<< owner;
if (block) {
BAZEL_LOG(USER) << "Waiting for it to complete...";
fflush(stderr);
Expand All @@ -693,8 +701,8 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,

if (!block) {
BAZEL_DIE(blaze_exit_code::LOCK_HELD_NOBLOCK_FOR_LOCK)
<< "Exiting because the lock is held and --noblock_for_lock was "
"given.";
<< "Exiting because the " << name
<< " lock is held and --noblock_for_lock was given.";
}

TrySleep(500);
Expand All @@ -706,28 +714,31 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// avoid unnecessary noise in the logs. In this metric, we are only
// interested in knowing how long it took for other commands to complete, not
// how fast acquiring a lock is.
const uint64_t wait_time = !multiple_attempts ? 0 : end_time - start_time;
const uint64_t elapsed_time = !multiple_attempts ? 0 : end_time - start_time;

// Identify ourselves in the lockfile.
// If taking an exclusive lock, identify ourselves in the lockfile.
// The contents are printed for human consumption when another client
// fails to take the lock, but not parsed otherwise.
(void) ftruncate(lockfd, 0);
lseek(lockfd, 0, SEEK_SET);
// Arguably we should ensure this fits in the 4KB we lock. In practice no one
// will have a cwd long enough to overflow that, and nothing currently uses
// the rest of the lock file anyway.
dprintf(lockfd, "pid=%d\nowner=client\n", getpid());
string cwd = blaze_util::GetCwd();
dprintf(lockfd, "cwd=%s\n", cwd.c_str());
if (const char *tty = ttyname(STDIN_FILENO)) { // NOLINT (single-threaded)
dprintf(lockfd, "tty=%s\n", tty);
}
blaze_lock->lockfd = lockfd;
return wait_time;
}

void ReleaseLock(BlazeLock* blaze_lock) {
close(blaze_lock->lockfd);
if (mode == LockMode::kExclusive) {
(void)ftruncate(fd, 0);
lseek(fd, 0, SEEK_SET);
// Arguably we should ensure this fits in the 4KB we lock. In practice no
// one will have a cwd long enough to overflow that, and nothing currently
// uses the rest of the lock file anyway.
dprintf(fd, "pid=%d\nowner=client\n", getpid());
string cwd = blaze_util::GetCwd();
dprintf(fd, "cwd=%s\n", cwd.c_str());
if (const char* tty = ttyname(STDIN_FILENO)) { // NOLINT (single-threaded)
dprintf(fd, "tty=%s\n", tty);
}
}

*wait_time = elapsed_time;
return static_cast<LockHandle>(fd);
}

void ReleaseLock(LockHandle lock_handle) {
close(static_cast<int>(lock_handle));
}

bool KillServerProcess(int pid, const blaze_util::Path& output_base) {
Expand Down
58 changes: 35 additions & 23 deletions src/main/cpp/blaze_util_windows.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,36 +1085,44 @@ uint64_t WindowsClock::GetMilliseconds() const {
return GetMillisecondsAsLargeInt(kFrequency).QuadPart;
}

uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
bool block, BlazeLock* blaze_lock) {
blaze_util::Path lockfile = output_base.GetRelative("lock");
LockHandle AcquireLock(const std::string& name, const blaze_util::Path& path,
LockMode mode, bool batch_mode, bool block,
uint64_t* wait_time) {
DWORD desired_access = GENERIC_READ;
if (mode == LockMode::kExclusive) {
desired_access |= GENERIC_WRITE;
}

// CreateFile defaults to opening the file exclusively. We intentionally open
// it in shared mode and instead rely on LockFileEx to obtain an exclusive
// lock, mimicking the behavior of FileChannel in the JVM, to make locks
// obtained on the client side compatible with the server side.
// it in shared mode and instead use LockFileEx to obtain a lock. This mimicks
// the FileChannel implementation in the JVM, making locks obtained on the
// client side compatible with the server side.
HANDLE handle = ::CreateFileW(
/* lpFileName */ lockfile.AsNativePath().c_str(),
/* dwDesiredAccess */ GENERIC_READ | GENERIC_WRITE,
/* lpFileName */ path.AsNativePath().c_str(),
/* dwDesiredAccess */ desired_access,
/* dwShareMode */ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
/* lpSecurityAttributes */ nullptr,
/* dwCreationDisposition */ CREATE_ALWAYS,
/* dwFlagsAndAttributes */ FILE_ATTRIBUTE_NORMAL,
/* hTemplateFile */ nullptr);
if (handle == INVALID_HANDLE_VALUE) {
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "Failed to CreateFile(" << lockfile.AsPrintablePath()
<< "): " << GetLastErrorString();
<< "CreateFile failed for " << name
<< " lock: " << GetLastErrorString();
}

bool first_lock_attempt = true;
uint64_t start_time = GetMillisecondsMonotonic();

while (true) {
OVERLAPPED overlapped = {};
DWORD flags = LOCKFILE_FAIL_IMMEDIATELY;
if (mode == LockMode::kExclusive) {
flags |= LOCKFILE_EXCLUSIVE_LOCK;
}
BOOL success = LockFileEx(
/* hFile */ handle,
/* dwFlags */ LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
/* dwFlags */ flags,
/* dwReserved */ 0,
/* nNumberOfBytesToLockLow */ 1,
/* nNumberOfBytesToLockHigh */ 0,
Expand All @@ -1129,22 +1137,22 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// See https://devblogs.microsoft.com/oldnewthing/20140905-00/?p=63.
if (GetLastError() != ERROR_LOCK_VIOLATION) {
BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
<< "Unexpected result from LockFileEx(" << lockfile.AsPrintablePath()
<< "):" << GetLastErrorString();
<< "LockFileEx failed for " << name
<< " lock: " << GetLastErrorString();
}
// Someone else has the lock.
if (first_lock_attempt) {
first_lock_attempt = false;
BAZEL_LOG(USER) << "Another command holds the client lock.";
BAZEL_LOG(USER) << "Another command holds the " << name << " lock.";
if (block) {
BAZEL_LOG(USER) << "Waiting for it to complete...";
}
fflush(stderr);
}
if (!block) {
BAZEL_DIE(blaze_exit_code::LOCK_HELD_NOBLOCK_FOR_LOCK)
<< "Exiting because the lock is held and --noblock_for_lock was "
"given.";
<< "Exiting because the " << name
<< " lock is held and --noblock_for_lock was given.";
}
Sleep(/* dwMilliseconds */ 500);
}
Expand All @@ -1154,17 +1162,21 @@ uint64_t AcquireLock(const blaze_util::Path& output_base, bool batch_mode,
// a concurrent process can read and display it. On Windows we can't do so
// because locks are mandatory, thus we cannot read the file concurrently.

uint64_t wait_time = GetMillisecondsMonotonic() - start_time;
blaze_lock->handle = handle;
return wait_time;
*wait_time = GetMillisecondsMonotonic() - start_time;
return reinterpret_cast<LockHandle>(handle);
}

void ReleaseLock(BlazeLock* blaze_lock) {
void ReleaseLock(LockHandle lock_handle) {
HANDLE handle = reinterpret_cast<HANDLE>(lock_handle);
OVERLAPPED overlapped = {0};
UnlockFileEx(blaze_lock->handle, 0, 1, 0, &overlapped);
CloseHandle(blaze_lock->handle);
UnlockFileEx(
/* hFile */ handle,
/* dwReserved */ 0,
/* nNumberOfBytesToUnlockLow */ 1,
/* nNumberOfBytesToUnlockHigh */ 0,
/* lpOverlapped */ &overlapped);
CloseHandle(handle);
}

#ifdef GetUserName
// By including <windows.h>, we have GetUserName defined either as
// GetUserNameA or GetUserNameW.
Expand Down

0 comments on commit a01a1fa

Please sign in to comment.