Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ipfs binary cache with upload support #10

Merged
merged 21 commits into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
aa2b8f0
Support POST in file transfer
matthewbauer Jun 9, 2020
f7f27a2
Support data with POST in file transfer
matthewbauer Jun 9, 2020
aa0e736
Consolidate ipfs-binary-cache-store.cc to one file
matthewbauer Jun 9, 2020
adec10c
Use /api/v0/version to check if ipfs daemon is running
matthewbauer Jun 9, 2020
e39ce69
Throw error if /ipfs/ is used but path doesn’t exist
matthewbauer Jun 9, 2020
f33f8e6
Support uploading with /ipns/ in ipfs-binary-cache-store
matthewbauer Jun 9, 2020
37393d1
Create file if nix-cache-info doesn’t exist
matthewbauer Jun 9, 2020
a9343ed
Use POST for IPFS getFile
matthewbauer Jun 10, 2020
ab712b4
Ignore 500 errors in ipfs binary cache
matthewbauer Jun 10, 2020
3530ab1
Use offline to speed up IPFS publish
matthewbauer Jun 10, 2020
c135e05
Keep track of in progress upsert
matthewbauer Jun 10, 2020
6be117f
Use addrType enum in IPFSBinaryCacheStore
matthewbauer Jun 10, 2020
b62782d
Cleanup
matthewbauer Jun 10, 2020
bc012e5
Add sync() to store API
matthewbauer Jun 10, 2020
168cb9e
Fixup fileExists method in IPFSBinaryCacheStore
matthewbauer Jun 10, 2020
816ff04
Store IPFS path in state
matthewbauer Jun 10, 2020
904ab3c
Merge remote-tracking branch 'obsidian/ipfs-binary-cache-develop' int…
Ericson2314 Jun 11, 2020
a09c57e
Merge remote-tracking branch 'obsidian/ipfs-binary-cache-with-upload'…
Ericson2314 Jun 11, 2020
03a90fc
ipnsPath -> optIpnsPath
Ericson2314 Jun 11, 2020
25c5ced
Avoid too many `optIpnsPath`
Ericson2314 Jun 11, 2020
e899575
Merge pull request #11 from obsidiansystems/ipfs-binary-cache-with-up…
Ericson2314 Jun 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,22 @@ struct curlFileTransfer : public FileTransfer

if (request.head)
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
else if (request.post)
curl_easy_setopt(req, CURLOPT_POST, 1);

if (request.data) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
if (request.post) {
// based off of https://curl.haxx.se/libcurl/c/postit2.html
curl_mime *form = curl_mime_init(req);
curl_mimepart *field = curl_mime_addpart(form);
curl_mime_data(field, request.data->data(), request.data->length());
curl_easy_setopt(req, CURLOPT_MIMEPOST, form);
} else {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
}
}

