Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ipfs binary cache with upload support #10

Merged
merged 21 commits into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
aa2b8f0
Support POST in file transfer
matthewbauer Jun 9, 2020
f7f27a2
Support data with POST in file transfer
matthewbauer Jun 9, 2020
aa0e736
Consolidate ipfs-binary-cache-store.cc to one file
matthewbauer Jun 9, 2020
adec10c
Use /api/v0/version to check if ipfs daemon is running
matthewbauer Jun 9, 2020
e39ce69
Throw error if /ipfs/ is used but path doesn’t exist
matthewbauer Jun 9, 2020
f33f8e6
Support uploading with /ipns/ in ipfs-binary-cache-store
matthewbauer Jun 9, 2020
37393d1
Create file if nix-cache-info doesn’t exist
matthewbauer Jun 9, 2020
a9343ed
Use POST for IPFS getFile
matthewbauer Jun 10, 2020
ab712b4
Ignore 500 errors in ipfs binary cache
matthewbauer Jun 10, 2020
3530ab1
Use offline to speed up IPFS publish
matthewbauer Jun 10, 2020
c135e05
Keep track of in progress upsert
matthewbauer Jun 10, 2020
6be117f
Use addrType enum in IPFSBinaryCacheStore
matthewbauer Jun 10, 2020
b62782d
Cleanup
matthewbauer Jun 10, 2020
bc012e5
Add sync() to store API
matthewbauer Jun 10, 2020
168cb9e
Fixup fileExists method in IPFSBinaryCacheStore
matthewbauer Jun 10, 2020
816ff04
Store IPFS path in state
matthewbauer Jun 10, 2020
904ab3c
Merge remote-tracking branch 'obsidian/ipfs-binary-cache-develop' int…
Ericson2314 Jun 11, 2020
a09c57e
Merge remote-tracking branch 'obsidian/ipfs-binary-cache-with-upload'…
Ericson2314 Jun 11, 2020
03a90fc
ipnsPath -> optIpnsPath
Ericson2314 Jun 11, 2020
25c5ced
Avoid too many `optIpnsPath`
Ericson2314 Jun 11, 2020
e899575
Merge pull request #11 from obsidiansystems/ipfs-binary-cache-with-up…
Ericson2314 Jun 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,22 @@ struct curlFileTransfer : public FileTransfer

if (request.head)
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
else if (request.post)
curl_easy_setopt(req, CURLOPT_POST, 1);

if (request.data) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
if (request.post) {
// based off of https://curl.haxx.se/libcurl/c/postit2.html
curl_mime *form = curl_mime_init(req);
curl_mimepart *field = curl_mime_addpart(form);
curl_mime_data(field, request.data->data(), request.data->length());
curl_easy_setopt(req, CURLOPT_MIMEPOST, form);
} else {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
}
}

