diff --git a/base/cron.go b/base/cron.go index 3810d3a42..e3e26e743 100644 --- a/base/cron.go +++ b/base/cron.go @@ -1,7 +1,6 @@ package base import ( - "context" "fmt" "os/exec" "path/filepath" @@ -13,77 +12,80 @@ import ( "github.com/qri-io/qri/cron" ) -// DatasetSaveRunner returns a cron.RunFunc that invokes the "qri save" command -func DatasetSaveRunner(basepath string) cron.RunJobFunc { - return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error { - args := []string{"save", job.Name} +// JobToCmd returns an operating system command that will execute the given job +// wiring operating system in/out/errout to the provided iostreams. +func JobToCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd { + switch job.Type { + case cron.JTDataset: + return datasetSaveCmd(streams, job) + case cron.JTShellScript: + return shellScriptCmd(streams, job) + default: + return nil + } +} - if o, ok := job.Options.(*cron.DatasetOptions); ok { - if o.Title != "" { - args = append(args, fmt.Sprintf(`--title="%s"`, o.Title)) - } - if o.Message != "" { - args = append(args, fmt.Sprintf(`--message="%s"`, o.Message)) - } - if o.Recall != "" { - args = append(args, fmt.Sprintf(`--recall="%s"`, o.Recall)) - } - if o.BodyPath != "" { - args = append(args, fmt.Sprintf(`--body="%s"`, o.BodyPath)) - } - if len(o.FilePaths) > 0 { - for _, path := range o.FilePaths { - args = append(args, fmt.Sprintf(`--file="%s"`, path)) - } +// datasetSaveCmd configures a "qri save" command based on job details +// wiring operating system in/out/errout to the provided iostreams. +func datasetSaveCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd { + args := []string{"save", job.Name} + + if o, ok := job.Options.(*cron.DatasetOptions); ok { + if o.Title != "" { + args = append(args, fmt.Sprintf(`--title="%s"`, o.Title)) + } + if o.Message != "" { + args = append(args, fmt.Sprintf(`--message="%s"`, o.Message)) + } + if o.Recall != "" { + args = append(args, fmt.Sprintf(`--recall="%s"`, o.Recall)) + } + if o.BodyPath != "" { + args = append(args, fmt.Sprintf(`--body="%s"`, o.BodyPath)) + } + if len(o.FilePaths) > 0 { + for _, path := range o.FilePaths { + args = append(args, fmt.Sprintf(`--file="%s"`, path)) } + } - // TODO (b5) - config and secrets + // TODO (b5) - config and secrets - boolFlags := map[string]bool{ - "--publish": o.Publish, - "--strict": o.Strict, - "--force": o.Force, - "--keep-format": o.ConvertFormatToPrev, - "--no-render": !o.ShouldRender, - } - for flag, use := range boolFlags { - if use { - args = append(args, flag) - } + boolFlags := map[string]bool{ + "--publish": o.Publish, + "--strict": o.Strict, + "--force": o.Force, + "--keep-format": o.ConvertFormatToPrev, + "--no-render": !o.ShouldRender, + } + for flag, use := range boolFlags { + if use { + args = append(args, flag) } - } - cmd := exec.Command("qri", args...) - // cmd.Dir = basepath - cmd.Stderr = streams.ErrOut - cmd.Stdout = streams.Out - cmd.Stdin = streams.In - return cmd.Run() } + + cmd := exec.Command("qri", args...) + // cmd.Dir = basepath + cmd.Stderr = streams.ErrOut + cmd.Stdout = streams.Out + cmd.Stdin = streams.In + return cmd } -// LocalShellScriptRunner creates a script runner anchored at a local path -// The runner it wires operating sytsem command in/out/errour to the iostreams -// provided by RunJobFunc. All paths are in relation to the provided base path +// shellScriptCmd creates an exec.Cmd, wires operating system in/out/errout +// to the provided iostreams. 
// Commands are executed with access to the same enviornment variables as the // process the runner is executing in -// The executing command blocks until completion -func LocalShellScriptRunner(basepath string) cron.RunJobFunc { - return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error { - path := job.Name - if qfs.PathKind(job.Name) == "local" { - // TODO (b5) - need to first check that path can't be found - // path = filepath.Join(basepath, path) - } - - cmd := exec.Command(path) - // cmd.Dir = basepath - cmd.Stderr = streams.ErrOut - cmd.Stdout = streams.Out - cmd.Stdin = streams.In - return cmd.Run() - } +func shellScriptCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd { + // TODO (b5) - config and secrets as env vars + + cmd := exec.Command(job.Name) + cmd.Stderr = streams.ErrOut + cmd.Stdout = streams.Out + cmd.Stdin = streams.In + return cmd } // PossibleShellScript checks a path to see if it might be a shell script @@ -109,10 +111,11 @@ func DatasetToJob(ds *dataset.Dataset, periodicity string, opts *cron.DatasetOpt job = &cron.Job{ // TODO (b5) - dataset.Dataset needs an Alias() method: - Name: fmt.Sprintf("%s/%s", ds.Peername, ds.Name), - Periodicity: p, - Type: cron.JTDataset, - LastRun: ds.Commit.Timestamp, + Name: fmt.Sprintf("%s/%s", ds.Peername, ds.Name), + Periodicity: p, + Type: cron.JTDataset, + LastRunStart: ds.Commit.Timestamp, + LastRunStop: ds.Commit.Timestamp, } if opts != nil { job.Options = opts diff --git a/cron/client.go b/cron/client.go index f4d85a19f..872ce7667 100644 --- a/cron/client.go +++ b/cron/client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -46,41 +47,31 @@ func (c HTTPClient) Ping() error { if res.StatusCode == http.StatusOK { return nil } - return resError(res) + return maybeErrorResponse(res) } // Jobs lists jobs by querying an HTTP server func (c HTTPClient) Jobs(ctx context.Context, offset, limit int) ([]*Job, error) { - res, err := http.Get(fmt.Sprintf("http://%s/jobs?offset=%d&limit=&%d", c.Addr, offset, limit)) + res, err := http.Get(fmt.Sprintf("http://%s/jobs?offset=%d&limit=%d", c.Addr, offset, limit)) if err != nil { return nil, err } - defer res.Body.Close() - data, err := ioutil.ReadAll(res.Body) + return decodeJobsResponse(res) +} + +// Job gets a job by querying an HTTP server +func (c HTTPClient) Job(ctx context.Context, name string) (*Job, error) { + res, err := http.Get(fmt.Sprintf("http://%s/job?name=%s", c.Addr, name)) if err != nil { return nil, err } - js := cronfb.GetRootAsJobs(data, 0) - dec := &cronfb.Job{} - jobs := make([]*Job, js.ListLength()) - - for i := 0; i < js.ListLength(); i++ { - js.List(dec, i) - decJob := &Job{} - if err := decJob.UnmarshalFlatbuffer(dec); err != nil { - return nil, err - } - jobs[i] = decJob + if res.StatusCode == 200 { + return decodeJobResponse(res) } - return jobs, nil -} - -// Job gets a job by querying an HTTP server -func (c HTTPClient) Job(ctx context.Context, name string) (*Job, error) { - return nil, fmt.Errorf("not finished") + return nil, maybeErrorResponse(res) } // Schedule adds a job to the cron scheduler via an HTTP request @@ -100,7 +91,45 @@ func (c HTTPClient) Unschedule(ctx context.Context, name string) error { return err } - return resError(res) + return maybeErrorResponse(res) +} + +// Logs gives a log of executed jobs +func (c HTTPClient) Logs(ctx context.Context, offset, limit int) ([]*Job, error) { + res, err := http.Get(fmt.Sprintf("http://%s/logs?offset=%d&limit=%d", c.Addr, offset, limit)) + 
if err != nil { + return nil, err + } + + return decodeJobsResponse(res) +} + +// LoggedJob returns a single executed job by job.LogName +func (c HTTPClient) LoggedJob(ctx context.Context, logName string) (*Job, error) { + res, err := http.Get(fmt.Sprintf("http://%s/log?log_name=%s", c.Addr, logName)) + if err != nil { + return nil, err + } + + if res.StatusCode == 200 { + return decodeJobResponse(res) + } + + return nil, maybeErrorResponse(res) +} + +// LoggedJobFile returns a reader for a file at the given name +func (c HTTPClient) LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) { + res, err := http.Get(fmt.Sprintf("http://%s/log/output?log_name=%s", c.Addr, logName)) + if err != nil { + return nil, err + } + + if res.StatusCode == 200 { + return res.Body, nil + } + + return nil, maybeErrorResponse(res) } func (c HTTPClient) postJob(job *Job) error { @@ -114,10 +143,10 @@ func (c HTTPClient) postJob(job *Job) error { return err } - return resError(res) + return maybeErrorResponse(res) } -func resError(res *http.Response) error { +func maybeErrorResponse(res *http.Response) error { if res.StatusCode == 200 { return nil } @@ -129,3 +158,39 @@ func resError(res *http.Response) error { return fmt.Errorf(string(errData)) } + +func decodeJobsResponse(res *http.Response) ([]*Job, error) { + defer res.Body.Close() + data, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + js := cronfb.GetRootAsJobs(data, 0) + dec := &cronfb.Job{} + jobs := make([]*Job, js.ListLength()) + + for i := 0; i < js.ListLength(); i++ { + js.List(dec, i) + decJob := &Job{} + if err := decJob.UnmarshalFlatbuffer(dec); err != nil { + return nil, err + } + jobs[i] = decJob + } + + return jobs, nil +} + +func decodeJobResponse(res *http.Response) (*Job, error) { + defer res.Body.Close() + data, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + js := cronfb.GetRootAsJob(data, 0) + dec := &Job{} + err = dec.UnmarshalFlatbuffer(js) + return dec, err +} diff --git a/cron/cron.fbs b/cron/cron.fbs index e3a38f655..084f059f8 100644 --- a/cron/cron.fbs +++ b/cron/cron.fbs @@ -10,7 +10,7 @@ table StringMapVal { val:string; } -// TODO (b5): I think it would be smarter to remove all knoweldge from cron +// TODO (b5): I think it would be smarter to remove all details from cron // about what exactly is being scheduled, but we would need a go implementation // of flexbuffers to do that properly, so let's leave this in for now union Options { DatasetOptions, ShellScriptOptions } @@ -38,11 +38,14 @@ table ShellScriptOptions { table Job { name:string; + path:string; type:JobType; periodicity:string; - lastRun:string; + lastRunStart:string; + lastRunStop:string; lastError:string; + logFilePath:string; options:Options; } diff --git a/cron/cron.go b/cron/cron.go index 9471cb33a..126e5162a 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -2,12 +2,14 @@ package cron import ( - "bytes" "context" + "fmt" + "io" "io/ioutil" "net/http" "time" + apiutil "github.com/datatogether/api/apiutil" golog "github.com/ipfs/go-log" "github.com/qri-io/ioes" cronfb "github.com/qri-io/qri/cron/cron_fbs" @@ -22,13 +24,25 @@ var ( DefaultCheckInterval = time.Minute ) -// Scheduler is the "generic" interface for the Cron Scheduler, it's implemented +// Scheduler is the generic interface for the Cron Scheduler, it's implemented // by both Cron and HTTPClient for easier RPC communication type Scheduler interface { + // Jobs lists currently scheduled jobs Jobs(ctx context.Context, offset, limit 
int) ([]*Job, error) + // Job gets a single scheduled job by name Job(ctx context.Context, name string) (*Job, error) + + // Schedule adds a job to the scheduler for execution once every period Schedule(ctx context.Context, job *Job) error + // Unschedule removes a job from the scheduler Unschedule(ctx context.Context, name string) error + + // Logs gives a log of executed jobs + Logs(ctx context.Context, offset, limit int) ([]*Job, error) + // LoggedJob returns a single executed job by job.LogName + LoggedJob(ctx context.Context, logName string) (*Job, error) + // JobLogFile returns a reader for a file at the given name + LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) } // RunJobFunc is a function for executing a job. Cron takes care of scheduling @@ -40,41 +54,58 @@ type RunJobFunc func(ctx context.Context, streams ioes.IOStreams, job *Job) erro type RunJobFactory func(ctx context.Context) (runner RunJobFunc) // NewCron creates a Cron with the default check interval -func NewCron(js JobStore, factory RunJobFactory) *Cron { - return NewCronInterval(js, factory, DefaultCheckInterval) +func NewCron(schedule, log JobStore, factory RunJobFactory) *Cron { + return NewCronInterval(schedule, log, factory, DefaultCheckInterval) } // NewCronInterval creates a Cron with a custom check interval -func NewCronInterval(js JobStore, factory RunJobFactory, checkInterval time.Duration) *Cron { +func NewCronInterval(schedule, log JobStore, factory RunJobFactory, checkInterval time.Duration) *Cron { return &Cron{ - store: js, + schedule: schedule, + log: log, + interval: checkInterval, factory: factory, } } -// Cron coordinates the scheduling of running "jobs" at specified periodicities +// Cron coordinates the scheduling of running jobs at specified periodicities // (intervals) with a provided job runner function type Cron struct { - store JobStore + schedule JobStore + log JobStore interval time.Duration factory RunJobFactory } -// assert Cron implements ReadJobs at compile time -var _ ReadJobs = (*Cron)(nil) - // assert Cron is a Scheduler at compile time var _ Scheduler = (*Cron)(nil) -// Jobs proxies to the underlying store for reading jobs +// Jobs proxies to the schedule store for reading jobs func (c *Cron) Jobs(ctx context.Context, offset, limit int) ([]*Job, error) { - return c.store.Jobs(ctx, offset, limit) + return c.schedule.Jobs(ctx, offset, limit) } -// Job proxies to the underlying store for reading a job by name +// Job proxies to the schedule store for reading a job by name func (c *Cron) Job(ctx context.Context, name string) (*Job, error) { - return c.store.Job(ctx, name) + return c.schedule.Job(ctx, name) +} + +// Logs returns a list of jobs that have been executed +func (c *Cron) Logs(ctx context.Context, offset, limit int) ([]*Job, error) { + return c.log.Jobs(ctx, offset, limit) +} + +// LoggedJob gives a specific Job by logged job name +func (c *Cron) LoggedJob(ctx context.Context, logName string) (*Job, error) { + return c.log.Job(ctx, logName) +} + +// LoggedJobFile returns a reader for a file at the given name +func (c *Cron) LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) { + // reader := c.log. 
+ // TODO (b5): + return nil, fmt.Errorf("not finished") } // Start initiates the check loop, looking for updates to execute once at every @@ -87,7 +118,7 @@ func (c *Cron) Start(ctx context.Context) error { defer cleanup() log.Debugf("running check") - jobs, err := c.store.Jobs(ctx, 0, 0) + jobs, err := c.schedule.Jobs(ctx, 0, 0) if err != nil { log.Errorf("getting jobs from store: %s", err) return @@ -101,7 +132,7 @@ func (c *Cron) Start(ctx context.Context) error { } if len(run) > 0 { - log.Errorf("found %d job(s) to run", len(run)) + log.Debugf("found %d job(s) to run", len(run)) runner := c.factory(ctx) for _, job := range run { // TODO (b5) - if we want things like per-job timeout, we should create @@ -109,7 +140,7 @@ func (c *Cron) Start(ctx context.Context) error { c.runJob(ctx, job, runner) } } else { - log.Errorf("no jobs to run") + log.Debugf("no jobs to run") } } @@ -129,20 +160,37 @@ func (c *Cron) Start(ctx context.Context) error { func (c *Cron) runJob(ctx context.Context, job *Job, runner RunJobFunc) { log.Debugf("run job: %s", job.Name) - in := &bytes.Buffer{} - out := &bytes.Buffer{} - err := &bytes.Buffer{} - streams := ioes.NewIOStreams(in, out, err) + job.LastRunStart = time.Now() + + streams := ioes.NewDiscardIOStreams() + if lfc, ok := c.log.(LogFileCreator); ok { + if file, logPath, err := lfc.CreateLogFile(job); err == nil { + defer file.Close() + streams.Out = file + streams.ErrOut = file + job.LogFilePath = logPath + } + } if err := runner(ctx, streams, job); err != nil { log.Errorf("run job: %s error: %s", job.Name, err.Error()) job.LastError = err.Error() } else { - log.Errorf("run job: %s success", job.Name) + log.Debugf("run job: %s success", job.Name) job.LastError = "" } - job.LastRun = time.Now() - c.store.PutJob(ctx, job) + job.LastRunStop = time.Now() + + // the updated job that goes to the schedule store shouldn't have a log path + scheduleJob := job.Copy() + scheduleJob.LogFilePath = "" + if err := c.schedule.PutJob(ctx, scheduleJob); err != nil { + log.Error(err) + } + + if err := c.log.PutJob(ctx, job); err != nil { + log.Error(err) + } } // Schedule adds a job to the cron scheduler @@ -151,13 +199,13 @@ func (c *Cron) Schedule(ctx context.Context, job *Job) error { return err } - return c.store.PutJob(ctx, job) + return c.schedule.PutJob(ctx, job) } // Unschedule removes a job from the cron scheduler, cancelling any future // job executions func (c *Cron) Unschedule(ctx context.Context, name string) error { - return c.store.DeleteJob(ctx, name) + return c.schedule.DeleteJob(ctx, name) } // ServeHTTP spins up an HTTP server at the specified address @@ -174,6 +222,10 @@ func newCronRoutes(c *Cron) http.Handler { m := http.NewServeMux() m.HandleFunc("/", c.statusHandler) m.HandleFunc("/jobs", c.jobsHandler) + m.HandleFunc("/job", c.jobHandler) + m.HandleFunc("/logs", c.logsHandler) + m.HandleFunc("/log", c.loggedJobHandler) + m.HandleFunc("/log/output", c.loggedJobFileHandler) m.HandleFunc("/run", c.runHandler) return m @@ -186,7 +238,12 @@ func (c *Cron) statusHandler(w http.ResponseWriter, r *http.Request) { func (c *Cron) jobsHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": - js, err := c.Jobs(r.Context(), 0, 0) + // TODO (b5): handle these errors, but they'll default to 0 so it's mainly + // for reporting when we're given odd values + offset, _ := apiutil.ReqParamInt("offset", r) + limit, _ := apiutil.ReqParamInt("limit", r) + + js, err := c.Jobs(r.Context(), offset, limit) if err != nil { 
w.WriteHeader(http.StatusInternalServerError) return @@ -207,9 +264,10 @@ func (c *Cron) jobsHandler(w http.ResponseWriter, r *http.Request) { if err := job.UnmarshalFlatbuffer(j); err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(err.Error())) + return } - if err := c.store.PutJob(r.Context(), job); err != nil { + if err := c.schedule.PutJob(r.Context(), job); err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return @@ -226,6 +284,64 @@ func (c *Cron) jobsHandler(w http.ResponseWriter, r *http.Request) { } +func (c *Cron) jobHandler(w http.ResponseWriter, r *http.Request) { + name := r.FormValue("name") + job, err := c.Job(r.Context(), name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + w.Write(job.FlatbufferBytes()) +} + +func (c *Cron) logsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + + case "GET": + // TODO (b5): handle these errors, but they'll default to 0 so it's mainly + // for reporting when we're given odd values + offset, _ := apiutil.ReqParamInt("offset", r) + limit, _ := apiutil.ReqParamInt("limit", r) + + log, err := c.Logs(r.Context(), offset, limit) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Write(jobs(log).FlatbufferBytes()) + return + + } +} + +func (c *Cron) loggedJobHandler(w http.ResponseWriter, r *http.Request) { + logName := r.FormValue("log_name") + job, err := c.LoggedJob(r.Context(), logName) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + w.Write(job.FlatbufferBytes()) +} + +func (c *Cron) loggedJobFileHandler(w http.ResponseWriter, r *http.Request) { + logName := r.FormValue("log_name") + f, err := c.LoggedJobFile(r.Context(), logName) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + io.Copy(w, f) + return +} + func (c *Cron) runHandler(w http.ResponseWriter, r *http.Request) { // TODO (b5): implement an HTTP run handler w.WriteHeader(http.StatusTooEarly) diff --git a/cron/cron_fbs/Job.go b/cron/cron_fbs/Job.go index c0ddbf89d..ec7a8dcb9 100644 --- a/cron/cron_fbs/Job.go +++ b/cron/cron_fbs/Job.go @@ -34,8 +34,16 @@ func (rcv *Job) Name() []byte { return nil } -func (rcv *Job) Type() JobType { +func (rcv *Job) Path() []byte { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) Type() JobType { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.GetInt8(o + rcv._tab.Pos) } @@ -43,19 +51,27 @@ func (rcv *Job) Type() JobType { } func (rcv *Job) MutateType(n JobType) bool { - return rcv._tab.MutateInt8Slot(6, n) + return rcv._tab.MutateInt8Slot(8, n) } func (rcv *Job) Periodicity() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } return nil } -func (rcv *Job) LastRun() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) +func (rcv *Job) LastRunStart() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) LastRunStop() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -63,7 +79,15 @@ func (rcv *Job) LastRun() []byte { } func (rcv *Job) LastError() 
[]byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Job) LogFilePath() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -71,7 +95,7 @@ func (rcv *Job) LastError() []byte { } func (rcv *Job) OptionsType() byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) if o != 0 { return rcv._tab.GetByte(o + rcv._tab.Pos) } @@ -79,11 +103,11 @@ func (rcv *Job) OptionsType() byte { } func (rcv *Job) MutateOptionsType(n byte) bool { - return rcv._tab.MutateByteSlot(14, n) + return rcv._tab.MutateByteSlot(20, n) } func (rcv *Job) Options(obj *flatbuffers.Table) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(22)) if o != 0 { rcv._tab.Union(obj, o) return true @@ -92,28 +116,37 @@ func (rcv *Job) Options(obj *flatbuffers.Table) bool { } func JobStart(builder *flatbuffers.Builder) { - builder.StartObject(7) + builder.StartObject(10) } func JobAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) } +func JobAddPath(builder *flatbuffers.Builder, path flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(path), 0) +} func JobAddType(builder *flatbuffers.Builder, type_ int8) { - builder.PrependInt8Slot(1, type_, 0) + builder.PrependInt8Slot(2, type_, 0) } func JobAddPeriodicity(builder *flatbuffers.Builder, periodicity flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(periodicity), 0) + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(periodicity), 0) } -func JobAddLastRun(builder *flatbuffers.Builder, lastRun flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(lastRun), 0) +func JobAddLastRunStart(builder *flatbuffers.Builder, lastRunStart flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(lastRunStart), 0) +} +func JobAddLastRunStop(builder *flatbuffers.Builder, lastRunStop flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(lastRunStop), 0) } func JobAddLastError(builder *flatbuffers.Builder, lastError flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(lastError), 0) + builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(lastError), 0) +} +func JobAddLogFilePath(builder *flatbuffers.Builder, logFilePath flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(logFilePath), 0) } func JobAddOptionsType(builder *flatbuffers.Builder, optionsType byte) { - builder.PrependByteSlot(5, optionsType, 0) + builder.PrependByteSlot(8, optionsType, 0) } func JobAddOptions(builder *flatbuffers.Builder, options flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(options), 0) + builder.PrependUOffsetTSlot(9, flatbuffers.UOffsetT(options), 0) } func JobEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/cron/cron_test.go b/cron/cron_test.go index cdbafac40..734d746ed 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -41,7 +41,7 @@ func TestCronDataset(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() - cron := NewCronInterval(&MemJobStore{}, factory, time.Millisecond*50) + cron := NewCronInterval(&MemJobStore{}, 
&MemJobStore{}, factory, time.Millisecond*50) if err := cron.Schedule(ctx, job); err != nil { t.Fatal(err) } @@ -88,7 +88,7 @@ func TestCronShellScript(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() - cron := NewCron(&MemJobStore{}, factory) + cron := NewCron(&MemJobStore{}, &MemJobStore{}, factory) if err := cron.Schedule(ctx, job); err != nil { t.Fatal(err) } @@ -107,6 +107,7 @@ func TestCronShellScript(t *testing.T) { func TestCronHTTP(t *testing.T) { s := &MemJobStore{} + l := &MemJobStore{} factory := func(context.Context) RunJobFunc { return func(ctx context.Context, streams ioes.IOStreams, job *Job) error { @@ -120,7 +121,7 @@ func TestCronHTTP(t *testing.T) { t.Error("expected ping to server that is off to return ErrUnreachable") } - cr := NewCron(s, factory) + cr := NewCron(s, l, factory) // TODO (b5) - how do we keep this from being a leaking goroutine? go cr.ServeHTTP(":7897") @@ -138,10 +139,10 @@ func TestCronHTTP(t *testing.T) { } dsJob := &Job{ - Name: "b5/libp2p_node_count", - Type: JTDataset, - Periodicity: mustRepeatingInterval("R/P1W"), - LastRun: time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), + Name: "b5/libp2p_node_count", + Type: JTDataset, + Periodicity: mustRepeatingInterval("R/P1W"), + LastRunStart: time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), } if err = cli.Schedule(cliCtx, dsJob); err != nil { diff --git a/cron/file_job_store.go b/cron/file_job_store.go index 7bc54e31d..b026886dc 100644 --- a/cron/file_job_store.go +++ b/cron/file_job_store.go @@ -3,8 +3,10 @@ package cron import ( "context" "fmt" + "io" "io/ioutil" "os" + "path/filepath" "sort" "sync" ) @@ -168,7 +170,31 @@ func (s *FlatbufferJobStore) DeleteJob(ctx context.Context, name string) error { return s.saveJobs(js) } +const logsDirName = "logs" + +// CreateLogFile creates an in-memory log file +func (s *FlatbufferJobStore) CreateLogFile(j *Job) (f io.WriteCloser, path string, err error) { + s.lock.Lock() + defer s.lock.Unlock() + + var logsDir string + if logsDir, err = s.logsDir(); err != nil { + return + } + path = filepath.Join(logsDir, j.LogName()) + + f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + return +} + +func (s *FlatbufferJobStore) logsDir() (string, error) { + path := filepath.Join(filepath.Dir(s.path), logsDirName) + err := os.MkdirAll(path, os.ModePerm) + return path, err +} + // Destroy removes the path entirely func (s *FlatbufferJobStore) Destroy() error { + os.RemoveAll(filepath.Join(filepath.Dir(s.path), logsDirName)) return os.Remove(s.path) } diff --git a/cron/job.go b/cron/job.go index 637647b95..020c03b89 100644 --- a/cron/job.go +++ b/cron/job.go @@ -41,11 +41,14 @@ func (jt JobType) Enum() int8 { // a specified Periodicity (time interval) type Job struct { Name string + Path string Type JobType Periodicity iso8601.RepeatingInterval - LastRun time.Time - LastError string + LastRunStart time.Time + LastRunStop time.Time + LastError string + LogFilePath string Options Options } @@ -68,16 +71,51 @@ func (job *Job) Validate() error { // NextExec returns the next time execution horizion. 
If job periodicity is // improperly configured, the returned time will be zero func (job *Job) NextExec() time.Time { - return job.Periodicity.After(job.LastRun) + return job.Periodicity.After(job.LastRunStart) +} + +// LogName returns a canonical name string from a timestamp and job pointer +func (job *Job) LogName() string { + return fmt.Sprintf("%d-%s", job.LastRunStart.Unix(), job.Name) +} + +// Copy creates a deep copy of a job +func (job *Job) Copy() *Job { + cp := &Job{ + Name: job.Name, + Path: job.Path, + Type: job.Type, + Periodicity: job.Periodicity, + LastRunStart: job.LastRunStart, + LastRunStop: job.LastRunStop, + LastError: job.LastError, + LogFilePath: job.LogFilePath, + } + + if job.Options != nil { + cp.Options = job.Options + } + + return cp +} + +// FlatbufferBytes formats a job as a flatbuffer byte slice +func (job *Job) FlatbufferBytes() []byte { + builder := flatbuffers.NewBuilder(0) + off := job.MarshalFlatbuffer(builder) + builder.Finish(off) + return builder.FinishedBytes() } // MarshalFlatbuffer writes a job to a builder func (job *Job) MarshalFlatbuffer(builder *flatbuffers.Builder) flatbuffers.UOffsetT { name := builder.CreateString(job.Name) - // typ := builder.CreateString(string(job.Type)) + path := builder.CreateString(job.Path) - lastRun := builder.CreateString(job.LastRun.Format(time.RFC3339)) + lastRunStart := builder.CreateString(job.LastRunStart.Format(time.RFC3339)) + lastRunStop := builder.CreateString(job.LastRunStop.Format(time.RFC3339)) lastError := builder.CreateString(job.LastError) + logPath := builder.CreateString(job.LogFilePath) p := builder.CreateString(job.Periodicity.String()) var opts flatbuffers.UOffsetT @@ -87,10 +125,14 @@ func (job *Job) MarshalFlatbuffer(builder *flatbuffers.Builder) flatbuffers.UOff cronfb.JobStart(builder) cronfb.JobAddName(builder, name) + cronfb.JobAddPath(builder, path) + cronfb.JobAddType(builder, job.Type.Enum()) - cronfb.JobAddLastRun(builder, lastRun) - cronfb.JobAddLastError(builder, lastError) cronfb.JobAddPeriodicity(builder, p) + cronfb.JobAddLastRunStart(builder, lastRunStart) + cronfb.JobAddLastRunStop(builder, lastRunStop) + cronfb.JobAddLastError(builder, lastError) + cronfb.JobAddLogFilePath(builder, logPath) if opts != 0 { cronfb.JobAddOptions(builder, opts) } @@ -99,7 +141,12 @@ func (job *Job) MarshalFlatbuffer(builder *flatbuffers.Builder) flatbuffers.UOff // UnmarshalFlatbuffer decodes a job from a flatbuffer func (job *Job) UnmarshalFlatbuffer(j *cronfb.Job) error { - lastRun, err := time.Parse(time.RFC3339, string(j.LastRun())) + lastRunStart, err := time.Parse(time.RFC3339, string(j.LastRunStart())) + if err != nil { + return err + } + + lastRunStop, err := time.Parse(time.RFC3339, string(j.LastRunStop())) if err != nil { return err } @@ -111,11 +158,14 @@ func (job *Job) UnmarshalFlatbuffer(j *cronfb.Job) error { *job = Job{ Name: string(j.Name()), + Path: string(j.Path()), Type: JobType(cronfb.EnumNamesJobType[j.Type()]), Periodicity: p, - LastRun: lastRun, - LastError: string(j.LastError()), + LastRunStart: lastRunStart, + LastRunStop: lastRunStop, + LastError: string(j.LastError()), + LogFilePath: string(j.LogFilePath()), } unionTable := new(flatbuffers.Table) diff --git a/cron/job_store.go b/cron/job_store.go index 9c48161d8..5ed8ec4d5 100644 --- a/cron/job_store.go +++ b/cron/job_store.go @@ -3,6 +3,7 @@ package cron import ( "context" "fmt" + "io" "sort" "sync" @@ -38,6 +39,13 @@ type JobStore interface { DeleteJob(ctx context.Context, name string) error } +// LogFileCreator is an 
interface for generating log files to write to, +// JobStores should implement this interface +type LogFileCreator interface { + // CreateLogFile returns a file to write output to + CreateLogFile(job *Job) (f io.WriteCloser, path string, err error) +} + // MemJobStore is an in-memory implementation of the JobStore interface // Jobs stored in MemJobStore can be persisted for the duration of a process // at the longest. @@ -156,10 +164,10 @@ type jobs []*Job func (js jobs) Len() int { return len(js) } func (js jobs) Less(i, j int) bool { - if js[i].LastRun.Equal(js[j].LastRun) { + if js[i].LastRunStart.Equal(js[j].LastRunStart) { return js[i].Name < js[j].Name } - return js[i].LastRun.After(js[j].LastRun) + return js[i].LastRunStart.After(js[j].LastRunStart) } func (js jobs) Swap(i, j int) { js[i], js[j] = js[j], js[i] } diff --git a/cron/job_store_test.go b/cron/job_store_test.go index 9ed9880c7..762319b96 100644 --- a/cron/job_store_test.go +++ b/cron/job_store_test.go @@ -48,10 +48,10 @@ func RunJobStoreTests(t *testing.T, newStore func() JobStore) { } jobTwo := &Job{ - Name: "job two", - Periodicity: mustRepeatingInterval("R/P3M"), - Type: JTShellScript, - LastRun: time.Date(2001, 1, 1, 1, 1, 1, 1, time.UTC), + Name: "job two", + Periodicity: mustRepeatingInterval("R/P3M"), + Type: JTShellScript, + LastRunStart: time.Date(2001, 1, 1, 1, 1, 1, 1, time.UTC), } if err = store.PutJob(ctx, jobTwo); err != nil { t.Errorf("putting job one: %s", err) @@ -66,10 +66,10 @@ func RunJobStoreTests(t *testing.T, newStore func() JobStore) { } updatedJobOne := &Job{ - Name: jobOne.Name, - Periodicity: jobOne.Periodicity, - Type: jobOne.Type, - LastRun: time.Date(2002, 1, 1, 1, 1, 1, 1, time.UTC), + Name: jobOne.Name, + Periodicity: jobOne.Periodicity, + Type: jobOne.Type, + LastRunStart: time.Date(2002, 1, 1, 1, 1, 1, 1, time.UTC), } if err = store.PutJob(ctx, updatedJobOne); err != nil { t.Errorf("putting job one: %s", err) diff --git a/cron/job_test.go b/cron/job_test.go index 013172201..0f6734af6 100644 --- a/cron/job_test.go +++ b/cron/job_test.go @@ -30,9 +30,12 @@ func CompareJobs(a, b *Job) error { return fmt.Errorf("Periodicity mismatch. %s != %s", a.Name, b.Name) } // use unix comparisons to ignore millisecond & nanosecond precision errors - if a.LastRun.Unix() != b.LastRun.Unix() { - return fmt.Errorf("LastRun mismatch. %s != %s", a.LastRun, b.LastRun) + if a.LastRunStart.Unix() != b.LastRunStart.Unix() { + return fmt.Errorf("LastRunStart mismatch. %s != %s", a.LastRunStart, b.LastRunStart) } + + // TODO (b5): compare other fields + if a.Type != b.Type { return fmt.Errorf("Type mistmatch. 
%s != %s", a.Type, b.Type) } diff --git a/lib/lib.go b/lib/lib.go index 0f21708ac..d8771b2a7 100644 --- a/lib/lib.go +++ b/lib/lib.go @@ -280,7 +280,7 @@ func NewInstance(opts ...Option) (qri *Instance, err error) { } } - if inst.cron, err = newCron(cfg, filepath.Dir(cfg.Path()), opts); err != nil { + if inst.cron, err = newCron(cfg, inst.QriPath()); err != nil { return } @@ -389,53 +389,31 @@ func newFilesystem(cfg *config.Config, store cafs.Filestore) (qfs.Filesystem, er return fsys, nil } -func newCron(cfg *config.Config, repoPath string, opts []Option) (cron.Scheduler, error) { - if cfg.Update == nil { - cfg.Update = config.DefaultUpdate() +func newCron(cfg *config.Config, repoPath string) (cron.Scheduler, error) { + updateCfg := cfg.Update + if updateCfg == nil { + updateCfg = config.DefaultUpdate() } - cli := cron.HTTPClient{Addr: cfg.Update.Address} - err := cli.Ping() - - if err == nil { + cli := cron.HTTPClient{Addr: updateCfg.Address} + if err := cli.Ping(); err == nil { return cli, nil - } else if err != cron.ErrUnreachable { - log.Errorf("initializing cron: %s", err.Error()) - // at this point something is up with the cron server, still return a cli - // so we can issue commands like restart - return cli, err } - // at this point err must be cron.ErrUnreachable, which means it's time to - // spin up a new cron service - err = nil - - var js cron.JobStore - switch cfg.Update.Type { + var jobStore, logStore cron.JobStore + switch updateCfg.Type { case "fs": - js = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb") + jobStore = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb") + logStore = cron.NewFlatbufferJobStore(repoPath + "/logs.qfb") case "mem": - js = &cron.MemJobStore{} + jobStore = &cron.MemJobStore{} + logStore = &cron.MemJobStore{} default: - return nil, fmt.Errorf("unknown cron type: %s", cfg.Update.Type) - } - - newInst := func(ctx context.Context) (*Instance, error) { - log.Debug("cron create new instance") - go func() { - <-ctx.Done() - log.Debug("cron close instance") - }() - - opts = append([]Option{ - OptCtx(ctx), - }, opts...) - return NewInstance(opts...) + return nil, fmt.Errorf("unknown cron type: %s", updateCfg.Type) } - // scriptsPath := filepath.Join(repoPath, "/cron") - - return cron.NewCron(js, newUpdateFactory(newInst)), nil + svc := cron.NewCron(jobStore, logStore, updateFactory) + return svc, nil } // NewInstanceFromConfigAndNode is a temporary solution to create an instance from an diff --git a/lib/update.go b/lib/update.go index 00985962e..79ae2e51e 100644 --- a/lib/update.go +++ b/lib/update.go @@ -195,34 +195,20 @@ func UpdateServiceStart(ctx context.Context, repoPath string, updateCfg *config. return fmt.Errorf("service already running") } - var js cron.JobStore + var jobStore, logStore cron.JobStore switch updateCfg.Type { case "fs": - js = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb") + jobStore = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb") + logStore = cron.NewFlatbufferJobStore(repoPath + "/logs.qfb") case "mem": - js = &cron.MemJobStore{} + jobStore = &cron.MemJobStore{} + logStore = &cron.MemJobStore{} default: return fmt.Errorf("unknown cron type: %s", updateCfg.Type) } - newInst := func(ctx context.Context) (*Instance, error) { - log.Error("cron create new instance") - go func() { - <-ctx.Done() - log.Error("cron close instance") - }() - - opts = append([]Option{ - OptCtx(ctx), - }, opts...) - return NewInstance(opts...) 
- } - - // logsPath := filepath.Join(repoPath, "/cron") - svc := cron.NewCron(js, newUpdateFactory(newInst)) - + svc := cron.NewCron(jobStore, logStore, updateFactory) log.Debug("starting update service") - go func() { if err := svc.ServeHTTP(updateCfg.Address); err != nil { log.Errorf("starting cron http server: %s", err) @@ -293,8 +279,7 @@ func (m *UpdateMethods) Run(p *Job, res *repo.DatasetRef) (err error) { err = m.runDatasetUpdate(params, res) case cron.JTShellScript: - runner := base.LocalShellScriptRunner(m.scriptsPath) - err = runner(m.inst.ctx, m.inst.streams, p) + return base.JobToCmd(m.inst.streams, p).Run() default: return fmt.Errorf("unrecognized update type: %s", p.Type) @@ -347,33 +332,30 @@ func (m *UpdateMethods) runDatasetUpdate(p *SaveParams, res *repo.DatasetRef) er return dsr.Save(saveParams, res) } -func newUpdateFactory(newInst func(ctx context.Context) (*Instance, error)) func(context.Context) cron.RunJobFunc { - return func(ctx context.Context) cron.RunJobFunc { - // note (b5): we'd like to one day be able to run scripts like this, creating - // an in-process instance when one or more jobs need running, but context cancellation - // & resource cleanup needs to be *perfect* before this can happen. We're not there yet. - // inst, err := newInst(ctx) - - return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error { - // if err != nil { - // return err - // } - log.Errorf("running update: %s", job.Name) - - switch job.Type { - case cron.JTDataset: - // When in-process instances are a thing, do this: - // m := NewUpdateMethods(inst) - // res := &repo.DatasetRef{} - // return m.Run(job, res) - runner := base.DatasetSaveRunner("") - return runner(ctx, streams, job) - case cron.JTShellScript: - runner := base.LocalShellScriptRunner("") - return runner(ctx, streams, job) - default: - return fmt.Errorf("unrecognized update type: %s", job.Type) - } +// note (b5): we'd like to one day be able to run scripts like this, creating +// an in-process instance when one or more jobs need running, but context cancellation +// & resource cleanup needs to be *perfect* before this can happen. We're not there yet. +// func newUpdateFactory(newInst func(ctx context.Context) (*Instance, error)) func(context.Context) cron.RunJobFunc { +// return func(ctx context.Context) cron.RunJobFunc { +// inst, err := newInst(ctx) +func updateFactory(context.Context) cron.RunJobFunc { + return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error { + // if err != nil { + // return err + // } + + // When in-process instances are a thing, do something like this: + // if job.Type == cron.JTDataset { + // m := NewUpdateMethods(inst) + // res := &repo.DatasetRef{} + // return m.Run(job, res) + // } + + log.Errorf("running update: %s", job.Name) + cmd := base.JobToCmd(streams, job) + if cmd == nil { + return fmt.Errorf("unrecognized update type: %s", job.Type) } + return nil } }
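
A few sketches of the surface these changes add follow; anything in them that is not shown in the diff (paths, addresses, package and variable names) is a placeholder. First, a minimal sketch of driving the new base.JobToCmd helper directly, outside the scheduler, with streams wired to the calling process instead of a captured log file:

package main

import (
	"os"

	"github.com/qri-io/ioes"
	"github.com/qri-io/qri/base"
	"github.com/qri-io/qri/cron"
)

func main() {
	// wire the child process to this process's standard streams
	streams := ioes.NewIOStreams(os.Stdin, os.Stdout, os.Stderr)

	// for JTShellScript jobs the Name field doubles as the script path
	job := &cron.Job{
		Name: "/usr/local/bin/refresh.sh", // hypothetical script
		Type: cron.JTShellScript,
	}

	// JobToCmd returns nil for unrecognized job types, so guard before running
	if cmd := base.JobToCmd(streams, job); cmd != nil {
		if err := cmd.Run(); err != nil {
			os.Exit(1)
		}
	}
}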
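
NewCron now takes two stores: the first holds scheduled jobs, the second the log of executed runs. A rough sketch of standing the service up the way lib.newCron and lib.UpdateServiceStart do after this change, with a run-func factory that builds and runs the command via base.JobToCmd (the repo path is a placeholder):

package main

import (
	"context"
	"fmt"

	"github.com/qri-io/ioes"
	"github.com/qri-io/qri/base"
	"github.com/qri-io/qri/cron"
)

func main() {
	repoPath := "/tmp/qri" // placeholder

	// scheduled jobs and the log of executed jobs live in separate stores
	jobStore := cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb")
	logStore := cron.NewFlatbufferJobStore(repoPath + "/logs.qfb")

	factory := func(context.Context) cron.RunJobFunc {
		return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error {
			cmd := base.JobToCmd(streams, job)
			if cmd == nil {
				return fmt.Errorf("unrecognized job type: %s", job.Type)
			}
			// run the command; its output goes wherever streams points
			return cmd.Run()
		}
	}

	svc := cron.NewCron(jobStore, logStore, factory)

	// the check loop runs until the context is cancelled
	if err := svc.Start(context.Background()); err != nil {
		fmt.Println(err)
	}
}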
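
Output capture is opt-in: Cron.runJob type-asserts its log store against the new LogFileCreator interface and falls back to discarding output when the store doesn't implement it. A hypothetical store that adds that capability on top of MemJobStore might look like the following, and would be passed as the second argument to cron.NewCron:

package logstore

import (
	"io"
	"os"
	"path/filepath"

	"github.com/qri-io/qri/cron"
)

// TmpLogStore keeps executed-job records in memory and writes each run's
// combined stdout/stderr to a file beneath Dir
type TmpLogStore struct {
	cron.MemJobStore
	Dir string
}

// CreateLogFile satisfies cron.LogFileCreator; the returned path is what
// runJob records in job.LogFilePath
func (s *TmpLogStore) CreateLogFile(j *cron.Job) (f io.WriteCloser, path string, err error) {
	path = filepath.Join(s.Dir, j.LogName())
	f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	return f, path, err
}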
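
On the wire, the new /logs, /log, and /log/output routes are reachable through HTTPClient. A sketch of listing executed jobs and streaming one run's captured output; the address is a stand-in for whatever cfg.Update.Address resolves to, and the server side of /log/output is still marked TODO in this patch:

package main

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/qri-io/qri/cron"
)

func main() {
	ctx := context.Background()
	cli := cron.HTTPClient{Addr: "localhost:2506"} // placeholder address

	if err := cli.Ping(); err != nil {
		fmt.Fprintln(os.Stderr, "cron service unreachable:", err)
		return
	}

	// list the 25 most recent executed jobs
	logged, err := cli.Logs(ctx, 0, 25)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	for _, job := range logged {
		// LogName combines the run's start timestamp with the job name
		fmt.Println(job.LogName(), job.LastError)
	}

	if len(logged) > 0 {
		// stream the captured stdout/stderr of the most recent run
		f, err := cli.LoggedJobFile(ctx, logged[0].LogName())
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		defer f.Close()
		io.Copy(os.Stdout, f)
	}
}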
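
The cron_fbs/Job.go changes are regenerated code; the new vtable offsets follow the usual flatbuffers convention of offset = 4 + 2*slot, so inserting path at slot 1 plus the extra timestamp and log fields shifts every later field:

  slot 0 name          -> offset 4
  slot 1 path          -> offset 6
  slot 2 type          -> offset 8
  slot 3 periodicity   -> offset 10
  slot 4 lastRunStart  -> offset 12
  slot 5 lastRunStop   -> offset 14
  slot 6 lastError     -> offset 16
  slot 7 logFilePath   -> offset 18
  slot 8 options_type  -> offset 20
  slot 9 options       -> offset 22

Because the slots shift rather than append, jobs.qfb files written with the previous layout won't decode correctly under the new one.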