if (request.verifyTLS) {
Expand Down
1 change: 1 addition & 0 deletions src/libstore/filetransfer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct FileTransferRequest
std::string expectedETag;
bool verifyTLS = true;
bool head = false;
bool post = false;
size_t tries = fileTransferSettings.tries;
unsigned int baseRetryTimeMs = 250;
ActivityId parentAct;
Expand Down
197 changes: 122 additions & 75 deletions src/libstore/ipfs-binary-cache-store.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <cstring>
#include <nlohmann/json.hpp>

#include "binary-cache-store.hh"
#include "filetransfer.hh"
#include "nar-info-disk-cache.hh"
#include "ipfs.hh"

namespace nix {

Expand All @@ -15,48 +15,66 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
private:

std::string cacheUri;
std::string daemonUri;

/* Host where a IPFS API can be reached (usually localhost) */
std::string ipfsAPIHost;
/* Port where a IPFS API can be reached (usually 5001) */
uint16_t ipfsAPIPort;
/* Whether to use a IPFS Gateway instead of the API */
bool useIpfsGateway;
/* Where to find a IPFS Gateway */
std::string ipfsGatewayURL;

std::string constructIPFSRequest(const std::string & path) {
std::string uri;
std::string ipfsPath = cacheUri + "/" + path;
if (useIpfsGateway == false) {
uri = ipfs::buildAPIURL(ipfsAPIHost, ipfsAPIPort) +
"/cat" +
ipfs::buildQuery({{"arg", ipfsPath}});
} else {
uri = ipfsGatewayURL + ipfsPath;
}
return uri;
std::string getIpfsPath() {
auto state(_state.lock());
return state->ipfsPath;
}
std::optional<string> optIpnsPath;

struct State
{
std::string ipfsPath;
};
Sync<State> _state;

public:

IPFSBinaryCacheStore(
const Params & params, const Path & _cacheUri)
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
, ipfsAPIHost(get(params, "host").value_or("127.0.0.1"))
, ipfsAPIPort(std::stoi(get(params, "port").value_or("5001")))
, useIpfsGateway(get(params, "use_gateway").value_or("0") == "1")
, ipfsGatewayURL(get(params, "gateway").value_or("https://ipfs.io"))
{
auto state(_state.lock());

if (cacheUri.back() == '/')
cacheUri.pop_back();
/*
* A cache is still useful since the IPFS API or
* gateway may have a higher latency when not running on
* localhost
*/
diskCache = getNarInfoDiskCache();

if (hasPrefix(cacheUri, "ipfs://"))
state->ipfsPath = "/ipfs/" + std::string(cacheUri, 7);
else if (hasPrefix(cacheUri, "ipns://"))
optIpnsPath = "/ipns/" + std::string(cacheUri, 7);
else
throw Error("unknown IPNS URI '%s'", cacheUri);

std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
std::string ipfsAPIPort(get(params, "port").value_or("5001"));
daemonUri = "http://" + ipfsAPIHost + ":" + ipfsAPIPort;

// Check the IPFS daemon is running
FileTransferRequest request(daemonUri + "/api/v0/version");
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto versionInfo = nlohmann::json::parse(*res.data);
if (versionInfo.find("Version") == versionInfo.end())
throw Error("daemon for IPFS is not running properly");

// Resolve the IPNS name to an IPFS object
if (optIpnsPath) {
auto ipnsPath = *optIpnsPath;
debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath);
auto uri = daemonUri + "/api/v0/name/resolve?offline=true&arg=" + getFileTransfer()->urlEncode(ipnsPath);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);
if (json.find("Path") == json.end())
throw Error("daemon for IPFS is not running properly");
state->ipfsPath = json["Path"];
}
}

std::string getUri() override
Expand All @@ -66,61 +84,96 @@ class IPFSBinaryCacheStore : public BinaryCacheStore

void init() override
{
if (auto cacheInfo = diskCache->cacheExists(getUri())) {
wantMassQuery.setDefault(cacheInfo->wantMassQuery ? "true" : "false");
priority.setDefault(fmt("%d", cacheInfo->priority));
} else {
try {
BinaryCacheStore::init();
} catch (UploadToIPFS &) {
throw Error(format("‘%s’ does not appear to be a binary cache") % cacheUri);
}
diskCache->createCache(cacheUri, storeDir, wantMassQuery, priority);
}
std::string cacheInfoFile = "nix-cache-info";
if (!fileExists(cacheInfoFile))
upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info");
BinaryCacheStore::init();
}

protected:

bool fileExists(const std::string & path) override
{
/*
* TODO: Try a local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

/* TODO: perform ipfs ls instead instead of trying to fetch it */
auto uri = constructIPFSRequest(path);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
try {
FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 5;
if (useIpfsGateway)
request.head = true;
getFileTransfer()->download(request);
return true;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);

return json.find("Hash") != json.end();
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound)
return false;
throw;
// probably should verify this is a not found error but
// ipfs gives us a 500
return false;
}
}

