Skip to content

Commit

Permalink
Use a thread per connection
Browse files Browse the repository at this point in the history
  • Loading branch information
edolstra committed Oct 6, 2021
1 parent 777e7f8 commit 75b4e88
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 67 deletions.
127 changes: 60 additions & 67 deletions src/libstore/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ void LocalStore::removeUnusedLinks(const GCState & state)
}



void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
{
GCState state(options, results);
Expand Down Expand Up @@ -674,83 +675,75 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
shutdownPipe.create();

std::thread serverThread([&]() {
std::map<int, std::pair<std::unique_ptr<AutoCloseFD>, std::string>> fdClients;
bool quit = false;
Sync<std::map<int, std::thread>> connections;
std::atomic_bool quit = false;

Finally cleanup([&]() {
debug("GC roots server shutting down");
while (true) {
auto item = remove_begin(*connections.lock());
if (!item) break;
auto & [fd, thread] = *item;
shutdown(fd, SHUT_RDWR);
thread.join();
}
});

while (!quit) {
while (true) {
std::vector<struct pollfd> fds;
fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN});
fds.push_back({.fd = fdServer.get(), .events = POLLIN});
for (auto & i : fdClients)
fds.push_back({.fd = i.first, .events = POLLIN});
auto count = poll(fds.data(), fds.size(), -1);
assert(count != -1);

for (auto & fd : fds) {
if (!fd.revents) continue;
if (fd.fd == shutdownPipe.readSide.get())
/* Parent is asking us to quit. */
quit = true;
else if (fd.fd == fdServer.get()) {
/* Accept a new connection. */
assert(fd.revents & POLLIN);
auto fdClient = std::make_unique<AutoCloseFD>(accept(fdServer.get(), nullptr, nullptr));
if (*fdClient) {
auto fd = fdClient->get();
fdClients.insert({fd, std::make_pair(std::move(fdClient), "")});
if (fds[0].revents)
/* Parent is asking us to quit. */
break;

if (fds[1].revents) {
/* Accept a new connection. */
assert(fds[1].revents & POLLIN);
AutoCloseFD fdClient = accept(fdServer.get(), nullptr, nullptr);
if (!fdClient) continue;

/* Process the connection in a separate thread. */
auto fdClient_ = fdClient.get();
std::thread clientThread([&, fdClient = std::move(fdClient)]() {
Finally cleanup([&]() {
connections.lock()->erase(fdClient.get());
});

while (true) {
try {
auto path = readLine(fdClient.get());
auto storePath = maybeParseStorePath(path);
if (storePath) {
debug("got new GC root '%s'", path);
auto hashPart = std::string(storePath->hashPart());
auto shared(state.shared.lock());
shared->tempRoots.insert(hashPart);
/* If this path is currently being
deleted, then we have to wait until
deletion is finished to ensure that
the client doesn't start
re-creating it before we're
done. FIXME: ideally we would use a
FD for this so we don't block the
poll loop. */
while (shared->pending == hashPart) {
debug("synchronising with deletion of path '%s'", path);
shared.wait(state.wakeup);
}
} else
printError("received garbage instead of a root from client");
writeFull(fdClient.get(), "1", false);
} catch (Error &) { break; }
}
}
else {
/* Receive data from a client. */
auto fdClient = fdClients.find(fd.fd);
assert(fdClient != fdClients.end());
if (fd.revents & POLLIN) {
char buf[16384];
auto n = read(fd.fd, buf, sizeof(buf));
if (n > 0) {
fdClient->second.second.append(buf, n);
/* Split the input into lines. */
while (true) {
auto p = fdClient->second.second.find('\n');
if (p == std::string::npos) break;
/* We got a full line. Send ack back
to the client. */
auto path = fdClient->second.second.substr(0, p);
fdClient->second.second = fdClient->second.second.substr(p + 1);
auto storePath = maybeParseStorePath(path);
if (storePath) {
debug("got new GC root '%s'", path);
auto hashPart = std::string(storePath->hashPart());
auto shared(state.shared.lock());
shared->tempRoots.insert(hashPart);
/* If this path is currently being
deleted, then we have to wait
until deletion is finished to
ensure that the client doesn't
start re-creating it before
we're done. FIXME: ideally we
would use a FD for this so we
don't block the poll loop. */
while (shared->pending == hashPart) {
debug("synchronising with deletion of path '%s'", path);
shared.wait(state.wakeup);
}
} else
printError("received garbage instead of a root from client");
// This could block, but meh.
try {
writeFull(fd.fd, "1", false);
} catch (SysError &) { }
}
} else if (n == 0)
fdClients.erase(fdClient);
} else
fdClients.erase(fdClient);
}
});

connections.lock()->insert({fdClient_, std::move(clientThread)});
}
}

debug("GC roots server shut down");
});

Finally stopServer([&]() {
Expand Down
12 changes: 12 additions & 0 deletions src/libutil/util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@ std::optional<typename T::mapped_type> get(const T & map, const typename T::key_
}


/* Remove and return the first item from a container. */
template <class T>
std::optional<typename T::value_type> remove_begin(T & c)
{
auto i = c.begin();
if (i == c.end()) return {};
auto v = std::move(*i);
c.erase(i);
return v;
}


template<typename T>
class Callback;

Expand Down

0 comments on commit 75b4e88

Please sign in to comment.