Skip to content

Commit

Permalink
Merge pull request NixOS#6612 from NixOS/parallel-nix-copy
Browse files Browse the repository at this point in the history
Make nix copy parallel again
  • Loading branch information
edolstra authored and Minion3665 committed Feb 23, 2023
2 parents 47e1ea3 + 1649136 commit 84416a2
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 93 deletions.
17 changes: 17 additions & 0 deletions src/libstore/remote-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,23 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}


/* Import several paths at once by serialising them into one stream and
   passing that stream to the Source-based addMultipleToStore() overload.

   The stream starts with the number of paths; each path then contributes
   its path info followed by everything its source yields.

   `act` is not used by this implementation. */
void RemoteStore::addMultipleToStore(
    PathsSource & pathsToCopy,
    Activity & act,
    RepairFlag repair,
    CheckSigsFlag checkSigs)
{
    auto serialised = sinkToSource([&](Sink & out) {
        out << pathsToCopy.size();
        for (auto & entry : pathsToCopy) {
            // NOTE(review): 16 is presumably the serialisation format
            // version understood by the receiving side -- confirm.
            entry.first.write(out, *this, 16);
            entry.second->drainInto(out);
        }
    });

    addMultipleToStore(*serialised, repair, checkSigs);
}

void RemoteStore::addMultipleToStore(
Source & source,
RepairFlag repair,
Expand Down
6 changes: 6 additions & 0 deletions src/libstore/remote-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public:
RepairFlag repair,
CheckSigsFlag checkSigs) override;

/* Bulk import of several paths, each given as a path info paired with a
   source for its content (see PathsSource). Overrides the Store
   version of this method. */
void addMultipleToStore(
PathsSource & pathsToCopy,
Activity & act,
RepairFlag repair,
CheckSigsFlag checkSigs) override;

