Skip to content

Commit

Permalink
feat(lib): sew initial cron service into lib
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed May 9, 2019
1 parent 5786d68 commit 5e8a871
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 122 deletions.
4 changes: 4 additions & 0 deletions config/testdata/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ webapp:
analyticstoken: ""
entrypointupdateaddress: /ipns/webapp.qri.io
entrypointhash: QmYDkLrvzDpzDzKeLD3okiQUzSL1ksNsfYU6ZwRYYn8ViS
update:
type: fs
daemonize: true
address: "127.0.0.1:2506"
rpc:
enabled: true
port: 2504
Expand Down
28 changes: 18 additions & 10 deletions config/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import "github.com/qri-io/jsonschema"

// Update configures a Remote Procedure Call (Update) listener
type Update struct {
Daemonize bool `json:"daemonize"`
Port int `json:"port"`
Type string `json:"type"`
Daemonize bool `json:"daemonize"`
Address string `json:"address"`
}

// DefaultUpdatePort is local the port Update serves on by default
var DefaultUpdatePort = 2506
// DefaultUpdateAddress is the local address Update serves on by default
var DefaultUpdateAddress = "127.0.0.1:2506"

// DefaultUpdate creates a new default Update configuration
func DefaultUpdate() *Update {
return &Update{
Type: "fs",
Daemonize: true,
Port: DefaultUpdatePort,
Address: DefaultUpdateAddress,
}
}

