Skip to content

Commit

Permalink
refactor(remote): move registry http handlers onto remote
Browse files Browse the repository at this point in the history
so, these belong here. The contract set forth by remote.Client operating over HTTP should be solved in the same package. Also makes for much easier testing
  • Loading branch information
b5 committed Feb 11, 2020
1 parent fc443f9 commit e1a203d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 102 deletions.
66 changes: 2 additions & 64 deletions registry/regserver/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package handlers

import (
"net/http"
"strings"
"time"

"github.com/qri-io/apiutil"
"github.com/qri-io/qri/registry"
"github.com/qri-io/qri/remote"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -53,18 +50,8 @@ func NewRoutes(reg registry.Registry, opts ...func(o *RouteOptions)) *http.Serve
mux.HandleFunc("/health", HealthCheckHandler)

if rem := reg.Remote; rem != nil {
mux.Handle("/remote/dsync", rem.DsyncHTTPHandler())
mux.Handle("/remote/logsync", rem.LogsyncHTTPHandler())
mux.Handle("/remote/refs", rem.RefsHTTPHandler())

if fs := reg.Remote.Feeds; fs != nil {
mux.Handle("/remote/feeds", FeedsHandler(fs))
mux.Handle("/remote/feeds/", FeedHandler("/remote/feeds/", fs))
}
if ps := reg.Remote.Previews; ps != nil {
mux.Handle("/remote/dataset/preview/", PreviewHandler("/remote/dataset/preview/", ps))
mux.Handle("/remote/dataset/component/", ComponentHandler(ps))
}
// add any "/remote" routes this remote provides
rem.AddDefaultRoutes(mux)
}

if ps := reg.Profiles; ps != nil {
Expand Down Expand Up @@ -92,52 +79,3 @@ func HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"meta":{"code": 200,"status":"ok"},"data":null}`))
}

// max number of items in a page of feed data
const feedPageSize = 30

// FeedsHandler provides access to the home feed
func FeedsHandler(fs remote.Feeds) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
feeds, err := fs.Feeds(req.Context(), "")
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
return
}

apiutil.WriteResponse(w, feeds)
}
}

// FeedHandler gives access a feed VersionInfos constructed by a remote
func FeedHandler(prefix string, fs remote.Feeds) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
page := apiutil.PageFromRequest(req)
refs, err := fs.Feed(req.Context(), "", strings.TrimPrefix(req.URL.Path, prefix), page.Offset(), page.Limit())
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
}

apiutil.WritePageResponse(w, refs, req, page)
}
}

// PreviewHandler handles dataset preview requests over HTTP
func PreviewHandler(prefix string, ps remote.Previews) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
preview, err := ps.Preview(req.Context(), "", strings.TrimPrefix(req.URL.Path, prefix))
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
return
}

apiutil.WriteResponse(w, preview)
}
}

// ComponentHandler handles dataset component requests over HTTP
func ComponentHandler(fs remote.Previews) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("unfinished: ComponentHTTPHandler"))
}
}
41 changes: 41 additions & 0 deletions remote/peer_sync_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package remote
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/cafs"
"github.com/qri-io/qri/config"
cfgtest "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/p2p"
p2ptest "github.com/qri-io/qri/p2p/test"
"github.com/qri-io/qri/repo"
Expand Down Expand Up @@ -44,6 +46,45 @@ func TestAddDataset(t *testing.T) {
}
}

func TestClientFeedsAndPreviews(t *testing.T) {
tr, cleanup := newTestRunner(t)
defer cleanup()

worldBankRef := writeWorldBankPopulation(tr.Ctx, t, tr.NodeA.Repo)
publishRef(t, tr.NodeA.Repo, &worldBankRef)

rem := tr.NodeARemote(t)
server := tr.RemoteTestServer(rem)
defer server.Close()

cli := tr.NodeBClient(t)

feeds, err := cli.Feeds(tr.Ctx, server.URL)
if err != nil {
t.Error(err)
}

expect := map[string][]dsref.VersionInfo{
"recent": {
{
Username: "A",
Name: "world_bank_population",
Path: "/ipfs/QmVeWbw4DJQqWjKXohgTu5JdhVniLPiyb6z6m1duwvXdQe",
MetaTitle: "World Bank Population",
BodySize: 5,
BodyRows: 1,
BodyFormat: "json",
CommitTitle: "initial commit",
CommitMessage: "created dataset",
},
},
}

if diff := cmp.Diff(expect, feeds); diff != "" {
t.Errorf("feeds result mismatch (-want +got): \n%s", diff)
}
}

func newMemRepoTestNode(t *testing.T) *p2p.QriNode {
ms := cafs.NewMapstore()
pi := cfgtest.GetTestPeerInfo(0)
Expand Down
104 changes: 86 additions & 18 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

golog "github.com/ipfs/go-log"
"github.com/qri-io/apiutil"
"github.com/qri-io/dag"
"github.com/qri-io/dag/dsync"
"github.com/qri-io/qri/base"
Expand Down Expand Up @@ -175,12 +177,30 @@ func (r *Remote) Node() *p2p.QriNode {
return r.node
}

// Address extracts the address of a remote from a configuration for a given
// remote name
func Address(cfg *config.Config, name string) (addr string, err error) {
if name == "" {
if cfg.Registry != nil && cfg.Registry.Location != "" {
return cfg.Registry.Location, nil
}
return "", fmt.Errorf("no registry specifiied to use as default remote")
}

if dst, found := cfg.Remotes.Get(name); found {
return dst, nil
}

return "", fmt.Errorf(`remote name "%s" not found`, name)
}

// ResolveHeadRef fetches the current dataset head path for a given peername and dataset name
func (r *Remote) ResolveHeadRef(ctx context.Context, peername, name string) (*reporef.DatasetRef, error) {
ref := &reporef.DatasetRef{
Peername: peername,
Name: name,
}

err := repo.CanonicalizeDatasetRef(r.node.Repo, ref)
return ref, err
}
Expand All @@ -196,7 +216,7 @@ func (r *Remote) RemoveDataset(ctx context.Context, params map[string]string) er
if err != nil {
return err
}
log.Debug("remove dataset ", ref)
log.Debugf("remove dataset %s", ref)

// run pre check hook
if r.datasetRemovePreCheck != nil {
Expand Down Expand Up @@ -382,6 +402,22 @@ func (r *Remote) logHook(h Hook) logsync.Hook {
}
}

// AddDefaultRoutes attaches routes a remote client will expect to an HTTP muxer
func (r *Remote) AddDefaultRoutes(mux *http.ServeMux) {
mux.Handle("/remote/dsync", r.DsyncHTTPHandler())
mux.Handle("/remote/logsync", r.LogsyncHTTPHandler())
mux.Handle("/remote/refs", r.RefsHTTPHandler())

if fs := r.Feeds; fs != nil {
mux.Handle("/remote/feeds", r.FeedsHTTPHandler())
mux.Handle("/remote/feeds/", r.FeedHTTPHandler("/remote/feeds/"))
}
if ps := r.Previews; ps != nil {
mux.Handle("/remote/dataset/preview/", r.PreviewHTTPHandler("/remote/dataset/preview/"))
mux.Handle("/remote/dataset/component/", r.ComponentHTTPHandler("/remote/dataset/component/"))
}
}

// DsyncHTTPHandler provides an http handler for dsync
func (r *Remote) DsyncHTTPHandler() http.HandlerFunc {
return dsync.HTTPRemoteHandler(r.dsync)
Expand All @@ -392,6 +428,55 @@ func (r *Remote) LogsyncHTTPHandler() http.HandlerFunc {
return logsync.HTTPHandler(r.logsync)
}

// FeedsHTTPHandler provides access to the home feed
func (r *Remote) FeedsHTTPHandler() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
feeds, err := r.Feeds.Feeds(req.Context(), "")
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
return
}

apiutil.WriteResponse(w, feeds)
}
}

// max number of items in a page of feed data
const feedPageSize = 30

// FeedHTTPHandler gives access a feed VersionInfos constructed by a remote
func (r *Remote) FeedHTTPHandler(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
page := apiutil.PageFromRequest(req)
refs, err := r.Feeds.Feed(req.Context(), "", strings.TrimPrefix(req.URL.Path, prefix), page.Offset(), page.Limit())
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
}

apiutil.WritePageResponse(w, refs, req, page)
}
}

// PreviewHTTPHandler handles dataset preview requests over HTTP
func (r *Remote) PreviewHTTPHandler(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
preview, err := r.Previews.Preview(req.Context(), "", strings.TrimPrefix(req.URL.Path, prefix))
if err != nil {
apiutil.WriteErrResponse(w, http.StatusBadRequest, err)
return
}

apiutil.WriteResponse(w, preview)
}
}

// ComponentHTTPHandler handles dataset component requests over HTTP
func (r *Remote) ComponentHTTPHandler(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("unfinished: ComponentHTTPHandler"))
}
}

// RefsHTTPHandler handles requests for dataset references
func (r *Remote) RefsHTTPHandler() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -436,20 +521,3 @@ func (r *Remote) RefsHTTPHandler() http.HandlerFunc {
}
}
}

// Address extracts the address of a remote from a configuration for a given
// remote name
func Address(cfg *config.Config, name string) (addr string, err error) {
if name == "" {
if cfg.Registry != nil && cfg.Registry.Location != "" {
return cfg.Registry.Location, nil
}
return "", fmt.Errorf("no registry specifiied to use as default remote")
}

if dst, found := cfg.Remotes.Get(name); found {
return dst, nil
}

return "", fmt.Errorf(`remote name "%s" not found`, name)
}
52 changes: 32 additions & 20 deletions remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,13 @@ func TestDatasetPullPushDeleteHTTP(t *testing.T) {
o.LogRemoved = callCheck("LogRemoved")
}

aCfg := &config.Remote{
Enabled: true,
AllowRemoves: true,
AcceptSizeMax: 10000,
}

rem, err := NewRemote(tr.NodeA, aCfg, opts)
if err != nil {
t.Error(err)
}

mux := http.NewServeMux()
mux.Handle("/remote/refs", rem.RefsHTTPHandler())
mux.Handle("/remote/logsync", rem.LogsyncHTTPHandler())
mux.Handle("/remote/dsync", rem.DsyncHTTPHandler())
server := httptest.NewServer(mux)
rem := tr.NodeARemote(t, opts)
server := tr.RemoteTestServer(rem)
defer server.Close()

worldBankRef := writeWorldBankPopulation(tr.Ctx, t, tr.NodeA.Repo)

cli, err := NewClient(tr.NodeB)
if err != nil {
t.Error(err)
}
cli := tr.NodeBClient(t)

relRef := &reporef.DatasetRef{Peername: worldBankRef.Peername, Name: worldBankRef.Name}
if err := cli.ResolveHeadRef(tr.Ctx, relRef, server.URL); err != nil {
Expand Down Expand Up @@ -266,6 +250,34 @@ func newTestRunner(t *testing.T) (tr *testRunner, cleanup func()) {
return tr, cleanup
}

func (tr *testRunner) NodeARemote(t *testing.T, opts ...func(o *Options)) *Remote {
aCfg := &config.Remote{
Enabled: true,
AllowRemoves: true,
AcceptSizeMax: 10000,
}

rem, err := NewRemote(tr.NodeA, aCfg, opts...)
if err != nil {
t.Fatal(err)
}
return rem
}

func (tr *testRunner) RemoteTestServer(rem *Remote) *httptest.Server {
mux := http.NewServeMux()
rem.AddDefaultRoutes(mux)
return httptest.NewServer(mux)
}

func (tr *testRunner) NodeBClient(t *testing.T) Client {
cli, err := NewClient(tr.NodeB)
if err != nil {
t.Fatal(err)
}
return cli
}

func qriNode(t *testing.T, peername string, node *core.IpfsNode, pi *cfgtest.PeerInfo) *p2p.QriNode {
repo, err := p2ptest.MakeRepoFromIPFSNode(node, peername)
if err != nil {
Expand Down

0 comments on commit e1a203d

Please sign in to comment.