StorePath addTextToStore(
std::string_view name,
std::string_view s,
Expand Down
212 changes: 119 additions & 93 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,84 @@ StorePath Store::addToStore(
return addToStoreFromDump(*source, name, method, hashAlgo, repair, references);
}

/* Default bulk import: add every entry of `pathsToCopy` to this store.
   The work is scheduled through a ThreadPool and processGraph(), with
   each path's references reported as its graph edges.

   pathsToCopy: path infos paired with the Source providing their
                content. The Source of an entry is moved out of it (and
                destroyed) when that path is processed.
   act:         activity that receives progress and expected-size
                updates.
   repair, checkSigs: forwarded unchanged to addToStore().

   An Error raised by addToStore() is rethrown unless
   settings.keepGoing is set; in that case it is logged and the path is
   counted as failed. */
void Store::addMultipleToStore(
    PathsSource & pathsToCopy,
    Activity & act,
    RepairFlag repair,
    CheckSigsFlag checkSigs)
{
    std::atomic<size_t> nrDone{0};
    std::atomic<size_t> nrFailed{0};
    std::atomic<uint64_t> bytesExpected{0};
    std::atomic<uint64_t> nrRunning{0};

    using PathWithInfo = std::pair<ValidPathInfo, std::unique_ptr<Source>>;

    /* Index the entries by store path so the graph callbacks below can
       find the info and source belonging to a given path. */
    std::map<StorePath, PathWithInfo *> infosMap;
    StorePathSet storePathsToAdd;
    for (auto & thingToAdd : pathsToCopy) {
        infosMap.insert_or_assign(thingToAdd.first.path, &thingToAdd);
        storePathsToAdd.insert(thingToAdd.first.path);
    }

    auto showProgress = [&]() {
        act.progress(nrDone, pathsToCopy.size(), nrRunning, nrFailed);
    };

    ThreadPool pool;

    processGraph<StorePath>(pool,
        storePathsToAdd,

        /* Edge callback: report the references of `path`. A path that is
           already valid needs no copying, so it is counted as done and
           reports no edges.
           NOTE(review): if processGraph() also runs the node callback
           for such a path, nrDone is incremented a second time there --
           confirm against processGraph(). */
        [&](const StorePath & path) {

            auto & [info, _] = *infosMap.at(path);

            if (isValidPath(info.path)) {
                nrDone++;
                showProgress();
                return StorePathSet();
            }

            bytesExpected += info.narSize;
            act.setExpected(actCopyPath, bytesExpected);

            return info.references;
        },

        /* Node callback: copy `path` unless it has become valid in the
           meantime. */
        [&](const StorePath & path) {
            checkInterrupt();

            auto & [info_, source_] = *infosMap.at(path);
            // Work on a copy so the caller's path info is left untouched.
            auto info = info_;
            info.ultimate = false;

            /* Make sure that the Source object is destroyed when
               we're done. In particular, a SinkToSource object must
               be destroyed to ensure that the destructors on its
               stack frame are run; this includes
               LegacySSHStore::narFromPath()'s connection lock. */
            auto source = std::move(source_);

            if (!isValidPath(info.path)) {
                MaintainCount<decltype(nrRunning)> mc(nrRunning);
                showProgress();
                try {
                    addToStore(info, *source, repair, checkSigs);
                } catch (Error & e) {
                    nrFailed++;
                    if (!settings.keepGoing)
                        /* Rethrow the in-flight exception. `throw e;`
                           would throw a copy of static type Error and
                           slice off any derived exception type. */
                        throw;
                    printMsg(lvlError, "could not copy %s: %s", printStorePath(path), e.what());
                    showProgress();
                    return;
                }
            }

            nrDone++;
            showProgress();
        });
}

void Store::addMultipleToStore(
Source & source,
Expand Down Expand Up @@ -992,113 +1070,61 @@ std::map<StorePath, StorePath> copyPaths(
for (auto & path : storePaths)
if (!valid.count(path)) missing.insert(path);

Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));

// In the general case, `addMultipleToStore` requires a sorted list of
// store paths to add, so sort them right now
auto sortedMissing = srcStore.topoSortPaths(missing);
std::reverse(sortedMissing.begin(), sortedMissing.end());

std::map<StorePath, StorePath> pathsMap;
for (auto & path : storePaths)
pathsMap.insert_or_assign(path, path);

Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
Store::PathsSource pathsToCopy;

auto computeStorePathForDst = [&](const ValidPathInfo & currentPathInfo) -> StorePath {
auto storePathForSrc = currentPathInfo.path;
auto storePathForDst = storePathForSrc;
if (currentPathInfo.ca && currentPathInfo.references.empty()) {
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePathForSrc.name(), *currentPathInfo.ca);
if (dstStore.storeDir == srcStore.storeDir)
assert(storePathForDst == storePathForSrc);
if (storePathForDst != storePathForSrc)
debug("replaced path '%s' to '%s' for substituter '%s'",
srcStore.printStorePath(storePathForSrc),
dstStore.printStorePath(storePathForDst),
dstStore.getUri());
}
return storePathForDst;
};