// IPNS publish can be slow, we try to do it rarely.
void sync() override
{
if (!optIpnsPath)
return;
auto ipnsPath = *optIpnsPath;

auto state(_state.lock());

debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, ipnsPath);

auto uri = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&key=" + std::string(ipnsPath, 6);

auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
getFileTransfer()->download(req);
}

void addLink(std::string name, std::string ipfsObject)
{
auto state(_state.lock());

auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true";
uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&arg=" + getFileTransfer()->urlEncode(name);
uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject);

auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
auto res = getFileTransfer()->download(req);
auto json = nlohmann::json::parse(*res.data);

state->ipfsPath = "/ipfs/" + (std::string) json["Hash"];
}

void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
{
throw UploadToIPFS("uploading to an IPFS binary cache is not supported");
// TODO: use callbacks

auto req = FileTransferRequest(daemonUri + "/api/v0/add");
req.data = std::make_shared<string>(data);
req.post = true;
req.tries = 1;
try {
auto res = getFileTransfer()->upload(req);
auto json = nlohmann::json::parse(*res.data);
addLink(path, "/ipfs/" + (std::string) json["Hash"]);
} catch (FileTransferError & e) {
throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg());
}
}

void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept override
{
/*
* TODO: Try local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = constructIPFSRequest(path);
auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 8;
request.post = true;
request.tries = 1;

auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));

Expand All @@ -129,9 +182,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
try {
(*callbackPtr)(result.get().data);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return (*callbackPtr)(std::shared_ptr<std::string>());
callbackPtr->rethrow();
return (*callbackPtr)(std::shared_ptr<std::string>());
} catch (...) {
callbackPtr->rethrow();
}
Expand All @@ -145,12 +196,8 @@ static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
/*
* TODO: maybe use ipfs:/ fs:/ipfs/
* https://github.com/ipfs/go-ipfs/issues/1678#issuecomment-157478515
*/
if (uri.substr(0, strlen("/ipfs/")) != "/ipfs/" &&
uri.substr(0, strlen("/ipns/")) != "/ipns/")
if (uri.substr(0, strlen("ipfs://")) != "ipfs://" &&
uri.substr(0, strlen("ipns://")) != "ipns://")
return 0;
auto store = std::make_shared<IPFSBinaryCacheStore>(params, uri);
store->init();
Expand Down
32 changes: 0 additions & 32 deletions src/libstore/ipfs.hh

This file was deleted.

2 changes: 2 additions & 0 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
nrDone++;
showProgress();
});

dstStore->sync();
}


Expand Down
4 changes: 4 additions & 0 deletions src/libstore/store-api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ public:
virtual void createUser(const std::string & userName, uid_t userId)
{ }

/* Sync writes to commits written data, usually a no-op. */
virtual void sync()
{ };

protected:

Stats stats;
Expand Down
5 changes: 5 additions & 0 deletions src/nix-store/nix-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ static void opAdd(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i)));

store->sync();
}


Expand All @@ -188,6 +190,8 @@ static void opAddFixed(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i, recursive, hashAlgo)));

store->sync();
}


Expand Down Expand Up @@ -952,6 +956,7 @@ static void opServe(Strings opFlags, Strings opArgs)
SizedSource sizedSource(in, info.narSize);

store->addToStore(info, sizedSource, NoRepair, NoCheckSigs);
store->sync();

// consume all the data that has been sent before continuing.
sizedSource.drainAll();
Expand Down
1 change: 1 addition & 0 deletions src/nix/add-to-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct CmdAddToStore : MixDryRun, StoreCommand
if (!dryRun) {
auto source = StringSource { *sink.s };
store->addToStore(info, source);
store->sync();
}

logger->stdout("%s", store->printStorePath(info.path));
Expand Down
2 changes: 2 additions & 0 deletions src/nix/make-content-addressable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct CmdMakeContentAddressable : StorePathsCommand, MixJSON

remappings.insert_or_assign(std::move(path), std::move(info.path));
}

store->sync();
}
};

Expand Down