From 49dbef92285bee8c8d2c99ce6a15e1b6a1cf7636 Mon Sep 17 00:00:00 2001 From: Dustin Long Date: Tue, 12 Mar 2019 16:41:47 -0400 Subject: [PATCH] feat(remote): Beginning of "remote" mode implementation On the remote side, the following is being added: * The flag --remote-mode which enables remote mode * An api /dataset that can be posted to, only in remote mode * Receive method that calculates size of dag, and fails if too large On the client side: * The `qri publish` command has a "--remote" to control where to publish * Publishing to a remote sends a dag.Info instead of dataset head * Remove call to CanonicalizeDatasetRef in base/ Missing: * Remote mode should act like read-only * dag.Info is a stub for now * `qri publish --remote ...` is ignoring name of remote * dSync when a push to a remote succeeds --- api/api.go | 5 ++ api/datasets.go | 10 ++-- api/remote.go | 52 ++++++++++++++++++ base/ref.go | 4 -- cmd/connect.go | 11 +++- cmd/factory.go | 1 + cmd/factory_test.go | 5 ++ cmd/publish.go | 35 ++++++++---- cmd/qri.go | 8 +++ config/api.go | 2 + lib/datasets.go | 37 ++++++++----- lib/params.go | 11 ++++ lib/remote.go | 131 ++++++++++++++++++++++++++++++++++++++++++++ 13 files changed, 276 insertions(+), 36 deletions(-) create mode 100644 api/remote.go create mode 100644 lib/remote.go diff --git a/api/api.go b/api/api.go index 34a817694..f72a7199c 100644 --- a/api/api.go +++ b/api/api.go @@ -227,6 +227,11 @@ func NewServerRoutes(s *Server) *http.ServeMux { dsh := NewDatasetHandlers(s.qriNode, s.cfg.API.ReadOnly) + if s.cfg.API.RemoteMode { + remh := NewRemoteHandlers(s.qriNode) + m.Handle("/dataset", s.middleware(remh.ReceiveHandler)) + } + m.Handle("/list", s.middleware(dsh.ListHandler)) m.Handle("/list/", s.middleware(dsh.PeerListHandler)) m.Handle("/save", s.middleware(dsh.SaveHandler)) diff --git a/api/datasets.go b/api/datasets.go index c4a1ce287..fe9b39ed0 100644 --- a/api/datasets.go +++ b/api/datasets.go @@ -622,18 +622,18 @@ func (h DatasetHandlers) 
publishHandler(w http.ResponseWriter, r *http.Request, return } - ref.Published = publish p := &lib.SetPublishStatusParams{ - Ref: &ref, + Ref: ref.String(), + PublishStatus: publish, UpdateRegistry: r.FormValue("no_registry") != "true", UpdateRegistryPin: r.FormValue("no_pin") != "true", } - var ok bool - if err := h.DatasetRequests.SetPublishStatus(p, &ok); err != nil { + var publishedRef repo.DatasetRef + if err := h.DatasetRequests.SetPublishStatus(p, &publishedRef); err != nil { util.WriteErrResponse(w, http.StatusInternalServerError, err) return } - util.WriteResponse(w, ref) + util.WriteResponse(w, publishedRef) } func (h DatasetHandlers) updateHandler(w http.ResponseWriter, r *http.Request) { diff --git a/api/remote.go b/api/remote.go new file mode 100644 index 000000000..c2399e1d3 --- /dev/null +++ b/api/remote.go @@ -0,0 +1,52 @@ +package api + +import ( + "io/ioutil" + "net/http" + + util "github.com/datatogether/api/apiutil" + "github.com/qri-io/qri/lib" + "github.com/qri-io/qri/p2p" +) + +// RemoteHandlers wraps a request struct to interface with http.HandlerFunc +type RemoteHandlers struct { + lib.RemoteRequests +} + +// NewRemoteHandlers allocates a RemoteHandlers pointer +func NewRemoteHandlers(node *p2p.QriNode) *RemoteHandlers { + req := lib.NewRemoteRequests(node, nil) + return &RemoteHandlers{*req} +} + +// ReceiveHandler is the endpoint for remotes to receive daginfo +func (h *RemoteHandlers) ReceiveHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + h.receiveDataset(w, r) + default: + util.NotFoundHandler(w, r) + } +} + +func (h *RemoteHandlers) receiveDataset(w http.ResponseWriter, r *http.Request) { + content, err := ioutil.ReadAll(r.Body) + if err != nil { + util.WriteErrResponse(w, http.StatusInternalServerError, err) + return + } + var result bool + params := lib.ReceiveParams{Body: string(content)} + err = h.Receive(&params, &result) + if err != nil { + util.WriteErrResponse(w, 
http.StatusInternalServerError, err) + return + } + // TODO(dlong): Perform dsync + if result { + util.WriteResponse(w, "Accepted") + return + } + util.WriteResponse(w, "Denied") +} diff --git a/base/ref.go b/base/ref.go index 06cfa7835..d3051f0a1 100644 --- a/base/ref.go +++ b/base/ref.go @@ -18,10 +18,6 @@ func InLocalNamespace(r repo.Repo, ref *repo.DatasetRef) bool { // SetPublishStatus updates the Published field of a dataset ref func SetPublishStatus(r repo.Repo, ref *repo.DatasetRef, published bool) error { - if err := repo.CanonicalizeDatasetRef(r, ref); err != nil { - return err - } - if !InLocalNamespace(r, ref) { return fmt.Errorf("can't publish datasets that are not in your namespace") } diff --git a/cmd/connect.go b/cmd/connect.go index d9bd86fa1..9e4671012 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -53,6 +53,7 @@ peers & swapping data.`, cmd.Flags().BoolVarP(&o.Setup, "setup", "", false, "run setup if necessary, reading options from environment variables") cmd.Flags().BoolVarP(&o.ReadOnly, "read-only", "", false, "run qri in read-only mode, limits the api endpoints") + cmd.Flags().BoolVarP(&o.RemoteMode, "remote-mode", "", false, "run qri in remote mode") cmd.Flags().StringVarP(&o.Registry, "registry", "", "", "specify registry to setup with. 
only works when --setup is true") return cmd @@ -72,9 +73,10 @@ type ConnectOptions struct { DisableWebapp bool DisableP2P bool - Registry string - Setup bool - ReadOnly bool + Registry string + Setup bool + ReadOnly bool + RemoteMode bool Node *p2p.QriNode Config *config.Config @@ -133,6 +135,9 @@ func (o *ConnectOptions) Run() (err error) { if o.ReadOnly { cfg.API.ReadOnly = true } + if o.RemoteMode { + cfg.API.RemoteMode = true + } if o.DisableP2P { cfg.P2P.Enabled = false } diff --git a/cmd/factory.go b/cmd/factory.go index f213a5f53..00e25c2cc 100644 --- a/cmd/factory.go +++ b/cmd/factory.go @@ -26,6 +26,7 @@ type Factory interface { ConnectionNode() (*p2p.QriNode, error) DatasetRequests() (*lib.DatasetRequests, error) + RemoteRequests() (*lib.RemoteRequests, error) RegistryRequests() (*lib.RegistryRequests, error) LogRequests() (*lib.LogRequests, error) ExportRequests() (*lib.ExportRequests, error) diff --git a/cmd/factory_test.go b/cmd/factory_test.go index 5652bad3a..754e19fdc 100644 --- a/cmd/factory_test.go +++ b/cmd/factory_test.go @@ -105,6 +105,11 @@ func (t TestFactory) DatasetRequests() (*lib.DatasetRequests, error) { return lib.NewDatasetRequests(t.node, t.rpc), nil } +// RemoteRequests generates a lib.RemoteRequests from internal state +func (t TestFactory) RemoteRequests() (*lib.RemoteRequests, error) { + return lib.NewRemoteRequests(t.node, t.rpc), nil +} + // RegistryRequests generates a lib.RegistryRequests from internal state func (t TestFactory) RegistryRequests() (*lib.RegistryRequests, error) { return lib.NewRegistryRequests(t.node, t.rpc), nil diff --git a/cmd/publish.go b/cmd/publish.go index be15496ee..18b6b79d8 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -43,6 +43,7 @@ to a published dataset will be immediately visible to connected peers. 
cmd.Flags().BoolVarP(&o.Unpublish, "unpublish", "", false, "unpublish a dataset") cmd.Flags().BoolVarP(&o.NoRegistry, "no-registry", "", false, "don't publish to registry") cmd.Flags().BoolVarP(&o.NoPin, "no-pin", "", false, "don't pin dataset to registry") + cmd.Flags().StringVarP(&o.RemoteName, "remote", "", "", "name of remote to publish to") return cmd } @@ -55,38 +56,50 @@ type PublishOptions struct { Unpublish bool NoRegistry bool NoPin bool + RemoteName string DatasetRequests *lib.DatasetRequests + RemoteRequests *lib.RemoteRequests } // Complete adds any missing configuration that can only be added just before calling Run func (o *PublishOptions) Complete(f Factory, args []string) (err error) { o.Refs = args o.DatasetRequests, err = f.DatasetRequests() + o.RemoteRequests, err = f.RemoteRequests() return } // Run executes the publish command func (o *PublishOptions) Run() error { - var res bool - - for _, arg := range o.Refs { - ref, err := repo.ParseDatasetRef(arg) - if err != nil { - return err + for _, ref := range o.Refs { + if o.RemoteName != "" { + // Publish for a "Remote". + p := lib.PushParams{ + Ref: ref, + RemoteName: o.RemoteName, + } + var res bool + if err := o.RemoteRequests.PushToRemote(&p, &res); err != nil { + return err + } + // TODO(dlong): Check if the operation succeeded or failed. Perform dsync. + return nil } - ref.Published = !o.Unpublish - p := &lib.SetPublishStatusParams{ - Ref: &ref, + // Publish for the legacy Registry server. 
+ p := lib.SetPublishStatusParams{ + Ref: ref, + PublishStatus: !o.Unpublish, UpdateRegistry: !o.NoRegistry, UpdateRegistryPin: !o.NoPin, } - if err = o.DatasetRequests.SetPublishStatus(p, &res); err != nil { + var publishedRef repo.DatasetRef + if err := o.DatasetRequests.SetPublishStatus(&p, &publishedRef); err != nil { return err } - printInfo(o.Out, "published dataset %s", ref) + printInfo(o.Out, "published dataset %s", publishedRef) } return nil } diff --git a/cmd/qri.go b/cmd/qri.go index 622ac653c..92cb224d1 100644 --- a/cmd/qri.go +++ b/cmd/qri.go @@ -239,6 +239,14 @@ func (o *QriOptions) DatasetRequests() (*lib.DatasetRequests, error) { return lib.NewDatasetRequests(o.node, o.rpc), nil } +// RemoteRequests generates a lib.RemoteRequests from internal state +func (o *QriOptions) RemoteRequests() (*lib.RemoteRequests, error) { + if err := o.Init(); err != nil { + return nil, err + } + return lib.NewRemoteRequests(o.node, o.rpc), nil +} + // RegistryRequests generates a lib.RegistryRequests from internal state func (o *QriOptions) RegistryRequests() (*lib.RegistryRequests, error) { if err := o.Init(); err != nil { diff --git a/config/api.go b/config/api.go index e259046f3..fbb14b01d 100644 --- a/config/api.go +++ b/config/api.go @@ -17,6 +17,8 @@ type API struct { Port int `json:"port"` // read-only mode ReadOnly bool `json:"readonly"` + // remote mode + RemoteMode bool `json:"remotemode"` // URLRoot is the base url for this server URLRoot string `json:"urlroot"` // TLS enables https via letsEyncrypt diff --git a/lib/datasets.go b/lib/datasets.go index e573eb3db..713468032 100644 --- a/lib/datasets.go +++ b/lib/datasets.go @@ -284,12 +284,13 @@ func (r *DatasetRequests) Save(p *SaveParams, res *repo.DatasetRef) (err error) } if p.Publish { - var done bool + var publishedRef repo.DatasetRef err = r.SetPublishStatus(&SetPublishStatusParams{ - Ref: &ref, + Ref: ref.String(), + PublishStatus: true, UpdateRegistry: true, UpdateRegistryPin: true, - }, &done) + }, 
&publishedRef) if err != nil { return err @@ -372,42 +373,52 @@ func (r *DatasetRequests) Update(p *UpdateParams, res *repo.DatasetRef) error { // SetPublishStatusParams encapsulates parameters for setting the publication status of a dataset type SetPublishStatusParams struct { - Ref *repo.DatasetRef + Ref string + PublishStatus bool UpdateRegistry bool UpdateRegistryPin bool } // SetPublishStatus updates the publicity of a reference in the peer's namespace -func (r *DatasetRequests) SetPublishStatus(p *SetPublishStatusParams, res *bool) (err error) { +func (r *DatasetRequests) SetPublishStatus(p *SetPublishStatusParams, publishedRef *repo.DatasetRef) (err error) { if r.cli != nil { - return r.cli.Call("DatasetRequests.SetPublishStatus", p, res) + return r.cli.Call("DatasetRequests.SetPublishStatus", p, publishedRef) + } + + ref, err := repo.ParseDatasetRef(p.Ref) + if err != nil { + return err + } + if err = repo.CanonicalizeDatasetRef(r.node.Repo, &ref); err != nil { + return err } - ref := p.Ref - res = &ref.Published - if err = actions.SetPublishStatus(r.node, ref, ref.Published); err != nil { + ref.Published = p.PublishStatus + if err = actions.SetPublishStatus(r.node, &ref, ref.Published); err != nil { return err } + *publishedRef = ref + if p.UpdateRegistry && r.node.Repo.Registry() != nil { var done bool rr := NewRegistryRequests(r.node, nil) if ref.Published { - if err = rr.Publish(ref, &done); err != nil { + if err = rr.Publish(&ref, &done); err != nil { return } if p.UpdateRegistryPin { - return rr.Pin(ref, &done) + return rr.Pin(&ref, &done) } } else { - if err = rr.Unpublish(ref, &done); err != nil { + if err = rr.Unpublish(&ref, &done); err != nil { return } if p.UpdateRegistryPin { - return rr.Unpin(ref, &done) + return rr.Unpin(&ref, &done) } } } diff --git a/lib/params.go b/lib/params.go index 758876957..0e0f52983 100644 --- a/lib/params.go +++ b/lib/params.go @@ -66,3 +66,14 @@ func (lp ListParams) Page() util.Page { number = lp.Offset/size + 1 
return util.NewPage(number, size) } + +// PushParams holds parameters for pushing daginfo to remotes +type PushParams struct { + Ref string + RemoteName string +} + +// ReceiveParams hold parameters for receiving daginfo's when running as a remote +type ReceiveParams struct { + Body string +} diff --git a/lib/remote.go b/lib/remote.go new file mode 100644 index 000000000..ca5ec1460 --- /dev/null +++ b/lib/remote.go @@ -0,0 +1,131 @@ +package lib + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/rpc" + + "github.com/qri-io/qri/p2p" + "github.com/qri-io/qri/repo" +) + +const allowedDagInfoSize uint64 = 10 * 1024 * 1024 + +// RemoteRequests encapsulates business logic of remote operation +type RemoteRequests struct { + cli *rpc.Client + node *p2p.QriNode +} + +// NewRemoteRequests creates a RemoteRequests pointer from either a node or an rpc.Client +func NewRemoteRequests(node *p2p.QriNode, cli *rpc.Client) *RemoteRequests { + if node != nil && cli != nil { + panic(fmt.Errorf("both repo and client supplied to NewRemoteRequests")) + } + return &RemoteRequests{ + cli: cli, + node: node, + } +} + +// CoreRequestsName implements the Requests interface +func (RemoteRequests) CoreRequestsName() string { return "remote" } + +// PushToRemote posts a dagInfo to a remote +func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error { + if r.cli != nil { + return r.cli.Call("DatasetRequests.PushToRemote", p, out) + } + + ref, err := repo.ParseDatasetRef(p.Ref) + if err != nil { + return err + } + if err = repo.CanonicalizeDatasetRef(r.node.Repo, &ref); err != nil { + return err + } + + // TODO(dlong): Switch to actual dag.Info constructor when it becomes available. + dinfo, err := newDagInfoTmp(r.node, ref.Path) + if err != nil { + return err + } + + // TODO(dlong): Resolve remote name from p.RemoteName instead of using registry's location. 
+ location := Config.Registry.Location + + data, err := json.Marshal(dinfo) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", fmt.Sprintf("%s/dataset", location), bytes.NewReader(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + httpClient := http.DefaultClient + res, err := httpClient.Do(req) + if err != nil { + return err + } + + content, err := ioutil.ReadAll(res.Body) + if err != nil { + panic(err) + } + // TODO(dlong): Inspect the remote's response, and then perform dsync. + fmt.Printf(string(content)) + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("error code %d", res.StatusCode) + } + + *out = true + return nil +} + +// Receive is used to save a dataset when running as a remote. API only, not RPC or command-line. +func (r *RemoteRequests) Receive(p *ReceiveParams, out *bool) (err error) { + if r.cli != nil { + return fmt.Errorf("receive cannot be called over RPC") + } + + dinfo := dagInfoTmp{} + err = json.Unmarshal([]byte(p.Body), &dinfo) + if err != nil { + return err + } + + fmt.Printf("Received dag.Info:\n") + fmt.Printf(p.Body) + fmt.Printf("\n\n") + + var totalSize uint64 + for _, s := range dinfo.Sizes { + totalSize += s + } + + // TODO(dlong): Customization for how to decide to accept the dataset. + if totalSize >= allowedDagInfoSize { + // TODO(dlong): Instead of merely rejecting, return a message about why. + *out = false + return nil + } + + *out = true + return nil +} + +// TODO(dlong): Switch to actual dag.Info constructor when it becomes available. +func newDagInfoTmp(node *p2p.QriNode, path string) (*dagInfoTmp, error) { + return &dagInfoTmp{Sizes: []uint64{10, 20, 30}}, nil +} + +type dagInfoTmp struct { + Sizes []uint64 +}