Skip to content

Commit

Permalink
Merge pull request #11 from obsidiansystems/ipfs-binary-cache-with-up…
Browse files Browse the repository at this point in the history
…load-sync

Update ipfs binary cache with upload support (sync version)
  • Loading branch information
Ericson2314 authored Jun 11, 2020
2 parents 904ab3c + 25c5ced commit e899575
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 64 deletions.
131 changes: 67 additions & 64 deletions src/libstore/ipfs-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,15 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
std::string cacheUri;
std::string daemonUri;

std::string ipfsHash;

enum struct AddrType { IPFS, IPNS } addrType;

std::string getIpfsPath() {
switch (addrType) {
case AddrType::IPFS: {
return "/ipfs/" + ipfsHash;
}
case AddrType::IPNS: {
return "/ipns/" + ipfsHash;
}
}
auto state(_state.lock());
return state->ipfsPath;
}
std::optional<string> optIpnsPath;

struct State
{
bool inProgressUpsert = false;
std::string ipfsPath;
};
Sync<State> _state;

Expand All @@ -45,19 +36,17 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
{
auto state(_state.lock());

if (cacheUri.back() == '/')
cacheUri.pop_back();

if (hasPrefix(cacheUri, "ipfs://")) {
ipfsHash = std::string(cacheUri, 7);
addrType = AddrType::IPFS;
}
else if (hasPrefix(cacheUri, "ipns://")) {
ipfsHash = std::string(cacheUri, 7);
addrType = AddrType::IPNS;
}
if (hasPrefix(cacheUri, "ipfs://"))
state->ipfsPath = "/ipfs/" + std::string(cacheUri, 7);
else if (hasPrefix(cacheUri, "ipns://"))
optIpnsPath = "/ipns/" + std::string(cacheUri, 7);
else
throw Error("unknown IPFS URI '%s'", cacheUri);
throw Error("unknown IPNS URI '%s'", cacheUri);

std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
std::string ipfsAPIPort(get(params, "port").value_or("5001"));
Expand All @@ -72,9 +61,20 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
if (versionInfo.find("Version") == versionInfo.end())
throw Error("daemon for IPFS is not running properly");

// root should already exist
if (!fileExists("") && addrType == AddrType::IPFS)
throw Error("path '%s' is not found", getIpfsPath());
// Resolve the IPNS name to an IPFS object
if (optIpnsPath) {
auto ipnsPath = *optIpnsPath;
debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath);
auto uri = daemonUri + "/api/v0/name/resolve?offline=true&arg=" + getFileTransfer()->urlEncode(ipnsPath);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);
if (json.find("Path") == json.end())
throw Error("daemon for IPFS is not running properly");
state->ipfsPath = json["Path"];
}
}

std::string getUri() override
Expand All @@ -85,17 +85,16 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
void init() override
{
std::string cacheInfoFile = "nix-cache-info";
if (!fileExists(cacheInfoFile)) {
if (!fileExists(cacheInfoFile))
upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info");
}
BinaryCacheStore::init();
}

protected:

bool fileExists(const std::string & path) override
{
auto uri = daemonUri + "/api/v0/object/stat?offline=true&arg=" + getFileTransfer()->urlEncode(getIpfsPath());
auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
request.post = true;
Expand All @@ -112,52 +111,56 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
}
}

void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
// IPNS publish can be slow, we try to do it rarely.
void sync() override
{
if (addrType == AddrType::IPFS)
throw Error("%s is immutable, cannot modify", getIpfsPath());
if (!optIpnsPath)
return;
auto ipnsPath = *optIpnsPath;

// TODO: use callbacks

auto req1 = FileTransferRequest(daemonUri + "/api/v0/add");
req1.data = std::make_shared<string>(data);
req1.post = true;
req1.tries = 1;
try {
auto res1 = getFileTransfer()->upload(req1);
auto json1 = nlohmann::json::parse(*res1.data);
auto state(_state.lock());

auto addedPath = "/ipfs/" + (std::string) json1["Hash"];
debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, ipnsPath);

auto state(_state.lock());
auto uri = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&key=" + std::string(ipnsPath, 6);

if (state->inProgressUpsert)
throw Error("a modification to the IPNS is already in progress");

state->inProgressUpsert = true;
auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
getFileTransfer()->download(req);
}

