Skip to content

Commit

Permalink
http-transport legs (#23)
Browse files Browse the repository at this point in the history
* http-transport for legs
* expand selector interface
  • Loading branch information
willscott authored Oct 28, 2021
1 parent 7a2ce4c commit e83b734
Show file tree
Hide file tree
Showing 14 changed files with 751 additions and 108 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-pubsub v0.5.4
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multicodec v0.3.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
69 changes: 69 additions & 0 deletions http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package http_test

import (
"context"
"net"
nhttp "net/http"
"testing"
"time"

"github.com/filecoin-project/go-legs/http"
"github.com/filecoin-project/go-legs/test"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

func TestManualSync(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcSys := test.MkLinkSystem(srcStore)
p, err := http.NewPublisher(context.Background(), srcStore, srcSys)
if err != nil {
t.Fatal(err)
}
nl, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go func() {
_ = nhttp.Serve(nl, p.(nhttp.Handler))
}()
nlm, err := manet.FromNetAddr(nl.Addr())
if err != nil {
t.Fatal(err)
}
proto, _ := multiaddr.NewMultiaddr("/http")
nlm = multiaddr.Join(nlm, proto)

dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstSys := test.MkLinkSystem(dstStore)
s, err := http.NewHTTPSubscriber(context.Background(), nhttp.DefaultClient, nlm, &dstSys, "", nil)
if err != nil {
t.Fatal(err)
}

rootLnk, err := test.Store(srcStore, basicnode.NewString("hello world"))
if err := p.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid); err != nil {
t.Fatal(err)
}

cchan, cncl, err := s.Sync(context.Background(), peer.NewPeerRecord().PeerID, cid.Undef, nil)
if err != nil {
t.Fatal(err)
}

select {
case rc := <-cchan:
if !rc.Equals(rootLnk.(cidlink.Link).Cid) {
t.Fatalf("didn't get expected cid. expected %s, got %s", rootLnk, rc)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out")
}
cncl()
}
134 changes: 134 additions & 0 deletions http/multiaddr/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package multiaddr

import (
"bytes"
"fmt"
"net"
"net/url"

"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

// register an 'httpath' component:
var transcodePath = multiaddr.NewTranscoderFromFunctions(pathStB, pathBtS, pathVal)

func pathVal(b []byte) error {
if bytes.IndexByte(b, '/') >= 0 {
return fmt.Errorf("encoded path '%s' contains a slash", string(b))
}
return nil
}

func pathStB(s string) ([]byte, error) {
return []byte(s), nil
}

func pathBtS(b []byte) (string, error) {
return string(b), nil
}

func init() {
_ = multiaddr.AddProtocol(protoHTTPath)
}

var protoHTTPath = multiaddr.Protocol{
Name: "httpath",
Code: 0x300200,
VCode: multiaddr.CodeToVarint(0x300200),
Size: multiaddr.LengthPrefixedVarSize,
Transcoder: transcodePath,
}

// ToURL takes a multiaddr of the form:
// /dns/thing.com/http/urlescape<path/to/root>
// /ip/192.168.0.1/tcp/80/http
func ToURL(ma multiaddr.Multiaddr) (*url.URL, error) {
// host should be either the dns name or the IP
_, host, err := manet.DialArgs(ma)
if err != nil {
return nil, err
}
if ip := net.ParseIP(host); ip != nil {
if !ip.To4().Equal(ip) {
// raw v6 IPs need `[ip]` encapsulation.
host = fmt.Sprintf("[%s]", host)
}
}

protos := ma.Protocols()
pm := make(map[int]string, len(protos))
for _, p := range protos {
v, err := ma.ValueForProtocol(p.Code)
if err == nil {
pm[p.Code] = v
}
}

scheme := "http"
if _, ok := pm[multiaddr.P_HTTPS]; ok {
scheme = "https"
} // todo: ws/wss

path := ""
if pb, ok := pm[protoHTTPath.Code]; ok {
path, err = url.PathUnescape(pb)
if err != nil {
path = ""
}
}

out := url.URL{
Scheme: scheme,
Host: host,
Path: path,
}
return &out, nil
}

// ToMA takes a url and converts it into a multiaddr.
// converts scheme://host:port/path -> /ip/host/tcp/port/scheme/urlescape{path}
func ToMA(u *url.URL) (*multiaddr.Multiaddr, error) {
h := u.Hostname()
var addr *multiaddr.Multiaddr
if n := net.ParseIP(h); n != nil {
ipAddr, err := manet.FromIP(n)
if err != nil {
return nil, err
}
addr = &ipAddr
} else {
// domain name
ma, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_DNS).Name, h)
if err != nil {
return nil, err
}
mab := multiaddr.Cast(ma.Bytes())
addr = &mab
}
pv := u.Port()
if pv != "" {
port, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_TCP).Name, pv)
if err != nil {
return nil, err
}
wport := multiaddr.Join(*addr, port)
addr = &wport
}

