Skip to content

Commit

Permalink
chore: Return an error instead of hanging when workers are unavailable
Browse files (browse the repository at this point in the history)
Bug: N/A
Change-Id: I93c34aff1c46b30660e2f3ebb00316bc18f53538
GitOrigin-RevId: 02347f41f50ec8ab6717a3af66950fdaae4f559a
  • Loading branch information
Privacy Sandbox Team authored and copybara-github committed Sep 13, 2024
1 parent 6e03f66 commit aa88dc5
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
8 changes: 8 additions & 0 deletions src/roma/byob/container/run_workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ int main(int argc, char** argv) {
PLOG(ERROR) << "connect() to " << socket_name << " failed";
return -1;
}
absl::Cleanup fd_cleanup = [fd] {
if (::shutdown(fd, SHUT_RDWR) == -1) {
PLOG(INFO) << "shutdown()";
}
if (::close(fd) == -1) {
PLOG(INFO) << "close()";
}
};
struct UdfInstanceMetadata {
std::string pivot_root_dir;
std::string code_token;
Expand Down
22 changes: 19 additions & 3 deletions src/roma/byob/dispatcher/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ std::string Dispatcher::LoadBinary(std::filesystem::path binary_path,
}

void Dispatcher::AcceptorImpl() {
{
absl::MutexLock lock(&mu_);
acceptor_active_ = true;
}
while (true) {
const int fd = ::accept(listen_fd_, nullptr, nullptr);
if (fd == -1) {
Expand All @@ -111,6 +115,10 @@ void Dispatcher::AcceptorImpl() {
absl::MutexLock lock(&mu_);
code_token_to_fds_[buffer].push(fd);
}
{
absl::MutexLock lock(&mu_);
acceptor_active_ = false;
}
}

namespace {
Expand All @@ -134,13 +142,21 @@ void Dispatcher::ExecutorImpl(
{
auto fn = [&] {
mu_.AssertReaderHeld();
return !code_token_to_fds_[code_token].empty();
return !code_token_to_fds_[code_token].empty() || !acceptor_active_;
};
absl::MutexLock l(&mu_);
mu_.Await(absl::Condition(&fn));
auto& fds = code_token_to_fds_[code_token];
fd = fds.front();
fds.pop();
if (!fds.empty()) {
fd = fds.front();
fds.pop();
} else {
// No workers are available and the acceptor is no longer accepting
// connections from new workers.
std::move(callback)(
absl::FailedPreconditionError("No workers available."));
return;
}
}
SerializeDelimitedToFileDescriptor(request, fd);
FileInputStream input(fd);
Expand Down
1 change: 1 addition & 0 deletions src/roma/byob/dispatcher/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Dispatcher {
int connection_fd_;
std::optional<std::thread> acceptor_;
absl::Mutex mu_;
bool acceptor_active_ ABSL_GUARDED_BY(mu_) = false;
int executor_threads_in_flight_ ABSL_GUARDED_BY(mu_) = 0;
absl::flat_hash_map<std::string, std::queue<int>> code_token_to_fds_
ABSL_GUARDED_BY(mu_);
Expand Down
8 changes: 8 additions & 0 deletions src/roma/byob/dispatcher/run_workers_without_sandbox.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ int main(int argc, char** argv) {
PLOG(ERROR) << "connect() to " << socket_name << " failed";
return -1;
}
absl::Cleanup fd_cleanup = [fd] {
if (::shutdown(fd, SHUT_RDWR) == -1) {
PLOG(INFO) << "shutdown()";
}
if (::close(fd) == -1) {
PLOG(INFO) << "close()";
}
};
struct UdfInstanceMetadata {
std::string code_token;
std::string binary_path;
Expand Down

0 comments on commit aa88dc5

Please sign in to comment.