Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ipfs binary cache with upload support (sync version) #11

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 67 additions & 64 deletions src/libstore/ipfs-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,15 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
std::string cacheUri;
std::string daemonUri;

std::string ipfsHash;

enum struct AddrType { IPFS, IPNS } addrType;

std::string getIpfsPath() {
switch (addrType) {
case AddrType::IPFS: {
return "/ipfs/" + ipfsHash;
}
case AddrType::IPNS: {
return "/ipns/" + ipfsHash;
}
}
auto state(_state.lock());
return state->ipfsPath;
}
std::optional<string> optIpnsPath;

struct State
{
bool inProgressUpsert = false;
std::string ipfsPath;
};
Sync<State> _state;

Expand All @@ -45,19 +36,17 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
{
auto state(_state.lock());

if (cacheUri.back() == '/')
cacheUri.pop_back();

if (hasPrefix(cacheUri, "ipfs://")) {
ipfsHash = std::string(cacheUri, 7);
addrType = AddrType::IPFS;
}
else if (hasPrefix(cacheUri, "ipns://")) {
ipfsHash = std::string(cacheUri, 7);
addrType = AddrType::IPNS;
}
if (hasPrefix(cacheUri, "ipfs://"))
state->ipfsPath = "/ipfs/" + std::string(cacheUri, 7);
else if (hasPrefix(cacheUri, "ipns://"))
optIpnsPath = "/ipns/" + std::string(cacheUri, 7);
else
throw Error("unknown IPFS URI '%s'", cacheUri);
throw Error("unknown IPNS URI '%s'", cacheUri);

std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
std::string ipfsAPIPort(get(params, "port").value_or("5001"));
Expand All @@ -72,9 +61,20 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
if (versionInfo.find("Version") == versionInfo.end())
throw Error("daemon for IPFS is not running properly");

// root should already exist
if (!fileExists("") && addrType == AddrType::IPFS)
throw Error("path '%s' is not found", getIpfsPath());
// Resolve the IPNS name to an IPFS object
if (optIpnsPath) {
auto ipnsPath = *optIpnsPath;
debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath);
auto uri = daemonUri + "/api/v0/name/resolve?offline=true&arg=" + getFileTransfer()->urlEncode(ipnsPath);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);
if (json.find("Path") == json.end())
throw Error("daemon for IPFS is not running properly");
state->ipfsPath = json["Path"];
}
}

std::string getUri() override
Expand All @@ -85,17 +85,16 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
void init() override
{
std::string cacheInfoFile = "nix-cache-info";
if (!fileExists(cacheInfoFile)) {
if (!fileExists(cacheInfoFile))
upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info");
}
BinaryCacheStore::init();
}

protected:

bool fileExists(const std::string & path) override
{
auto uri = daemonUri + "/api/v0/object/stat?offline=true&arg=" + getFileTransfer()->urlEncode(getIpfsPath());
auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
request.post = true;
Expand All @@ -112,52 +111,56 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
}
}

void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
// IPNS publish can be slow, we try to do it rarely.
void sync() override
{
if (addrType == AddrType::IPFS)
throw Error("%s is immutable, cannot modify", getIpfsPath());
if (!optIpnsPath)
return;
auto ipnsPath = *optIpnsPath;

// TODO: use callbacks

auto req1 = FileTransferRequest(daemonUri + "/api/v0/add");
req1.data = std::make_shared<string>(data);
req1.post = true;
req1.tries = 1;
try {
auto res1 = getFileTransfer()->upload(req1);
auto json1 = nlohmann::json::parse(*res1.data);
auto state(_state.lock());

auto addedPath = "/ipfs/" + (std::string) json1["Hash"];
debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, ipnsPath);

auto state(_state.lock());
auto uri = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&key=" + std::string(ipnsPath, 6);

if (state->inProgressUpsert)
throw Error("a modification to the IPNS is already in progress");

state->inProgressUpsert = true;
auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
getFileTransfer()->download(req);
}

auto uri1 = daemonUri + "/api/v0/object/patch/add-link?offline=true&create=true";
uri1 += "&arg=" + getFileTransfer()->urlEncode(getIpfsPath());
uri1 += "&arg=" + getFileTransfer()->urlEncode(path);
uri1 += "&arg=" + getFileTransfer()->urlEncode(addedPath);
void addLink(std::string name, std::string ipfsObject)
{
auto state(_state.lock());

auto req2 = FileTransferRequest(uri1);
req2.post = true;
req2.tries = 1;
auto res2 = getFileTransfer()->download(req2);
auto json2 = nlohmann::json::parse(*res2.data);
auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true";
uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&arg=" + getFileTransfer()->urlEncode(name);
uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject);

auto newRoot = json2["Hash"];
auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
auto res = getFileTransfer()->download(req);
auto json = nlohmann::json::parse(*res.data);

auto uri2 = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(newRoot);
uri2 += "&key=" + std::string(getIpfsPath(), 6);
state->ipfsPath = "/ipfs/" + (std::string) json["Hash"];
}

auto req3 = FileTransferRequest(uri2);
req3.post = true;
req3.tries = 1;
getFileTransfer()->download(req3);
void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
{
// TODO: use callbacks

state->inProgressUpsert = false;
auto req = FileTransferRequest(daemonUri + "/api/v0/add");
req.data = std::make_shared<string>(data);
req.post = true;
req.tries = 1;
try {
auto res = getFileTransfer()->upload(req);
auto json = nlohmann::json::parse(*res.data);
addLink(path, "/ipfs/" + (std::string) json["Hash"]);
} catch (FileTransferError & e) {
throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg());
}
Expand All @@ -166,7 +169,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept override
{
auto uri = daemonUri + "/api/v0/cat?offline=true&arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);
auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
request.post = true;
Expand Down
2 changes: 2 additions & 0 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
nrDone++;
showProgress();
});

dstStore->sync();
}


Expand Down
4 changes: 4 additions & 0 deletions src/libstore/store-api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ public:
virtual void createUser(const std::string & userName, uid_t userId)
{ }

/* Sync writes to commits written data, usually a no-op. */
virtual void sync()
{ };

protected:

Stats stats;
Expand Down
5 changes: 5 additions & 0 deletions src/nix-store/nix-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ static void opAdd(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i)));

store->sync();
}


Expand All @@ -188,6 +190,8 @@ static void opAddFixed(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i, recursive, hashAlgo)));

store->sync();
}


Expand Down Expand Up @@ -952,6 +956,7 @@ static void opServe(Strings opFlags, Strings opArgs)
SizedSource sizedSource(in, info.narSize);

store->addToStore(info, sizedSource, NoRepair, NoCheckSigs);
store->sync();

// consume all the data that has been sent before continuing.
sizedSource.drainAll();
Expand Down
1 change: 1 addition & 0 deletions src/nix/add-to-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct CmdAddToStore : MixDryRun, StoreCommand
if (!dryRun) {
auto source = StringSource { *sink.s };
store->addToStore(info, source);
store->sync();
}

logger->stdout("%s", store->printStorePath(info.path));
Expand Down
2 changes: 2 additions & 0 deletions src/nix/make-content-addressable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct CmdMakeContentAddressable : StorePathsCommand, MixJSON

remappings.insert_or_assign(std::move(path), std::move(info.path));
}

store->sync();
}
};

Expand Down