Skip to content

Commit

Permalink
feat(RPC): change default port, provide RPC listener
Browse files Browse the repository at this point in the history
so we've been planning to do this for a while, but a proper
reason to do so hadn't come up until we needed to be able to
use the CLI while a server is running (while ssh'd into a container).

This commit is a spike that only supports DatasetRequests for now
while I work out the details of this pattern.

also, closes #163
  • Loading branch information
b5 committed Dec 8, 2017
1 parent 83e2fd3 commit 92e42ae
Show file tree
Hide file tree
Showing 22 changed files with 200 additions and 31 deletions.
15 changes: 10 additions & 5 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ const (
DEVELOP_MODE = "develop"
PRODUCTION_MODE = "production"
TEST_MODE = "test"
DefaultPort = "2503"
DefaultRPCPort = "2504"
)

func DefaultConfig() *Config {
return &Config{
Logger: logging.DefaultLogger,
Mode: "develop",
Port: "8080",
Online: true,
Logger: logging.DefaultLogger,
Mode: "develop",
Port: DefaultPort,
RPCPort: DefaultRPCPort,
Online: true,
}
}

Expand All @@ -38,6 +41,8 @@ type Config struct {
Mode string
// port to listen on, will be read from PORT env variable if present.
Port string
// port to listen for RPC calls on, if empty server will not register a RPC listener
RPCPort string
// root url for service
UrlRoot string
// DNS service discovery. Should be either "env" or "dns", default is env
Expand Down Expand Up @@ -67,7 +72,7 @@ type Config struct {
func (cfg *Config) Validate() (err error) {
// make sure port is set
if cfg.Port == "" {
cfg.Port = "8080"
cfg.Port = DefaultPort
}

err = requireConfigStrings(map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion api/handlers/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func NewDatasetHandlers(log logging.Logger, r repo.Repo) *DatasetHandlers {
req := core.NewDatasetRequests(r)
req := core.NewDatasetRequests(r, nil)
h := DatasetHandlers{*req, log, r}
return &h
}
Expand Down
28 changes: 28 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package api

import (
"fmt"
"net"
"net/http"
"net/rpc"

"github.com/datatogether/api/apiutil"
"github.com/qri-io/qri/api/handlers"
"github.com/qri-io/qri/core"
"github.com/qri-io/qri/logging"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
Expand Down Expand Up @@ -71,10 +74,35 @@ func (s *Server) Serve() (err error) {
server := &http.Server{}
server.Handler = NewServerRoutes(s)
s.log.Infof("starting api server on port %s", s.cfg.Port)
go s.ServeRPC()
// http.ListenAndServe will not return unless there's an error
return StartServer(s.cfg, server)
}

// ServeRPC checks for a configured RPC port, and registers a listner if so
func (s *Server) ServeRPC() {
if s.cfg.RPCPort == "" {
return
}

listener, err := net.Listen("tcp", fmt.Sprintf(":%s", s.cfg.RPCPort))
if err != nil {
s.log.Infof("RPC listen on port %s error: %s", s.cfg.RPCPort, err)
return
}

for _, rcvr := range core.Receivers(s.qriNode) {
if err := rpc.Register(rcvr); err != nil {
s.log.Infof("error registering RPC receiver %s: %s", rcvr.CoreRequestsName(), err.Error())
return
}
}

s.log.Infof("accepting RPC requests on port %s", s.cfg.RPCPort)
rpc.Accept(listener)
return
}

// NewServerRoutes returns a Muxer that has all API routes
func NewServerRoutes(s *Server) *http.ServeMux {
m := http.NewServeMux()
Expand Down
4 changes: 2 additions & 2 deletions cmd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var datasetAddCmd = &cobra.Command{
ErrExit(fmt.Errorf("please provide a --name"))
}

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

root := strings.TrimSuffix(args[0], "/"+dsfs.PackageFileDataset.String())
p := &core.AddParams{
Expand Down Expand Up @@ -69,7 +69,7 @@ func initDataset() {
metaFile, err = loadFileIfPath(addDsMetaFilepath)
ExitIfErr(err)

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

p := &core.InitDatasetParams{
Name: addDsName,
Expand Down
2 changes: 1 addition & 1 deletion cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var exportCmd = &cobra.Command{
}

r := GetRepo(false)
req := core.NewDatasetRequests(r)
req := core.NewDatasetRequests(r, nil)

p := &core.GetDatasetParams{
Name: args[0],
Expand Down
2 changes: 1 addition & 1 deletion cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var infoCmd = &cobra.Command{
}
}

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

for i, arg := range args {
rt, ref := dsfs.RefType(arg)
Expand Down
10 changes: 9 additions & 1 deletion cmd/init-ipfs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"github.com/ipfs/go-datastore"
ipfs "github.com/qri-io/cafs/ipfs"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand All @@ -10,13 +11,20 @@ var (
initIpfsConfigFile string
)

// defaultDatasets is a hard-coded dataset added when a new qri repo is created
// this hash must always be available
var defaultDatasets = []datastore.Key{
// fivethirtyeight comic characters
datastore.NewKey("/ipfs/QmcqkHFA2LujZxY38dYZKmxsUstN4unk95azBjwEhwrnM6"),
}

// initCmd represents the init command
var initIpfsCmd = &cobra.Command{
Use: "init-ipfs",
Short: "Initialize an ipfs repository",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
err := ipfs.InitRepo(viper.GetString(IpfsFsPath), initIpfsConfigFile)
err := ipfs.InitRepo(viper.GetString(IpfsFsPath), initIpfsConfigFile, defaultDatasets)
ExitIfErr(err)
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var datasetRemoveCmd = &cobra.Command{
ErrExit(fmt.Errorf("please specify a dataset path or name to get the info of"))
}

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

for _, arg := range args {
rt, ref := dsfs.RefType(arg)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var datasetRenameCmd = &cobra.Command{
ErrExit(fmt.Errorf("please provide current & new dataset names"))
}

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))
p := &core.RenameParams{
Current: args[0],
New: args[1],
Expand Down
31 changes: 31 additions & 0 deletions cmd/repo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package cmd

import (
"net"
"net/rpc"
"strings"

ipfs "github.com/qri-io/cafs/ipfs"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/fs"
Expand All @@ -25,6 +29,33 @@ func GetRepo(online bool) repo.Repo {
return r
}

// RepoOrClient returns either a
func RepoOrClient(online bool) (repo.Repo, *rpc.Client) {
if fs, err := ipfs.NewFilestore(func(cfg *ipfs.StoreCfg) {
cfg.FsRepoPath = viper.GetString(IpfsFsPath)
cfg.Online = online
}); err == nil {
id := ""
if fs.Node().PeerHost != nil {
id = fs.Node().PeerHost.ID().Pretty()
}

r, err := fs_repo.NewRepo(fs, viper.GetString(QriRepoPath), id)
ExitIfErr(err)
return r, nil
} else if strings.Contains(err.Error(), "lock") {
// TODO - bad bad hardcode
conn, err := net.Dial("tcp", ":2504")
if err != nil {
ErrExit(err)
}
return nil, rpc.NewClient(conn)
} else {
ErrExit(err)
}
return nil, nil
}

func GetIpfsFilestore(online bool) *ipfs.Filestore {
fs, err := ipfs.NewFilestore(func(cfg *ipfs.StoreCfg) {
cfg.FsRepoPath = viper.GetString(IpfsFsPath)
Expand Down
10 changes: 8 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,20 @@ var runCmd = &cobra.Command{
results, err := ioutil.ReadAll(f)
ExitIfErr(err)

PrintResults(res.Dataset.Structure, results, res.Dataset.Structure.Format)
switch cmd.Flag("format").Value.String() {
case "csv", "json":
fmt.Printf("%s", string(results))
default:
PrintResults(res.Dataset.Structure, results, res.Dataset.Structure.Format)
}

},
}

func init() {
RootCmd.AddCommand(runCmd)
// runCmd.Flags().StringP("save", "s", "", "save the resulting dataset to a given address")
runCmd.Flags().StringP("output", "o", "", "file to write to")
runCmd.Flags().StringP("format", "f", "csv", "set output format [csv,json]")
runCmd.Flags().StringP("format", "f", "", "set output format [csv,json]")
runCmd.Flags().StringVarP(&runCmdName, "name", "n", "", "save output to name")
}
8 changes: 5 additions & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ var serverCmd = &cobra.Command{
}

func init() {
serverCmd.Flags().StringVarP(&serverCmdPort, "port", "p", "3000", "port to start server on")
serverCmd.Flags().StringVarP(&serverCmdPort, "port", "p", api.DefaultPort, "port to start server on")
serverCmd.Flags().BoolVarP(&serverInitIpfs, "init-ipfs", "", false, "initialize a new default ipfs repo if empty")
serverCmd.Flags().BoolVarP(&serverMemOnly, "mem-only", "", false, "run qri entirely in-memory")
serverCmd.Flags().BoolVarP(&serverMemOnly, "mem-only", "", false, "run qri entirely in-memory, persisting nothing")
serverCmd.Flags().BoolVarP(&serverOffline, "offline", "", false, "disable networking")
RootCmd.AddCommand(serverCmd)
}
Expand All @@ -80,7 +80,9 @@ func initRepoIfEmpty(repoPath, configPath string) error {
if err := os.MkdirAll(repoPath, os.ModePerm); err != nil {
return err
}
return ipfs.InitRepo(repoPath, configPath)
if err := ipfs.InitRepo(repoPath, configPath, defaultDatasets); err != nil {
return err
}
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var updateCmd = &cobra.Command{
ErrExit(fmt.Errorf("either a metadata or data option is required"))
}

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

p := &core.GetDatasetParams{
Name: args[0],
Expand Down
4 changes: 2 additions & 2 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ and check each of it's rows against the constraints listed
in the dataset's fields.`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) > 0 {
req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))
for _, arg := range args {
rt, ref := dsfs.RefType(arg)
p := &core.ValidateDatasetParams{}
Expand Down Expand Up @@ -78,7 +78,7 @@ func validateDataset() {
metaFile, err = loadFileIfPath(validateDsMetaFilepath)
ExitIfErr(err)

req := core.NewDatasetRequests(GetRepo(false))
req := core.NewDatasetRequests(RepoOrClient(false))

p := &core.ValidateDatasetParams{
Name: validateDsName,
Expand Down
34 changes: 34 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package core

import (
"github.com/qri-io/qri/p2p"
)

// CoreRequests defines a set of core methods
type CoreRequests interface {
// CoreRequestsName confirms participation in the CoreRequests interface while
// also giving a human readable string for logging purposes
CoreRequestsName() string
}

// Requests returns a slice of CoreRequests that defines the full local
// API of core methods
func Receivers(node *p2p.QriNode) []CoreRequests {
r := node.Repo
return []CoreRequests{
NewDatasetRequests(r, nil),
NewHistoryRequests(r),
NewPeerRequests(r, node),
NewProfileRequests(r),
NewQueryRequests(r),
NewSearchRequests(r),
}
}

// func RemoteClient(addr string) (*rpc.Client, error) {
// conn, err := net.Dial("tcp", addr)
// if err != nil {
// return nil, fmt.Errorf("dial error: %s", err)
// }
// return rpc.NewClient(conn), nil
// }
Loading

0 comments on commit 92e42ae

Please sign in to comment.