auto uri1 = daemonUri + "/api/v0/object/patch/add-link?offline=true&create=true";
uri1 += "&arg=" + getFileTransfer()->urlEncode(getIpfsPath());
uri1 += "&arg=" + getFileTransfer()->urlEncode(path);
uri1 += "&arg=" + getFileTransfer()->urlEncode(addedPath);
void addLink(std::string name, std::string ipfsObject)
{
auto state(_state.lock());

auto req2 = FileTransferRequest(uri1);
req2.post = true;
req2.tries = 1;
auto res2 = getFileTransfer()->download(req2);
auto json2 = nlohmann::json::parse(*res2.data);
auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true";
uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&arg=" + getFileTransfer()->urlEncode(name);
uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject);

auto newRoot = json2["Hash"];
auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
auto res = getFileTransfer()->download(req);
auto json = nlohmann::json::parse(*res.data);

auto uri2 = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(newRoot);
uri2 += "&key=" + std::string(getIpfsPath(), 6);
state->ipfsPath = "/ipfs/" + (std::string) json["Hash"];
}

auto req3 = FileTransferRequest(uri2);
req3.post = true;
req3.tries = 1;
getFileTransfer()->download(req3);
void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
{
// TODO: use callbacks

state->inProgressUpsert = false;
auto req = FileTransferRequest(daemonUri + "/api/v0/add");
req.data = std::make_shared<string>(data);
req.post = true;
req.tries = 1;
try {
auto res = getFileTransfer()->upload(req);
auto json = nlohmann::json::parse(*res.data);
addLink(path, "/ipfs/" + (std::string) json["Hash"]);
} catch (FileTransferError & e) {
throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg());
}
Expand All @@ -166,7 +169,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept override
{
auto uri = daemonUri + "/api/v0/cat?offline=true&arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);
auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
request.post = true;
Expand Down
2 changes: 2 additions & 0 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
nrDone++;
showProgress();
});

dstStore->sync();
}


Expand Down
4 changes: 4 additions & 0 deletions src/libstore/store-api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ public:
virtual void createUser(const std::string & userName, uid_t userId)
{ }

/* Sync writes to commits written data, usually a no-op. */
virtual void sync()
{ };

protected:

Stats stats;
Expand Down
5 changes: 5 additions & 0 deletions src/nix-store/nix-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ static void opAdd(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i)));

store->sync();
}


Expand All @@ -188,6 +190,8 @@ static void opAddFixed(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i, recursive, hashAlgo)));

store->sync();
}


Expand Down Expand Up @@ -952,6 +956,7 @@ static void opServe(Strings opFlags, Strings opArgs)
SizedSource sizedSource(in, info.narSize);

store->addToStore(info, sizedSource, NoRepair, NoCheckSigs);
store->sync();

// consume all the data that has been sent before continuing.
sizedSource.drainAll();
Expand Down
1 change: 1 addition & 0 deletions src/nix/add-to-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct CmdAddToStore : MixDryRun, StoreCommand
if (!dryRun) {
auto source = StringSource { *sink.s };
store->addToStore(info, source);
store->sync();
}

logger->stdout("%s", store->printStorePath(info.path));
Expand Down
2 changes: 2 additions & 0 deletions src/nix/make-content-addressable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct CmdMakeContentAddressable : StorePathsCommand, MixJSON

remappings.insert_or_assign(std::move(path), std::move(info.path));
}

store->sync();
}
};

Expand Down

0 comments on commit e899575

Please sign in to comment.