Skip to content

Commit

Permalink
feat(api.ServeRPC): serve core methods over RPC
Browse files Browse the repository at this point in the history
this commit builds upon intial work to make all core methods available
as RPC. this isn't finished as of yet, given that all in and out params
will need to conform to encoding/gob specs, which basically boils down
to using only Exported types. I think this is a good opportunity to
have *all* params to core methods be declared in the core package
itself (these could be aliases to underlying types if needed).

TODO:
 * finish implementation
 * write test suite that works core methods over RPC
  • Loading branch information
b5 committed Dec 11, 2017
1 parent 351c8dd commit d4778fe
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 40 deletions.
2 changes: 1 addition & 1 deletion api/handlers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type HistoryHandlers struct {
}

func NewHistoryHandlers(log logging.Logger, r repo.Repo) *HistoryHandlers {
req := core.NewHistoryRequests(r)
req := core.NewHistoryRequests(r, nil)
h := HistoryHandlers{*req, log}
return &h
}
Expand Down
2 changes: 1 addition & 1 deletion api/handlers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func NewPeerHandlers(log logging.Logger, r repo.Repo, node *p2p.QriNode) *PeerHandlers {
req := core.NewPeerRequests(r, node)
req := core.NewPeerRequests(node, nil)
h := PeerHandlers{*req, log}
return &h
}
Expand Down
8 changes: 4 additions & 4 deletions api/handlers/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ProfileHandlers struct {
}

func NewProfileHandlers(log logging.Logger, r repo.Repo) *ProfileHandlers {
req := core.NewProfileRequests(r)
req := core.NewProfileRequests(r, nil)
h := ProfileHandlers{*req, log}
return &h
}
Expand All @@ -38,7 +38,7 @@ func (h *ProfileHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request)