auto sorted = srcStore.topoSortPaths(missing);
std::reverse(sorted.begin(), sorted.end());
for (auto & missingPath : sortedMissing) {
auto info = srcStore.queryPathInfo(missingPath);

auto source = sinkToSource([&](Sink & sink) {
sink << sorted.size();
for (auto & storePath : sorted) {
auto storePathForDst = computeStorePathForDst(*info);
pathsMap.insert_or_assign(missingPath, storePathForDst);

ValidPathInfo infoForDst = *info;
infoForDst.path = storePathForDst;

auto source = sinkToSource([&](Sink & sink) {
// We can reasonably assume that the copy will happen whenever we
// read the path, so log something about that at that point
auto srcUri = srcStore.getUri();
auto dstUri = dstStore.getUri();
auto storePathS = srcStore.printStorePath(storePath);
auto storePathS = srcStore.printStorePath(missingPath);
Activity act(*logger, lvlInfo, actCopyPath,
makeCopyPathMessage(srcUri, dstUri, storePathS),
{storePathS, srcUri, dstUri});
PushActivity pact(act.id);

auto info = srcStore.queryPathInfo(storePath);
info->write(sink, srcStore, 16);
srcStore.narFromPath(storePath, sink);
}
});

dstStore.addMultipleToStore(*source, repair, checkSigs);

#if 0
std::atomic<size_t> nrDone{0};
std::atomic<size_t> nrFailed{0};
std::atomic<uint64_t> bytesExpected{0};
std::atomic<uint64_t> nrRunning{0};

auto showProgress = [&]() {
act.progress(nrDone, missing.size(), nrRunning, nrFailed);
};

ThreadPool pool;

processGraph<StorePath>(pool,
StorePathSet(missing.begin(), missing.end()),

[&](const StorePath & storePath) {
auto info = srcStore.queryPathInfo(storePath);
auto storePathForDst = storePath;
if (info->ca && info->references.empty()) {
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
if (dstStore.storeDir == srcStore.storeDir)
assert(storePathForDst == storePath);
if (storePathForDst != storePath)
debug("replaced path '%s' to '%s' for substituter '%s'",
srcStore.printStorePath(storePath),
dstStore.printStorePath(storePathForDst),
dstStore.getUri());
}
pathsMap.insert_or_assign(storePath, storePathForDst);

if (dstStore.isValidPath(storePath)) {
nrDone++;
showProgress();
return StorePathSet();
}

bytesExpected += info->narSize;
act.setExpected(actCopyPath, bytesExpected);

return info->references;
},

[&](const StorePath & storePath) {
checkInterrupt();

auto info = srcStore.queryPathInfo(storePath);

auto storePathForDst = storePath;
if (info->ca && info->references.empty()) {
storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
if (dstStore.storeDir == srcStore.storeDir)
assert(storePathForDst == storePath);
if (storePathForDst != storePath)
debug("replaced path '%s' to '%s' for substituter '%s'",
srcStore.printStorePath(storePath),
dstStore.printStorePath(storePathForDst),
dstStore.getUri());
}
pathsMap.insert_or_assign(storePath, storePathForDst);

if (!dstStore.isValidPath(storePathForDst)) {
MaintainCount<decltype(nrRunning)> mc(nrRunning);
showProgress();
try {
copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
} catch (Error &e) {
nrFailed++;
if (!settings.keepGoing)
throw e;
printMsg(lvlError, "could not copy %s: %s", dstStore.printStorePath(storePath), e.what());
showProgress();
return;
}
}

nrDone++;
showProgress();
srcStore.narFromPath(missingPath, sink);
});
#endif
pathsToCopy.push_back(std::pair{infoForDst, std::move(source)});
}

dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);

return pathsMap;
}
Expand Down
11 changes: 11 additions & 0 deletions src/libstore/store-api.hh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "nar-info.hh"
#include "realisation.hh"
#include "path.hh"
#include "derived-path.hh"
Expand Down Expand Up @@ -359,12 +360,22 @@ public:
/* Import a single path described by `info`. Pure virtual, so each
   concrete store has to implement it. `repair` defaults to NoRepair and
   `checkSigs` to CheckSigs.
   NOTE(review): `narSource` presumably supplies the NAR serialisation
   of the path's contents -- confirm against the implementations. */
virtual void addToStore(const ValidPathInfo & info, Source & narSource,
RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs) = 0;

// A list of path infos, each paired with a source providing the content
// of the associated store path. The sources are held by unique_ptr, so
// the list owns them.
using PathsSource = std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>>;

/* Import multiple paths into the store, reading all of them from the
   single stream `source`. `repair` defaults to NoRepair and `checkSigs`
   to CheckSigs.
   NOTE(review): the stream presumably carries the number of paths
   followed by each path's info and content -- confirm against the
   implementation. */
virtual void addMultipleToStore(
Source & source,
RepairFlag repair = NoRepair,
CheckSigsFlag checkSigs = CheckSigs);

/* Import multiple paths into the store, each given as a path info
   paired with a source for its content (see PathsSource). Progress is
   reported on `act`. `repair` defaults to NoRepair and `checkSigs` to
   CheckSigs. Virtual, so stores may override it. */
virtual void addMultipleToStore(
PathsSource & pathsToCopy,
Activity & act,
RepairFlag repair = NoRepair,
CheckSigsFlag checkSigs = CheckSigs);

/* Copy the contents of a path to the store and register the
validity of the resulting path. The resulting path is returned.
The function object `filter' can be used to exclude files (see
Expand Down

0 comments on commit 84416a2

Please sign in to comment.