diff --git a/src/libstore/gc.cc b/src/libstore/gc.cc index ff66c3938ae..bb76ee08456 100644 --- a/src/libstore/gc.cc +++ b/src/libstore/gc.cc @@ -636,6 +636,7 @@ void LocalStore::removeUnusedLinks(const GCState & state) } + void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) { GCState state(options, results); @@ -674,83 +675,75 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) shutdownPipe.create(); std::thread serverThread([&]() { - std::map, std::string>> fdClients; - bool quit = false; + Sync> connections; + std::atomic_bool quit = false; + + Finally cleanup([&]() { + debug("GC roots server shutting down"); + while (true) { + auto item = remove_begin(*connections.lock()); + if (!item) break; + auto & [fd, thread] = *item; + shutdown(fd, SHUT_RDWR); + thread.join(); + } + }); - while (!quit) { + while (true) { std::vector fds; fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN}); fds.push_back({.fd = fdServer.get(), .events = POLLIN}); - for (auto & i : fdClients) - fds.push_back({.fd = i.first, .events = POLLIN}); auto count = poll(fds.data(), fds.size(), -1); assert(count != -1); - for (auto & fd : fds) { - if (!fd.revents) continue; - if (fd.fd == shutdownPipe.readSide.get()) - /* Parent is asking us to quit. */ - quit = true; - else if (fd.fd == fdServer.get()) { - /* Accept a new connection. */ - assert(fd.revents & POLLIN); - auto fdClient = std::make_unique(accept(fdServer.get(), nullptr, nullptr)); - if (*fdClient) { - auto fd = fdClient->get(); - fdClients.insert({fd, std::make_pair(std::move(fdClient), "")}); + if (fds[0].revents) + /* Parent is asking us to quit. */ + break; + + if (fds[1].revents) { + /* Accept a new connection. */ + assert(fds[1].revents & POLLIN); + AutoCloseFD fdClient = accept(fdServer.get(), nullptr, nullptr); + if (!fdClient) continue; + + /* Process the connection in a separate thread. */ + auto fdClient_ = fdClient.get(); + std::thread clientThread([&, fdClient = std::move(fdClient)]() { + Finally cleanup([&]() { + connections.lock()->erase(fdClient.get()); + }); + + while (true) { + try { + auto path = readLine(fdClient.get()); + auto storePath = maybeParseStorePath(path); + if (storePath) { + debug("got new GC root '%s'", path); + auto hashPart = std::string(storePath->hashPart()); + auto shared(state.shared.lock()); + shared->tempRoots.insert(hashPart); + /* If this path is currently being + deleted, then we have to wait until + deletion is finished to ensure that + the client doesn't start + re-creating it before we're + done. FIXME: ideally we would use a + FD for this so we don't block the + poll loop. */ + while (shared->pending == hashPart) { + debug("synchronising with deletion of path '%s'", path); + shared.wait(state.wakeup); + } + } else + printError("received garbage instead of a root from client"); + writeFull(fdClient.get(), "1", false); + } catch (Error &) { break; } } - } - else { - /* Receive data from a client. */ - auto fdClient = fdClients.find(fd.fd); - assert(fdClient != fdClients.end()); - if (fd.revents & POLLIN) { - char buf[16384]; - auto n = read(fd.fd, buf, sizeof(buf)); - if (n > 0) { - fdClient->second.second.append(buf, n); - /* Split the input into lines. */ - while (true) { - auto p = fdClient->second.second.find('\n'); - if (p == std::string::npos) break; - /* We got a full line. Send ack back - to the client. */ - auto path = fdClient->second.second.substr(0, p); - fdClient->second.second = fdClient->second.second.substr(p + 1); - auto storePath = maybeParseStorePath(path); - if (storePath) { - debug("got new GC root '%s'", path); - auto hashPart = std::string(storePath->hashPart()); - auto shared(state.shared.lock()); - shared->tempRoots.insert(hashPart); - /* If this path is currently being - deleted, then we have to wait - until deletion is finished to - ensure that the client doesn't - start re-creating it before - we're done. FIXME: ideally we - would use a FD for this so we - don't block the poll loop. */ - while (shared->pending == hashPart) { - debug("synchronising with deletion of path '%s'", path); - shared.wait(state.wakeup); - } - } else - printError("received garbage instead of a root from client"); - // This could block, but meh. - try { - writeFull(fd.fd, "1", false); - } catch (SysError &) { } - } - } else if (n == 0) - fdClients.erase(fdClient); - } else - fdClients.erase(fdClient); - } + }); + + connections.lock()->insert({fdClient_, std::move(clientThread)}); } } - - debug("GC roots server shut down"); }); Finally stopServer([&]() { diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 3cb84c1256b..59a2a9fa7fd 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -511,6 +511,18 @@ std::optional get(const T & map, const typename T::key_ } +/* Remove and return the first item from a container. */ +template +std::optional remove_begin(T & c) +{ + auto i = c.begin(); + if (i == c.end()) return {}; + auto v = std::move(*i); + c.erase(i); + return v; +} + + template class Callback;