Expand All @@ -27,15 +29,20 @@ func (cfg Update) Validate() error {
"title": "Update",
"description": "The Update configuration",
"type": "object",
"required": ["daemonize", "port"],
"required": ["type", "daemonize", "address"],
"properties": {
"type": {
"description": "class of cron store",
"enum": ["mem", "fs"],
"type": "string"
},
"deamonize": {
"description": "When true, the update service starts as a daemonized process",
"type": "boolean"
},
"port": {
"description": "port update service will listen for rpc calls",
"type": "integer"
"address": {
"description": "address service will listen and dial on for inter-process communication",
"type": "string"
}
}
}`)
Expand All @@ -45,8 +52,9 @@ func (cfg Update) Validate() error {
// Copy makes a deep copy of the Update struct
func (cfg *Update) Copy() *Update {
res := &Update{
Type: cfg.Type,
Daemonize: cfg.Daemonize,
Port: cfg.Port,
Address: cfg.Address,
}

return res
Expand Down
35 changes: 32 additions & 3 deletions cron/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"

flatbuffers "github.com/google/flatbuffers/go"
"github.com/qri-io/dataset"
Expand All @@ -22,6 +23,34 @@ type HTTPClient struct {
// assert HTTPClient is a Scheduler at compile time
var _ Scheduler = (*HTTPClient)(nil)

// ErrUnreachable defines errors where the server cannot be reached
// TODO (b5): consider moving this to qfs
var ErrUnreachable = fmt.Errorf("cannot establish a connection to the server")

// Ping confirms client can dial the server, if a connection cannot be
// established at all, Ping will return ErrUnreachable, all other errors
// will
func (c HTTPClient) Ping() error {
res, err := http.Get(fmt.Sprintf("http://%s", c.Addr))
if err != nil {
msg := strings.ToLower(err.Error())

// TODO (b5): a number of errors constitute a service being "unreachable",
// we should make a more exhaustive assessment. common errors already covered:
// "connect: Connection refused"
// "dial tcp: lookup [url] no such host"
if strings.Contains(msg, "refused") || strings.Contains(msg, "no such host") {
return ErrUnreachable
}
return err
}

if res.StatusCode == http.StatusOK {
return nil
}
return resError(res)
}

// Jobs lists jobs by querying an HTTP server
func (c HTTPClient) Jobs(ctx context.Context, offset, limit int) ([]*Job, error) {
res, err := http.Get(fmt.Sprintf("http://%s/jobs?offset=%d&limit=&%d", c.Addr, offset, limit))
Expand Down Expand Up @@ -86,7 +115,7 @@ func (c HTTPClient) Unschedule(ctx context.Context, name string) error {
return err
}

return c.resError(res)
return resError(res)
}

func (c HTTPClient) postJob(job *Job) error {
Expand All @@ -100,10 +129,10 @@ func (c HTTPClient) postJob(job *Job) error {
return err
}

return c.resError(res)
return resError(res)
}

func (c HTTPClient) resError(res *http.Response) error {
func resError(res *http.Response) error {
if res.StatusCode == 200 {
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions cron/cron.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ table Jobs {
list:[Job];
}

// setting file_identifier adds a "magic number" to bytes 4-7 to use as a
// sanity check for a "Qri FlatBuffer File"
file_identifier "QFBF";

// for our use this is mainly an annotation. this file extension for a
// "qri flatbuffer" file should be .qfb
file_extension "qfb"

root_type Jobs;
2 changes: 0 additions & 2 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (

var (
log = golog.Logger("cron")
// max time represents is a date far in the future
maxTime = time.Date(9999, 12, 31, 23, 59, 59, 9999, time.UTC)
// DefaultCheckInterval is the frequency cron will check all stored jobs
// for scheduled updates without any additional configuration. Qri recommends
// not running updates more than once an hour for performance and storage
Expand Down
72 changes: 0 additions & 72 deletions lib/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,78 +310,6 @@ func (r *DatasetRequests) Save(p *SaveParams, res *repo.DatasetRef) (err error)
return nil
}

// UpdateParams defines parameters for the Update command
// TODO (b5): I think we can merge this into SaveParams
type UpdateParams struct {
Ref string
Title string
Message string
Recall string
Secrets map[string]string
Publish bool
DryRun bool
ReturnBody bool
ShouldRender bool
// optional writer to have transform script record standard output to
// note: this won't work over RPC, only on local calls
ScriptOutput io.Writer
}

// Update advances a dataset to the latest known version from either a peer or by
// re-running a transform in the peer's namespace
func (r *DatasetRequests) Update(p *UpdateParams, res *repo.DatasetRef) error {
if r.cli != nil {
return r.cli.Call("DatasetRequests.Update", p, res)
}

ref, err := repo.ParseDatasetRef(p.Ref)
if err != nil {
return err
}

if err = repo.CanonicalizeDatasetRef(r.node.Repo, &ref); err == repo.ErrNotFound {
return fmt.Errorf("unknown dataset '%s'. please add before updating", ref.AliasString())
} else if err != nil {
return err
}

if !base.InLocalNamespace(r.node.Repo, &ref) {
*res, err = actions.UpdateRemoteDataset(r.node, &ref, true)
return err
}

// default to recalling transfrom scripts for local updates
// TODO (b5): not sure if this should be here or in client libraries
if p.Recall == "" {
p.Recall = "tf"
}

saveParams := &SaveParams{
Dataset: &dataset.Dataset{
Name: ref.Name,
Peername: ref.Peername,
ProfileID: ref.ProfileID.String(),
Path: ref.Path,
Commit: &dataset.Commit{
Title: p.Title,
Message: p.Message,
},
Transform: &dataset.Transform{
Secrets: p.Secrets,
},
},
Recall: p.Recall,
Secrets: p.Secrets,
Publish: p.Publish,
DryRun: p.DryRun,
ReturnBody: p.ReturnBody,
ScriptOutput: p.ScriptOutput,
ShouldRender: p.ShouldRender,
}

return r.Save(saveParams, res)
}

// SetPublishStatusParams encapsulates parameters for setting the publication status of a dataset
type SetPublishStatusParams struct {
Ref string
Expand Down
32 changes: 0 additions & 32 deletions lib/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,38 +220,6 @@ func TestDatasetRequestsSaveZip(t *testing.T) {
t.Fatalf("Expected 'Test Repo', got '%s'", res.Dataset.Meta.Title)
}
}

func TestDatasetRequestsUpdate(t *testing.T) {
node := newTestQriNode(t)

r := NewDatasetRequests(node, nil)
res := &repo.DatasetRef{}
if err := r.Update(&UpdateParams{Ref: "me/bad_dataset"}, res); err == nil {
t.Error("expected update to nonexistent dataset to error")
}

ref := addNowTransformDataset(t, node)
res = &repo.DatasetRef{}
if err := r.Update(&UpdateParams{Ref: ref.AliasString(), Recall: "tf", ReturnBody: true}, res); err != nil {
t.Errorf("update error: %s", err)
}

// run a manual save to lose the transform
err := r.Save(&SaveParams{Dataset: &dataset.Dataset{
Peername: res.Peername,
Name: res.Name,
Meta: &dataset.Meta{Title: "an updated title"},
}}, res)
if err != nil {
t.Error("save failed")
}

// update should grab the transform from 2 commits back
if err := r.Update(&UpdateParams{Ref: res.AliasString(), ReturnBody: true}, res); err != nil {
t.Error(err)
}
}

func TestDatasetRequestsList(t *testing.T) {
var (
movies, counter, cities, craigslist, sitemap repo.DatasetRef
Expand Down
67 changes: 64 additions & 3 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/qri-io/qfs/muxfs"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/config/migrate"
"github.com/qri-io/qri/cron"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/fs"
Expand Down Expand Up @@ -291,6 +292,10 @@ func NewInstance(opts ...Option) (qri *Instance, err error) {
}
}

if inst.cron, err = newCron(cfg, filepath.Base(cfg.Path()), opts); err != nil {
return
}

if inst.store, err = newStore(cfg); err != nil {
return
}
Expand Down Expand Up @@ -382,6 +387,44 @@ func newFilesystem(cfg *config.Config, store cafs.Filestore) (qfs.Filesystem, er
return fsys, nil
}

func newCron(cfg *config.Config, repoPath string, opts []Option) (cron.Scheduler, error) {
cli := cron.HTTPClient{Addr: cfg.Update.Address}
err := cli.Ping()
if err == nil {
return cli, nil
} else if err != cron.ErrUnreachable {
// at this point something is up with the cron server, still return a cli
// so we can issue commands like restart
return cli, err
}

// at this point err must be cron.ErrUnreachable, which means it's time to
// spin up a new cron service
err = nil

var js cron.JobStore
switch cfg.Update.Type {
case "fs":
js = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb")
case "mem":
js = &cron.MemJobStore{}
default:
return nil, fmt.Errorf("unknown cron type: %s", cfg.Update.Type)
}

newInst := func(ctx context.Context, streams ioes.IOStreams) (*Instance, error) {
opts = append([]Option{
OptCtx(ctx),
OptIOStreams(streams),
}, opts...)
return NewInstance(opts...)
}

scriptsPath := filepath.Join(repoPath, "/cron")

return cron.NewCron(js, newUpdateRunner(newInst, scriptsPath)), nil
}

// NewInstanceFromConfigAndNode is a temporary solution to create an instance from an
// already-allocated QriNode & configuration
// don't write new code that relies on this, instead create a configuration
Expand Down Expand Up @@ -421,6 +464,7 @@ type Instance struct {
registry *regclient.Client
repo repo.Repo
node *p2p.QriNode
cron cron.Scheduler

rpc *rpc.Client
}
Expand Down Expand Up @@ -455,10 +499,12 @@ func (inst *Instance) Node() *p2p.QriNode {

// Repo accesses the instance Repo if one exists
func (inst *Instance) Repo() repo.Repo {
if inst.node == nil {
return nil
if inst.repo != nil {
return inst.repo
} else if inst.node != nil {
return inst.node.Repo
}
return inst.node.Repo
return nil
}

// RPC accesses the instance RPC client if one exists
Expand All @@ -470,3 +516,18 @@ func (inst *Instance) RPC() *rpc.Client {
func (inst *Instance) Teardown() {
inst.teardown()
}

// // path returns the root path this instance is operating from if such a
// // directory exists (eg: in-memory repos have no path)
// func (inst *Instance) ensureRepoPath(path ...string) string {
// if fsr, ok := inst.Repo().(*fsrepo.Repo); ok {
// return fsr.Path()
// }

// path := filepath.Join(base, "update")
// if err := os.MkdirAll(path); err != nil {
// return "", err
// }

// return ""
// }
Loading

0 comments on commit 5e8a871

Please sign in to comment.