Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-transport legs #23

Merged
merged 12 commits into from
Oct 28, 2021
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-pubsub v0.5.4
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multicodec v0.3.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
69 changes: 69 additions & 0 deletions http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package http_test

import (
"context"
"net"
nhttp "net/http"
"testing"
"time"

"github.com/filecoin-project/go-legs/http"
"github.com/filecoin-project/go-legs/test"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

func TestManualSync(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcSys := test.MkLinkSystem(srcStore)
p, err := http.NewPublisher(context.Background(), srcStore, srcSys)
if err != nil {
t.Fatal(err)
}
nl, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go func() {
_ = nhttp.Serve(nl, p.(nhttp.Handler))
}()
nlm, err := manet.FromNetAddr(nl.Addr())
if err != nil {
t.Fatal(err)
}
proto, _ := multiaddr.NewMultiaddr("/http")
nlm = multiaddr.Join(nlm, proto)

dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstSys := test.MkLinkSystem(dstStore)
s, err := http.NewHTTPSubscriber(context.Background(), nhttp.DefaultClient, nlm, &dstSys, "", nil)
if err != nil {
t.Fatal(err)
}

rootLnk, err := test.Store(srcStore, basicnode.NewString("hello world"))
if err := p.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid); err != nil {
t.Fatal(err)
}

cchan, cncl, err := s.Sync(context.Background(), peer.NewPeerRecord().PeerID, cid.Undef, nil)
if err != nil {
t.Fatal(err)
}

select {
case rc := <-cchan:
if !rc.Equals(rootLnk.(cidlink.Link).Cid) {
t.Fatalf("didn't get expected cid. expected %s, got %s", rootLnk, rc)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out")
}
cncl()
}
134 changes: 134 additions & 0 deletions http/multiaddr/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package multiaddr

import (
"bytes"
"fmt"
"net"
"net/url"

"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

// register an 'httpath' component:
var transcodePath = multiaddr.NewTranscoderFromFunctions(pathStB, pathBtS, pathVal)

func pathVal(b []byte) error {
if bytes.IndexByte(b, '/') >= 0 {
return fmt.Errorf("encoded path '%s' contains a slash", string(b))
}
return nil
}

func pathStB(s string) ([]byte, error) {
return []byte(s), nil
}

func pathBtS(b []byte) (string, error) {
return string(b), nil
}

func init() {
_ = multiaddr.AddProtocol(protoHTTPath)
}

var protoHTTPath = multiaddr.Protocol{
Name: "httpath",
Code: 0x300200,
VCode: multiaddr.CodeToVarint(0x300200),
Size: multiaddr.LengthPrefixedVarSize,
Transcoder: transcodePath,
}

// ToURL takes a multiaddr of the form:
// /dns/thing.com/http/urlescape<path/to/root>
// /ip/192.168.0.1/tcp/80/http
func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) {
// host should be either the dns name or the IP
_, host, err := manet.DialArgs(ma)
if err != nil {
return nil, err
}
if ip := net.ParseIP(host); ip != nil {
if !ip.To4().Equal(ip) {
// raw v6 IPs need `[ip]` encapsulation.
host = fmt.Sprintf("[%s]", host)
}
}

protos := ma.Protocols()
pm := make(map[int]string, len(protos))
for _, p := range protos {
v, err := ma.ValueForProtocol(p.Code)
if err == nil {
pm[p.Code] = v
}
}

scheme := "http"
if _, ok := pm[multiaddr.P_HTTPS]; ok {
scheme = "https"
} // todo: ws/wss

path := ""
if pb, ok := pm[protoHTTPath.Code]; ok {
path, err = url.PathUnescape(pb)
if err != nil {
path = ""
}
}

out := url.URL{
Scheme: scheme,
Host: host,
Path: path,
}
return &out, nil
}

// ToMA takes a url and converts it into a multiaddr.
// converts scheme://host:port/path -> /ip/host/tcp/port/scheme/urlescape{path}
func ToMA(u *url.URL) (*multiaddr.Multiaddr, error) {
h := u.Hostname()
var addr *multiaddr.Multiaddr
if n := net.ParseIP(h); n != nil {
ipAddr, err := manet.FromIP(n)
if err != nil {
return nil, err
}
addr = &ipAddr
} else {
// domain name
ma, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_DNS).Name, h)
if err != nil {
return nil, err
}
mab := multiaddr.Cast(ma.Bytes())
addr = &mab
}
pv := u.Port()
if pv != "" {
port, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_TCP).Name, pv)
if err != nil {
return nil, err
}
wport := multiaddr.Join(*addr, port)
addr = &wport
}

http, err := multiaddr.NewComponent(u.Scheme, "")
if err != nil {
return nil, err
}

joint := multiaddr.Join(*addr, http)
if u.Path != "" {
httpath, err := multiaddr.NewComponent(protoHTTPath.Name, url.PathEscape(u.Path))
if err != nil {
return nil, err
}
joint = multiaddr.Join(joint, httpath)
}

return &joint, nil
}
34 changes: 34 additions & 0 deletions http/multiaddr/convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package multiaddr

import (
"net/url"
"testing"
)

func TestRoundtrip(t *testing.T) {
samples := []string{
"http://www.google.com/path/to/rsrc",
"https://protocol.ai",
"http://192.168.0.1:8080/admin",
"https://[2a00:1450:400e:80d::200e]:443/",
"https://[2a00:1450:400e:80d::200e]/",
}

for _, s := range samples {
u, _ := url.Parse(s)
mu, err := ToMA(u)
if err != nil {
t.Fatal(err)
}
u2, err := ToURL(*mu)
if u2.Scheme != u.Scheme {
t.Fatalf("scheme didn't roundtrip. got %s expected %s", u2.Scheme, u.Scheme)
}
if u2.Host != u.Host {
t.Fatalf("host didn't roundtrip. got %s, expected %s", u2.Host, u.Host)
}
if u2.Path != u.Path {
t.Fatalf("path didn't roundtrip. got %s, expected %s", u2.Path, u.Path)
}
}
}
85 changes: 85 additions & 0 deletions http/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package http

import (
"context"
"encoding/json"
"errors"
"net/http"
"path"
"sync"

"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

type httpPublisher struct {
rl sync.RWMutex
root cid.Cid
lsys ipld.LinkSystem
}

var _ legs.LegPublisher = (*httpPublisher)(nil)
var _ http.Handler = (*httpPublisher)(nil)

// NewPublisher creates a new http publisher
func NewPublisher(ctx context.Context,
ds datastore.Batching,
lsys ipld.LinkSystem) (legs.LegPublisher, error) {
hp := &httpPublisher{}
hp.lsys = lsys
return hp, nil
}

func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ask := path.Base(r.URL.Path)
if ask == "head" {
// serve the
h.rl.RLock()
defer h.rl.RUnlock()
out, err := json.Marshal(h.root.String())
if err != nil {
w.WriteHeader(500)
log.Infow("failed to serve root", "err", err)
} else {
_, _ = w.Write(out)
}
return
}
// interpret `ask` as a CID to serve.
c, err := cid.Parse(ask)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("invalid request: not a cid"))
return
}
item, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("cid not found"))
return
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("unable to load data for cid"))
log.Infow("failed to load requested block", "err", err)
return
}
// marshal to json and serve.
_ = dagjson.Encode(item, w)
}

func (h *httpPublisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
h.rl.Lock()
defer h.rl.Unlock()
h.root = c
return nil
}

func (h *httpPublisher) Close() error {
return nil
}
Loading