http, err := multiaddr.NewComponent(u.Scheme, "")
if err != nil {
return nil, err
}

joint := multiaddr.Join(*addr, http)
if u.Path != "" {
httpath, err := multiaddr.NewComponent(protoHTTPath.Name, url.PathEscape(u.Path))
if err != nil {
return nil, err
}
joint = multiaddr.Join(joint, httpath)
}

return &joint, nil
}
34 changes: 34 additions & 0 deletions http/multiaddr/convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package multiaddr

import (
"net/url"
"testing"
)

func TestRoundtrip(t *testing.T) {
samples := []string{
"http://www.google.com/path/to/rsrc",
"https://protocol.ai",
"http://192.168.0.1:8080/admin",
"https://[2a00:1450:400e:80d::200e]:443/",
"https://[2a00:1450:400e:80d::200e]/",
}

for _, s := range samples {
u, _ := url.Parse(s)
mu, err := ToMA(u)
if err != nil {
t.Fatal(err)
}
u2, err := ToURL(*mu)
if u2.Scheme != u.Scheme {
t.Fatalf("scheme didn't roundtrip. got %s expected %s", u2.Scheme, u.Scheme)
}
if u2.Host != u.Host {
t.Fatalf("host didn't roundtrip. got %s, expected %s", u2.Host, u.Host)
}
if u2.Path != u.Path {
t.Fatalf("path didn't roundtrip. got %s, expected %s", u2.Path, u.Path)
}
}
}
85 changes: 85 additions & 0 deletions http/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package http

import (
"context"
"encoding/json"
"errors"
"net/http"
"path"
"sync"

"github.com/filecoin-project/go-legs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

type httpPublisher struct {
rl sync.RWMutex
root cid.Cid
lsys ipld.LinkSystem
}

var _ legs.LegPublisher = (*httpPublisher)(nil)
var _ http.Handler = (*httpPublisher)(nil)

// NewPublisher creates a new http publisher
func NewPublisher(ctx context.Context,
ds datastore.Batching,
lsys ipld.LinkSystem) (legs.LegPublisher, error) {
hp := &httpPublisher{}
hp.lsys = lsys
return hp, nil
}

func (h *httpPublisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ask := path.Base(r.URL.Path)
if ask == "head" {
// serve the
h.rl.RLock()
defer h.rl.RUnlock()
out, err := json.Marshal(h.root.String())
if err != nil {
w.WriteHeader(500)
log.Infow("failed to serve root", "err", err)
} else {
_, _ = w.Write(out)
}
return
}
// interpret `ask` as a CID to serve.
c, err := cid.Parse(ask)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("invalid request: not a cid"))
return
}
item, err := h.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("cid not found"))
return
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("unable to load data for cid"))
log.Infow("failed to load requested block", "err", err)
return
}
// marshal to json and serve.
_ = dagjson.Encode(item, w)
}

func (h *httpPublisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
h.rl.Lock()
defer h.rl.Unlock()
h.root = c
return nil
}

func (h *httpPublisher) Close() error {
return nil
}
Loading

0 comments on commit e83b734

Please sign in to comment.