if (request.verifyTLS) {
Expand Down
1 change: 1 addition & 0 deletions src/libstore/filetransfer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct FileTransferRequest
std::string expectedETag;
bool verifyTLS = true;
bool head = false;
bool post = false;
size_t tries = fileTransferSettings.tries;
unsigned int baseRetryTimeMs = 250;
ActivityId parentAct;
Expand Down
177 changes: 102 additions & 75 deletions src/libstore/ipfs-binary-cache-store.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <cstring>
#include <nlohmann/json.hpp>

#include "binary-cache-store.hh"
#include "filetransfer.hh"
#include "nar-info-disk-cache.hh"
#include "ipfs.hh"

namespace nix {

Expand All @@ -15,48 +15,49 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
private:

std::string cacheUri;
std::string daemonUri;

/* Host where a IPFS API can be reached (usually localhost) */
std::string ipfsAPIHost;
/* Port where a IPFS API can be reached (usually 5001) */
uint16_t ipfsAPIPort;
/* Whether to use a IPFS Gateway instead of the API */
bool useIpfsGateway;
/* Where to find a IPFS Gateway */
std::string ipfsGatewayURL;

std::string constructIPFSRequest(const std::string & path) {
std::string uri;
std::string ipfsPath = cacheUri + "/" + path;
if (useIpfsGateway == false) {
uri = ipfs::buildAPIURL(ipfsAPIHost, ipfsAPIPort) +
"/cat" +
ipfs::buildQuery({{"arg", ipfsPath}});
} else {
uri = ipfsGatewayURL + ipfsPath;
}
return uri;
}
std::string ipfsPath;
Ericson2314 marked this conversation as resolved.
Show resolved Hide resolved

struct State
{
bool inProgressUpsert = false;
};
Sync<State> _state;

public:

IPFSBinaryCacheStore(
const Params & params, const Path & _cacheUri)
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
, ipfsAPIHost(get(params, "host").value_or("127.0.0.1"))
, ipfsAPIPort(std::stoi(get(params, "port").value_or("5001")))
, useIpfsGateway(get(params, "use_gateway").value_or("0") == "1")
, ipfsGatewayURL(get(params, "gateway").value_or("https://ipfs.io"))
{
if (cacheUri.back() == '/')
cacheUri.pop_back();
/*
* A cache is still useful since the IPFS API or
* gateway may have a higher latency when not running on
* localhost
*/
diskCache = getNarInfoDiskCache();

if (hasPrefix(cacheUri, "ipfs://"))
ipfsPath = "/ipfs/" + std::string(cacheUri, 7);
else if (hasPrefix(cacheUri, "ipns://"))
ipfsPath = "/ipns/" + std::string(cacheUri, 7);
else
throw Error("unknown IPFS URI '%s'", cacheUri);

std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
std::string ipfsAPIPort(get(params, "port").value_or("5001"));
daemonUri = "http://" + ipfsAPIHost + ":" + ipfsAPIPort;

// Check the IPFS daemon is running
FileTransferRequest request(daemonUri + "/api/v0/version");
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto versionInfo = nlohmann::json::parse(*res.data);
if (versionInfo.find("Version") == versionInfo.end())
throw Error("daemon for IPFS is not running properly");

// root should already exist
if (!fileExists("") && hasPrefix(ipfsPath, "/ipfs/"))
throw Error("path '%s' is not found", ipfsPath);
}

std::string getUri() override
Expand All @@ -66,61 +67,93 @@ class IPFSBinaryCacheStore : public BinaryCacheStore

void init() override
{
if (auto cacheInfo = diskCache->cacheExists(getUri())) {
wantMassQuery.setDefault(cacheInfo->wantMassQuery ? "true" : "false");
priority.setDefault(fmt("%d", cacheInfo->priority));
} else {
try {
BinaryCacheStore::init();
} catch (UploadToIPFS &) {
throw Error(format("‘%s’ does not appear to be a binary cache") % cacheUri);
}
diskCache->createCache(cacheUri, storeDir, wantMassQuery, priority);
std::string cacheInfoFile = "nix-cache-info";
if (!fileExists(cacheInfoFile)) {
upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info");
}
BinaryCacheStore::init();
}

protected:

bool fileExists(const std::string & path) override
{
/*
* TODO: Try a local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = daemonUri + "/api/v0/object/stat?offline=true&arg=" + getFileTransfer()->urlEncode(ipfsPath + "/" + path);

/* TODO: perform ipfs ls instead instead of trying to fetch it */
auto uri = constructIPFSRequest(path);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
try {
FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 5;
if (useIpfsGateway)
request.head = true;
getFileTransfer()->download(request);
return true;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);

return json.find("Hash") != json.end();
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound)
return false;
throw;
// probably should verify this is a not found error but
// ipfs gives us a 500
return false;
}
}

void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
{
throw UploadToIPFS("uploading to an IPFS binary cache is not supported");
if (hasPrefix(ipfsPath, "/ipfs/"))
throw Error("%s is immutable, cannot modify", ipfsPath);

// TODO: use callbacks

auto req1 = FileTransferRequest(daemonUri + "/api/v0/add");
req1.data = std::make_shared<string>(data);
req1.post = true;
req1.tries = 1;
try {
auto res1 = getFileTransfer()->upload(req1);
auto json1 = nlohmann::json::parse(*res1.data);

auto addedPath = "/ipfs/" + (std::string) json1["Hash"];

auto state(_state.lock());

if (state->inProgressUpsert)
throw Error("a modification to the IPNS is already in progress");

state->inProgressUpsert = true;

auto uri1 = daemonUri + "/api/v0/object/patch/add-link?offline=true&create=true";
uri1 += "&arg=" + getFileTransfer()->urlEncode(ipfsPath);
uri1 += "&arg=" + getFileTransfer()->urlEncode(path);
uri1 += "&arg=" + getFileTransfer()->urlEncode(addedPath);

auto req2 = FileTransferRequest(uri1);
req2.post = true;
req2.tries = 1;
auto res2 = getFileTransfer()->download(req2);
auto json2 = nlohmann::json::parse(*res2.data);

auto newRoot = json2["Hash"];

auto uri2 = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(newRoot);
uri2 += "&key=" + std::string(ipfsPath, 6);

auto req3 = FileTransferRequest(uri2);
req3.post = true;
req3.tries = 1;
getFileTransfer()->download(req3);

state->inProgressUpsert = false;
} catch (FileTransferError & e) {
throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg());
}
}

void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept override
{
/*
* TODO: Try local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = constructIPFSRequest(path);
auto uri = daemonUri + "/api/v0/cat?offline=true&arg=" + getFileTransfer()->urlEncode(ipfsPath + "/" + path);

FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 8;
request.post = true;
request.tries = 1;

auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));

Expand All @@ -129,9 +162,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
try {
(*callbackPtr)(result.get().data);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return (*callbackPtr)(std::shared_ptr<std::string>());
callbackPtr->rethrow();
return (*callbackPtr)(std::shared_ptr<std::string>());
} catch (...) {
callbackPtr->rethrow();
}
Expand All @@ -145,12 +176,8 @@ static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
/*
* TODO: maybe use ipfs:/ fs:/ipfs/
* https://github.com/ipfs/go-ipfs/issues/1678#issuecomment-157478515
*/
if (uri.substr(0, strlen("/ipfs/")) != "/ipfs/" &&
uri.substr(0, strlen("/ipns/")) != "/ipns/")
if (uri.substr(0, strlen("ipfs://")) != "ipfs://" &&
uri.substr(0, strlen("ipns://")) != "ipns://")
return 0;
auto store = std::make_shared<IPFSBinaryCacheStore>(params, uri);
store->init();
Expand Down
32 changes: 0 additions & 32 deletions src/libstore/ipfs.hh

This file was deleted.