diff --git a/api/config.go b/api/config.go index 7bf5b6ec8..b66e07b2c 100644 --- a/api/config.go +++ b/api/config.go @@ -11,14 +11,17 @@ const ( DEVELOP_MODE = "develop" PRODUCTION_MODE = "production" TEST_MODE = "test" + DefaultPort = "2503" + DefaultRPCPort = "2504" ) func DefaultConfig() *Config { return &Config{ - Logger: logging.DefaultLogger, - Mode: "develop", - Port: "8080", - Online: true, + Logger: logging.DefaultLogger, + Mode: "develop", + Port: DefaultPort, + RPCPort: DefaultRPCPort, + Online: true, } } @@ -38,6 +41,8 @@ type Config struct { Mode string // port to listen on, will be read from PORT env variable if present. Port string + // port to listen for RPC calls on, if empty server will not register a RPC listener + RPCPort string // root url for service UrlRoot string // DNS service discovery. Should be either "env" or "dns", default is env @@ -67,7 +72,7 @@ type Config struct { func (cfg *Config) Validate() (err error) { // make sure port is set if cfg.Port == "" { - cfg.Port = "8080" + cfg.Port = DefaultPort } err = requireConfigStrings(map[string]string{ diff --git a/api/handlers/datasets.go b/api/handlers/datasets.go index 2e69019b5..859f7e035 100644 --- a/api/handlers/datasets.go +++ b/api/handlers/datasets.go @@ -17,7 +17,7 @@ import ( ) func NewDatasetHandlers(log logging.Logger, r repo.Repo) *DatasetHandlers { - req := core.NewDatasetRequests(r) + req := core.NewDatasetRequests(r, nil) h := DatasetHandlers{*req, log, r} return &h } diff --git a/api/server.go b/api/server.go index 0439c934d..cef80e4fb 100644 --- a/api/server.go +++ b/api/server.go @@ -2,10 +2,13 @@ package api import ( "fmt" + "net" "net/http" + "net/rpc" "github.com/datatogether/api/apiutil" "github.com/qri-io/qri/api/handlers" + "github.com/qri-io/qri/core" "github.com/qri-io/qri/logging" "github.com/qri-io/qri/p2p" "github.com/qri-io/qri/repo" @@ -71,10 +74,35 @@ func (s *Server) Serve() (err error) { server := &http.Server{} server.Handler = NewServerRoutes(s) s.log.Infof("starting api server on port %s", s.cfg.Port) + go s.ServeRPC() // http.ListenAndServe will not return unless there's an error return StartServer(s.cfg, server) } +// ServeRPC checks for a configured RPC port, and registers a listner if so +func (s *Server) ServeRPC() { + if s.cfg.RPCPort == "" { + return + } + + listener, err := net.Listen("tcp", fmt.Sprintf(":%s", s.cfg.RPCPort)) + if err != nil { + s.log.Infof("RPC listen on port %s error: %s", s.cfg.RPCPort, err) + return + } + + for _, rcvr := range core.Receivers(s.qriNode) { + if err := rpc.Register(rcvr); err != nil { + s.log.Infof("error registering RPC receiver %s: %s", rcvr.CoreRequestsName(), err.Error()) + return + } + } + + s.log.Infof("accepting RPC requests on port %s", s.cfg.RPCPort) + rpc.Accept(listener) + return +} + // NewServerRoutes returns a Muxer that has all API routes func NewServerRoutes(s *Server) *http.ServeMux { m := http.NewServeMux() diff --git a/cmd/add.go b/cmd/add.go index cf16eebb0..28d846a21 100644 --- a/cmd/add.go +++ b/cmd/add.go @@ -34,7 +34,7 @@ var datasetAddCmd = &cobra.Command{ ErrExit(fmt.Errorf("please provide a --name")) } - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) root := strings.TrimSuffix(args[0], "/"+dsfs.PackageFileDataset.String()) p := &core.AddParams{ @@ -69,7 +69,7 @@ func initDataset() { metaFile, err = loadFileIfPath(addDsMetaFilepath) ExitIfErr(err) - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) p := &core.InitDatasetParams{ Name: addDsName, diff --git a/cmd/export.go b/cmd/export.go index 5c7d4587d..45108ac98 100644 --- a/cmd/export.go +++ b/cmd/export.go @@ -30,7 +30,7 @@ var exportCmd = &cobra.Command{ } r := GetRepo(false) - req := core.NewDatasetRequests(r) + req := core.NewDatasetRequests(r, nil) p := &core.GetDatasetParams{ Name: args[0], diff --git a/cmd/info.go b/cmd/info.go index 1a35cf346..b1e0967b6 100644 --- a/cmd/info.go +++ b/cmd/info.go @@ -34,7 +34,7 @@ var infoCmd = &cobra.Command{ } } - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) for i, arg := range args { rt, ref := dsfs.RefType(arg) diff --git a/cmd/init-ipfs.go b/cmd/init-ipfs.go index 8ca6f3b2f..d3d96cce7 100644 --- a/cmd/init-ipfs.go +++ b/cmd/init-ipfs.go @@ -1,6 +1,7 @@ package cmd import ( + "github.com/ipfs/go-datastore" ipfs "github.com/qri-io/cafs/ipfs" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -10,13 +11,20 @@ var ( initIpfsConfigFile string ) +// defaultDatasets is a hard-coded dataset added when a new qri repo is created +// this hash must always be available +var defaultDatasets = []datastore.Key{ + // fivethirtyeight comic characters + datastore.NewKey("/ipfs/QmcqkHFA2LujZxY38dYZKmxsUstN4unk95azBjwEhwrnM6"), +} + // initCmd represents the init command var initIpfsCmd = &cobra.Command{ Use: "init-ipfs", Short: "Initialize an ipfs repository", Long: ``, Run: func(cmd *cobra.Command, args []string) { - err := ipfs.InitRepo(viper.GetString(IpfsFsPath), initIpfsConfigFile) + err := ipfs.InitRepo(viper.GetString(IpfsFsPath), initIpfsConfigFile, defaultDatasets) ExitIfErr(err) }, } diff --git a/cmd/remove.go b/cmd/remove.go index 4bf8b6e9e..2612aa7e6 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -19,7 +19,7 @@ var datasetRemoveCmd = &cobra.Command{ ErrExit(fmt.Errorf("please specify a dataset path or name to get the info of")) } - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) for _, arg := range args { rt, ref := dsfs.RefType(arg) diff --git a/cmd/rename.go b/cmd/rename.go index 1bcfa8f83..9a2a05646 100644 --- a/cmd/rename.go +++ b/cmd/rename.go @@ -18,7 +18,7 @@ var datasetRenameCmd = &cobra.Command{ ErrExit(fmt.Errorf("please provide current & new dataset names")) } - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) p := &core.RenameParams{ Current: args[0], New: args[1], diff --git a/cmd/repo.go b/cmd/repo.go index 5d9a5dae5..fc3df164b 100644 --- a/cmd/repo.go +++ b/cmd/repo.go @@ -1,6 +1,10 @@ package cmd import ( + "net" + "net/rpc" + "strings" + ipfs "github.com/qri-io/cafs/ipfs" "github.com/qri-io/qri/repo" "github.com/qri-io/qri/repo/fs" @@ -25,6 +29,33 @@ func GetRepo(online bool) repo.Repo { return r } +// RepoOrClient returns either a +func RepoOrClient(online bool) (repo.Repo, *rpc.Client) { + if fs, err := ipfs.NewFilestore(func(cfg *ipfs.StoreCfg) { + cfg.FsRepoPath = viper.GetString(IpfsFsPath) + cfg.Online = online + }); err == nil { + id := "" + if fs.Node().PeerHost != nil { + id = fs.Node().PeerHost.ID().Pretty() + } + + r, err := fs_repo.NewRepo(fs, viper.GetString(QriRepoPath), id) + ExitIfErr(err) + return r, nil + } else if strings.Contains(err.Error(), "lock") { + // TODO - bad bad hardcode + conn, err := net.Dial("tcp", ":2504") + if err != nil { + ErrExit(err) + } + return nil, rpc.NewClient(conn) + } else { + ErrExit(err) + } + return nil, nil +} + func GetIpfsFilestore(online bool) *ipfs.Filestore { fs, err := ipfs.NewFilestore(func(cfg *ipfs.StoreCfg) { cfg.FsRepoPath = viper.GetString(IpfsFsPath) diff --git a/cmd/run.go b/cmd/run.go index 09cdc11d1..1b14ef74f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -59,7 +59,13 @@ var runCmd = &cobra.Command{ results, err := ioutil.ReadAll(f) ExitIfErr(err) - PrintResults(res.Dataset.Structure, results, res.Dataset.Structure.Format) + switch cmd.Flag("format").Value.String() { + case "csv", "json": + fmt.Printf("%s", string(results)) + default: + PrintResults(res.Dataset.Structure, results, res.Dataset.Structure.Format) + } + }, } @@ -67,6 +73,6 @@ func init() { RootCmd.AddCommand(runCmd) // runCmd.Flags().StringP("save", "s", "", "save the resulting dataset to a given address") runCmd.Flags().StringP("output", "o", "", "file to write to") - runCmd.Flags().StringP("format", "f", "csv", "set output format [csv,json]") + runCmd.Flags().StringP("format", "f", "", "set output format [csv,json]") runCmd.Flags().StringVarP(&runCmdName, "name", "n", "", "save output to name") } diff --git a/cmd/server.go b/cmd/server.go index 6ca60fe05..4cc756948 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -67,9 +67,9 @@ var serverCmd = &cobra.Command{ } func init() { - serverCmd.Flags().StringVarP(&serverCmdPort, "port", "p", "3000", "port to start server on") + serverCmd.Flags().StringVarP(&serverCmdPort, "port", "p", api.DefaultPort, "port to start server on") serverCmd.Flags().BoolVarP(&serverInitIpfs, "init-ipfs", "", false, "initialize a new default ipfs repo if empty") - serverCmd.Flags().BoolVarP(&serverMemOnly, "mem-only", "", false, "run qri entirely in-memory") + serverCmd.Flags().BoolVarP(&serverMemOnly, "mem-only", "", false, "run qri entirely in-memory, persisting nothing") serverCmd.Flags().BoolVarP(&serverOffline, "offline", "", false, "disable networking") RootCmd.AddCommand(serverCmd) } @@ -80,7 +80,9 @@ func initRepoIfEmpty(repoPath, configPath string) error { if err := os.MkdirAll(repoPath, os.ModePerm); err != nil { return err } - return ipfs.InitRepo(repoPath, configPath) + if err := ipfs.InitRepo(repoPath, configPath, defaultDatasets); err != nil { + return err + } } } return nil diff --git a/cmd/update.go b/cmd/update.go index cad143e15..1c823807e 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -46,7 +46,7 @@ var updateCmd = &cobra.Command{ ErrExit(fmt.Errorf("either a metadata or data option is required")) } - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) p := &core.GetDatasetParams{ Name: args[0], diff --git a/cmd/validate.go b/cmd/validate.go index 46eb633a5..b9dcd9bb3 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -29,7 +29,7 @@ and check each of it's rows against the constraints listed in the dataset's fields.`, Run: func(cmd *cobra.Command, args []string) { if len(args) > 0 { - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) for _, arg := range args { rt, ref := dsfs.RefType(arg) p := &core.ValidateDatasetParams{} @@ -78,7 +78,7 @@ func validateDataset() { metaFile, err = loadFileIfPath(validateDsMetaFilepath) ExitIfErr(err) - req := core.NewDatasetRequests(GetRepo(false)) + req := core.NewDatasetRequests(RepoOrClient(false)) p := &core.ValidateDatasetParams{ Name: validateDsName, diff --git a/core/core.go b/core/core.go new file mode 100644 index 000000000..8702feecb --- /dev/null +++ b/core/core.go @@ -0,0 +1,34 @@ +package core + +import ( + "github.com/qri-io/qri/p2p" +) + +// CoreRequests defines a set of core methods +type CoreRequests interface { + // CoreRequestsName confirms participation in the CoreRequests interface while + // also giving a human readable string for logging purposes + CoreRequestsName() string +} + +// Requests returns a slice of CoreRequests that defines the full local +// API of core methods +func Receivers(node *p2p.QriNode) []CoreRequests { + r := node.Repo + return []CoreRequests{ + NewDatasetRequests(r, nil), + NewHistoryRequests(r), + NewPeerRequests(r, node), + NewProfileRequests(r), + NewQueryRequests(r), + NewSearchRequests(r), + } +} + +// func RemoteClient(addr string) (*rpc.Client, error) { +// conn, err := net.Dial("tcp", addr) +// if err != nil { +// return nil, fmt.Errorf("dial error: %s", err) +// } +// return rpc.NewClient(conn), nil +// } diff --git a/core/datasets.go b/core/datasets.go index 114370ba0..8933d474c 100644 --- a/core/datasets.go +++ b/core/datasets.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "net/rpc" "path/filepath" "strings" "time" @@ -25,15 +26,27 @@ import ( type DatasetRequests struct { repo repo.Repo + cli *rpc.Client } -func NewDatasetRequests(r repo.Repo) *DatasetRequests { +func (d DatasetRequests) CoreRequestsName() string { return "datasets" } + +func NewDatasetRequests(r repo.Repo, cli *rpc.Client) *DatasetRequests { + if r != nil && cli != nil { + panic(fmt.Errorf("both repo and client supplied to NewDatasetRequests")) + } + return &DatasetRequests{ repo: r, + cli: cli, } } func (d *DatasetRequests) List(p *ListParams, res *[]*repo.DatasetRef) error { + if d.cli != nil { + return d.cli.Call("DatasetRequests.List", p, res) + } + store := d.repo.Store() // ensure valid limit value if p.Limit <= 0 { @@ -76,6 +89,10 @@ type GetDatasetParams struct { } func (d *DatasetRequests) Get(p *GetDatasetParams, res *repo.DatasetRef) error { + if d.cli != nil { + return d.cli.Call("DatasetRequests.Get", p, res) + } + store := d.repo.Store() ds, err := dsfs.LoadDataset(store, p.Path) if err != nil { @@ -109,6 +126,10 @@ type InitDatasetParams struct { // InitDataset creates a new qri dataset from a source of data func (r *DatasetRequests) InitDataset(p *InitDatasetParams, res *repo.DatasetRef) error { + if r.cli != nil { + return r.cli.Call("DatasetRequests.InitDataset", p, res) + } + var ( rdr io.Reader store = r.repo.Store() @@ -245,6 +266,10 @@ type UpdateParams struct { // Update adds a history entry, updating a dataset func (r *DatasetRequests) Update(p *UpdateParams, res *repo.DatasetRef) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.Update", p, res) + } + var ( name string prevpath datastore.Key @@ -331,6 +356,10 @@ type RenameParams struct { } func (r *DatasetRequests) Rename(p *RenameParams, res *repo.DatasetRef) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.Rename", p, res) + } + if p.Current == "" { return fmt.Errorf("current name is required to rename a dataset") } @@ -373,6 +402,10 @@ type DeleteParams struct { } func (r *DatasetRequests) Delete(p *DeleteParams, ok *bool) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.List", p, ok) + } + if p.Name == "" && p.Path.String() == "" { return fmt.Errorf("either name or path is required") } @@ -418,6 +451,10 @@ type StructuredData struct { } func (r *DatasetRequests) StructuredData(p *StructuredDataParams, data *StructuredData) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.StructuredData", p, data) + } + var ( file cafs.File d []byte @@ -480,6 +517,10 @@ type AddParams struct { } func (r *DatasetRequests) AddDataset(p *AddParams, res *repo.DatasetRef) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.AddDataset", p, res) + } + fs, ok := r.repo.Store().(*ipfs.Filestore) if !ok { return fmt.Errorf("can only add datasets when running an IPFS filestore") @@ -526,6 +567,10 @@ type ValidateDatasetParams struct { } func (r *DatasetRequests) Validate(p *ValidateDatasetParams, errors *dataset.Dataset) (err error) { + if r.cli != nil { + return r.cli.Call("DatasetRequests.Validate", p, errors) + } + // store := Store(cmd, args) // errs, err := history.Validate(store) // ExitIfErr(err) diff --git a/core/datasets_test.go b/core/datasets_test.go index 821d3c445..3c21957c2 100644 --- a/core/datasets_test.go +++ b/core/datasets_test.go @@ -49,7 +49,7 @@ func TestDatasetRequestsInit(t *testing.T) { return } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &repo.DatasetRef{} err := req.InitDataset(c.p, got) @@ -103,7 +103,7 @@ func TestDatasetRequestsList(t *testing.T) { // TODO: re-enable {&ListParams{OrderBy: "name", Limit: 30, Offset: 0}, []*repo.DatasetRef{cities, counter, movies}, ""}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := []*repo.DatasetRef{} err := req.List(c.p, &got) @@ -156,7 +156,7 @@ func TestDatasetRequestsGet(t *testing.T) { {&GetDatasetParams{Path: path, Name: "cats", Hash: "123"}, moviesDs, ""}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &repo.DatasetRef{} err := req.Get(c.p, got) @@ -198,7 +198,7 @@ func TestDatasetRequestsUpdate(t *testing.T) { // {&UpdateParams{Path: path, Name: "cats", Hash: "123"}, moviesDs, ""}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &repo.DatasetRef{} err := req.Update(c.p, got) @@ -230,7 +230,7 @@ func TestDatasetRequestsRename(t *testing.T) { {&RenameParams{Current: "new_movies", New: "new_movies"}, "", "name 'new_movies' already exists"}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &repo.DatasetRef{} err := req.Rename(c.p, got) @@ -269,7 +269,7 @@ func TestDatasetRequestsDelete(t *testing.T) { {&DeleteParams{Path: path}, nil, ""}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := false err := req.Delete(c.p, &got) @@ -313,7 +313,7 @@ func TestDatasetRequestsStructuredData(t *testing.T) { {&StructuredDataParams{Format: dataset.JSONDataFormat, Path: archivePath, Limit: 0, Offset: 0, All: true}, 0, ""}, } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &StructuredData{} err := req.StructuredData(c.p, got) @@ -361,7 +361,7 @@ func TestDatasetRequestsAddDataset(t *testing.T) { return } - req := NewDatasetRequests(mr) + req := NewDatasetRequests(mr, nil) for i, c := range cases { got := &repo.DatasetRef{} err := req.AddDataset(c.p, got) diff --git a/core/history.go b/core/history.go index 5f760b401..2d5eef335 100644 --- a/core/history.go +++ b/core/history.go @@ -12,6 +12,8 @@ type HistoryRequests struct { repo repo.Repo } +func (d HistoryRequests) CoreRequestsName() string { return "history" } + func NewHistoryRequests(r repo.Repo) *HistoryRequests { return &HistoryRequests{ repo: r, diff --git a/core/peers.go b/core/peers.go index 9562f0726..0f8ccad62 100644 --- a/core/peers.go +++ b/core/peers.go @@ -24,6 +24,8 @@ type PeerRequests struct { qriNode *p2p.QriNode } +func (d PeerRequests) CoreRequestsName() string { return "peers" } + func (d *PeerRequests) List(p *ListParams, res *[]*profile.Profile) error { replies := make([]*profile.Profile, p.Limit) i := 0 diff --git a/core/profile.go b/core/profile.go index 3f6eae004..92dc58aca 100644 --- a/core/profile.go +++ b/core/profile.go @@ -15,6 +15,8 @@ type ProfileRequests struct { repo repo.Repo } +func (d ProfileRequests) CoreRequestsName() string { return "profile" } + func NewProfileRequests(r repo.Repo) *ProfileRequests { return &ProfileRequests{ repo: r, diff --git a/core/queries.go b/core/queries.go index f350ff390..f99359413 100644 --- a/core/queries.go +++ b/core/queries.go @@ -16,6 +16,8 @@ type QueryRequests struct { repo repo.Repo } +func (d QueryRequests) CoreRequestsName() string { return "queries" } + func NewQueryRequests(r repo.Repo) *QueryRequests { return &QueryRequests{ repo: r, @@ -215,7 +217,7 @@ func (r *QueryRequests) Run(p *RunParams, res *repo.DatasetRef) error { if err := dsfs.DerefDatasetTransform(store, ds); err != nil { return fmt.Errorf("error dereferencing dataset query: %s", err.Error()) } - fmt.Println(ds.AbstractTransform.Path().String()) + // fmt.Println(ds.AbstractTransform.Path().String()) ref := &repo.DatasetRef{Name: p.SaveName, Path: dspath, Dataset: ds} diff --git a/core/search.go b/core/search.go index 77e097be9..3954a329e 100644 --- a/core/search.go +++ b/core/search.go @@ -14,6 +14,8 @@ type SearchRequests struct { // node *p2p.QriNode } +func (d SearchRequests) CoreRequestsName() string { return "search" } + func NewSearchRequests(r repo.Repo) *SearchRequests { return &SearchRequests{ repo: r,