func (h *ProfileHandlers) getProfileHandler(w http.ResponseWriter, r *http.Request) {
args := true
res := &profile.Profile{}
res := &core.Profile{}
if err := h.GetProfile(&args, res); err != nil {
h.log.Infof("error getting profile: %s", err.Error())
util.WriteErrResponse(w, http.StatusInternalServerError, err)
Expand All @@ -49,12 +49,12 @@ func (h *ProfileHandlers) getProfileHandler(w http.ResponseWriter, r *http.Reque
}

func (h *ProfileHandlers) saveProfileHandler(w http.ResponseWriter, r *http.Request) {
p := &profile.Profile{}
p := &core.Profile{}
if err := json.NewDecoder(r.Body).Decode(p); err != nil {
util.WriteErrResponse(w, http.StatusBadRequest, err)
return
}
res := &profile.Profile{}
res := &core.Profile{}
if err := h.SaveProfile(p, res); err != nil {
util.WriteErrResponse(w, http.StatusInternalServerError, err)
return
Expand Down
2 changes: 1 addition & 1 deletion api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func NewQueryHandlers(log logging.Logger, r repo.Repo) *QueryHandlers {
req := core.NewQueryRequests(r)
req := core.NewQueryRequests(r, nil)
return &QueryHandlers{*req, log}
}

Expand Down
2 changes: 1 addition & 1 deletion api/handlers/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type SearchHandlers struct {
}

func NewSearchHandlers(log logging.Logger, r repo.Repo) *SearchHandlers {
req := core.NewSearchRequests(r)
req := core.NewSearchRequests(r, nil)
return &SearchHandlers{*req, log}
}

Expand Down
74 changes: 74 additions & 0 deletions cmd/profile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cmd

import (
"encoding/json"
"os"

"github.com/qri-io/qri/core"
"github.com/spf13/cobra"
)

var (
setProfileFilepath string
)

// profileCmd represents the profile command
var profileCmd = &cobra.Command{
Use: "profile",
Short: "show or edit user profile information",
}

var profileGetCmd = &cobra.Command{
Use: "get",
Short: "get profile info",
Run: func(cmd *cobra.Command, args []string) {
r, err := ProfileRequests(false)
ExitIfErr(err)

in := true
res := &core.Profile{}
err = r.GetProfile(&in, res)
ExitIfErr(err)

data, err := json.MarshalIndent(res, "", " ")
ExitIfErr(err)
PrintSuccess(string(data))
},
}

var profileSetCmd = &cobra.Command{
Use: "set",
Short: "add peers to the profile list",
Run: func(cmd *cobra.Command, args []string) {
var (
dataFile *os.File
err error
)

r, err := ProfileRequests(false)
ExitIfErr(err)

dataFile, err = loadFileIfPath(setProfileFilepath)
ExitIfErr(err)

p := &core.Profile{}
err = json.NewDecoder(dataFile).Decode(p)
ExitIfErr(err)

res := &core.Profile{}
err = r.SaveProfile(p, res)
ExitIfErr(err)

data, err := json.MarshalIndent(res, "", " ")
ExitIfErr(err)
PrintSuccess(string(data))
},
}

func init() {
profileSetCmd.Flags().StringVarP(&setProfileFilepath, "file", "f", "", "json file to update profile info")

profileCmd.AddCommand(profileGetCmd)
profileCmd.AddCommand(profileSetCmd)
RootCmd.AddCommand(profileCmd)
}
5 changes: 3 additions & 2 deletions cmd/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ var queriesCmd = &cobra.Command{
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
req := core.NewQueryRequests(GetRepo(false))
req, err := QueryRequests(false)
ExitIfErr(err)
p := core.NewListParams("-created", pageNum, pageSize)

res := []*repo.DatasetRef{}
err := req.List(&p, &res)
err = req.List(&p, &res)
ExitIfErr(err)

for i, q := range res {
Expand Down
24 changes: 24 additions & 0 deletions cmd/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ func DatasetRequests(online bool) (*core.DatasetRequests, error) {
return core.NewDatasetRequests(r, cli), nil
}

func QueryRequests(online bool) (*core.QueryRequests, error) {
r, cli, err := RepoOrClient(online)
if err != nil {
return nil, err
}
return core.NewQueryRequests(r, cli), nil
}

func ProfileRequests(online bool) (*core.ProfileRequests, error) {
r, cli, err := RepoOrClient(online)
if err != nil {
return nil, err
}
return core.NewProfileRequests(r, cli), nil
}

func SearchRequests(online bool) (*core.SearchRequests, error) {
r, cli, err := RepoOrClient(online)
if err != nil {
return nil, err
}
return core.NewSearchRequests(r, cli), nil
}

// RepoOrClient returns either a
func RepoOrClient(online bool) (repo.Repo, *rpc.Client, error) {
if fs, err := ipfs.NewFilestore(func(cfg *ipfs.StoreCfg) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var runCmd = &cobra.Command{
ErrExit(fmt.Errorf("Please provide a query string to execute"))
}

r := GetRepo(false)
req := core.NewQueryRequests(r)
req, err := QueryRequests(false)
ExitIfErr(err)

format, err := dataset.ParseDataFormatString(cmd.Flag("format").Value.String())
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var searchCmd = &cobra.Command{
ErrExit(fmt.Errorf("wrong number of arguments. expected qri search [query]"))
}

req := core.NewSearchRequests(GetRepo(false))
req, err := SearchRequests(false)
ExitIfErr(err)

if searchCmdReindex {
PrintInfo("building index...")
Expand All @@ -44,7 +45,7 @@ var searchCmd = &cobra.Command{
}
res := []*repo.DatasetRef{}

err := req.Search(p, &res)
err = req.Search(p, &res)
ExitIfErr(err)

outformat := cmd.Flag("format").Value.String()
Expand Down
10 changes: 5 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func Receivers(node *p2p.QriNode) []CoreRequests {
r := node.Repo
return []CoreRequests{
NewDatasetRequests(r, nil),
NewHistoryRequests(r),
NewPeerRequests(r, node),
NewProfileRequests(r),
NewQueryRequests(r),
NewSearchRequests(r),
NewHistoryRequests(r, nil),
NewPeerRequests(node, nil),
NewProfileRequests(r, nil),
NewQueryRequests(r, nil),
NewSearchRequests(r, nil),
}
}

Expand Down
12 changes: 11 additions & 1 deletion core/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"fmt"
"net/rpc"

"github.com/ipfs/go-datastore"
"github.com/qri-io/dataset/dsfs"
Expand All @@ -10,11 +11,16 @@ import (

type HistoryRequests struct {
repo repo.Repo
cli *rpc.Client
}

func (d HistoryRequests) CoreRequestsName() string { return "history" }

func NewHistoryRequests(r repo.Repo) *HistoryRequests {
func NewHistoryRequests(r repo.Repo, cli *rpc.Client) *HistoryRequests {
if r != nil && cli != nil {
panic(fmt.Errorf("both repo and client supplied to NewHistoryRequests"))
}

return &HistoryRequests{
repo: r,
}
Expand All @@ -26,6 +32,10 @@ type LogParams struct {
}

func (d *HistoryRequests) Log(params *LogParams, res *[]*repo.DatasetRef) (err error) {
if d.cli != nil {
return d.cli.Call("HistoryRequests.Log", params, res)
}

log := []*repo.DatasetRef{}
limit := params.Limit
ref := &repo.DatasetRef{Path: params.Path}
Expand Down
48 changes: 37 additions & 11 deletions core/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"encoding/json"
"fmt"
"net/rpc"

"github.com/ipfs/go-datastore/query"
"github.com/qri-io/qri/p2p"
Expand All @@ -12,30 +13,39 @@ import (
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
)

func NewPeerRequests(r repo.Repo, node *p2p.QriNode) *PeerRequests {
type PeerRequests struct {
qriNode *p2p.QriNode
cli *rpc.Client
}

func NewPeerRequests(node *p2p.QriNode, cli *rpc.Client) *PeerRequests {
if node != nil && cli != nil {
panic(fmt.Errorf("both node and client supplied to NewPeerRequests"))
}

return &PeerRequests{
repo: r,
qriNode: node,
cli: cli,
}
}

type PeerRequests struct {
repo repo.Repo
qriNode *p2p.QriNode
}

func (d PeerRequests) CoreRequestsName() string { return "peers" }

func (d *PeerRequests) List(p *ListParams, res *[]*profile.Profile) error {
if d.cli != nil {
return d.cli.Call("PeerRequests.List", p, res)
}

r := d.qriNode.Repo
replies := make([]*profile.Profile, p.Limit)
i := 0

user, err := d.repo.Profile()
user, err := r.Profile()
if err != nil {
return err
}

ps, err := repo.QueryPeers(d.repo.Peers(), query.Query{})
ps, err := repo.QueryPeers(r.Peers(), query.Query{})
if err != nil {
return fmt.Errorf("error querying peers: %s", err.Error())
}
Expand All @@ -56,16 +66,24 @@ func (d *PeerRequests) List(p *ListParams, res *[]*profile.Profile) error {
}

func (d *PeerRequests) ConnectedPeers(limit *int, peers *[]string) error {
if d.cli != nil {
return d.cli.Call("PeerRequests.ConnectedPeers", limit, peers)
}

*peers = d.qriNode.ConnectedPeers()
return nil
}

func (d *PeerRequests) ConnectToPeer(pid *peer.ID, res *profile.Profile) error {
if d.cli != nil {
return d.cli.Call("PeerRequests.ConnectToPeer", pid, res)
}

if err := d.qriNode.ConnectToPeer(*pid); err != nil {
return fmt.Errorf("error connecting to peer: %s", err.Error())
}

profile, err := d.repo.Peers().GetPeer(*pid)
profile, err := d.qriNode.Repo.Peers().GetPeer(*pid)
if err != nil {
return fmt.Errorf("error getting peer profile: %s", err.Error())
}
Expand All @@ -75,6 +93,10 @@ func (d *PeerRequests) ConnectToPeer(pid *peer.ID, res *profile.Profile) error {
}

func (d *PeerRequests) Get(p *GetParams, res *profile.Profile) error {
if d.cli != nil {
return d.cli.Call("PeerRequests.Get", p, res)
}

// TODO - restore
// peers, err := d.repo.Peers()
// if err != nil {
Expand Down Expand Up @@ -104,12 +126,16 @@ type NamespaceParams struct {
}

func (d *PeerRequests) GetNamespace(p *NamespaceParams, res *[]*repo.DatasetRef) error {
if d.cli != nil {
return d.cli.Call("PeerRequests.GetNamespace", p, res)
}

id, err := peer.IDB58Decode(p.PeerId)
if err != nil {
return fmt.Errorf("error decoding peer Id: %s", err.Error())
}

profile, err := d.repo.Peers().GetPeer(id)
profile, err := d.qriNode.Repo.Peers().GetPeer(id)
if err != nil || profile == nil {
return err
}
Expand Down
Loading

0 comments on commit d4778fe

Please sign in to comment.