Skip to content

Commit

Permalink
feat(cron): store logs and files of stdout logs
Browse files Browse the repository at this point in the history
I think it's worth it to get this API right, it has a number of implications for the overall design of cron.
  • Loading branch information
b5 committed May 9, 2019
1 parent 6f639e1 commit a9f3c52
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 252 deletions.
131 changes: 67 additions & 64 deletions base/cron.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package base

import (
"context"
"fmt"
"os/exec"
"path/filepath"
Expand All @@ -13,77 +12,80 @@ import (
"github.com/qri-io/qri/cron"
)

// DatasetSaveRunner returns a cron.RunFunc that invokes the "qri save" command
func DatasetSaveRunner(basepath string) cron.RunJobFunc {
return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error {
args := []string{"save", job.Name}
// JobToCmd returns an operating system command that will execute the given job
// wiring operating system in/out/errout to the provided iostreams.
func JobToCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd {
switch job.Type {
case cron.JTDataset:
return datasetSaveCmd(streams, job)
case cron.JTShellScript:
return shellScriptCmd(streams, job)
default:
return nil
}
}

if o, ok := job.Options.(*cron.DatasetOptions); ok {
if o.Title != "" {
args = append(args, fmt.Sprintf(`--title="%s"`, o.Title))
}
if o.Message != "" {
args = append(args, fmt.Sprintf(`--message="%s"`, o.Message))
}
if o.Recall != "" {
args = append(args, fmt.Sprintf(`--recall="%s"`, o.Recall))
}
if o.BodyPath != "" {
args = append(args, fmt.Sprintf(`--body="%s"`, o.BodyPath))
}
if len(o.FilePaths) > 0 {
for _, path := range o.FilePaths {
args = append(args, fmt.Sprintf(`--file="%s"`, path))
}
// datasetSaveCmd configures a "qri save" command based on job details
// wiring operating system in/out/errout to the provided iostreams.
func datasetSaveCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd {
args := []string{"save", job.Name}

if o, ok := job.Options.(*cron.DatasetOptions); ok {
if o.Title != "" {
args = append(args, fmt.Sprintf(`--title="%s"`, o.Title))
}
if o.Message != "" {
args = append(args, fmt.Sprintf(`--message="%s"`, o.Message))
}
if o.Recall != "" {
args = append(args, fmt.Sprintf(`--recall="%s"`, o.Recall))
}
if o.BodyPath != "" {
args = append(args, fmt.Sprintf(`--body="%s"`, o.BodyPath))
}
if len(o.FilePaths) > 0 {
for _, path := range o.FilePaths {
args = append(args, fmt.Sprintf(`--file="%s"`, path))
}
}

// TODO (b5) - config and secrets
// TODO (b5) - config and secrets

boolFlags := map[string]bool{
"--publish": o.Publish,
"--strict": o.Strict,
"--force": o.Force,
"--keep-format": o.ConvertFormatToPrev,
"--no-render": !o.ShouldRender,
}
for flag, use := range boolFlags {
if use {
args = append(args, flag)
}
boolFlags := map[string]bool{
"--publish": o.Publish,
"--strict": o.Strict,
"--force": o.Force,
"--keep-format": o.ConvertFormatToPrev,
"--no-render": !o.ShouldRender,
}
for flag, use := range boolFlags {
if use {
args = append(args, flag)
}

}

cmd := exec.Command("qri", args...)
// cmd.Dir = basepath
cmd.Stderr = streams.ErrOut
cmd.Stdout = streams.Out
cmd.Stdin = streams.In
return cmd.Run()
}

cmd := exec.Command("qri", args...)
// cmd.Dir = basepath
cmd.Stderr = streams.ErrOut
cmd.Stdout = streams.Out
cmd.Stdin = streams.In
return cmd
}

// LocalShellScriptRunner creates a script runner anchored at a local path
// The runner it wires operating sytsem command in/out/errour to the iostreams
// provided by RunJobFunc. All paths are in relation to the provided base path
// shellScriptCmd creates an exec.Cmd, wires operating system in/out/errout
// to the provided iostreams.
// Commands are executed with access to the same enviornment variables as the
// process the runner is executing in
// The executing command blocks until completion
func LocalShellScriptRunner(basepath string) cron.RunJobFunc {
return func(ctx context.Context, streams ioes.IOStreams, job *cron.Job) error {
path := job.Name
if qfs.PathKind(job.Name) == "local" {
// TODO (b5) - need to first check that path can't be found
// path = filepath.Join(basepath, path)
}

cmd := exec.Command(path)
// cmd.Dir = basepath
cmd.Stderr = streams.ErrOut
cmd.Stdout = streams.Out
cmd.Stdin = streams.In
return cmd.Run()
}
func shellScriptCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd {
// TODO (b5) - config and secrets as env vars

cmd := exec.Command(job.Name)
cmd.Stderr = streams.ErrOut
cmd.Stdout = streams.Out
cmd.Stdin = streams.In
return cmd
}

// PossibleShellScript checks a path to see if it might be a shell script
Expand All @@ -109,10 +111,11 @@ func DatasetToJob(ds *dataset.Dataset, periodicity string, opts *cron.DatasetOpt

job = &cron.Job{
// TODO (b5) - dataset.Dataset needs an Alias() method:
Name: fmt.Sprintf("%s/%s", ds.Peername, ds.Name),
Periodicity: p,
Type: cron.JTDataset,
LastRun: ds.Commit.Timestamp,
Name: fmt.Sprintf("%s/%s", ds.Peername, ds.Name),
Periodicity: p,
Type: cron.JTDataset,
LastRunStart: ds.Commit.Timestamp,
LastRunStop: ds.Commit.Timestamp,
}
if opts != nil {
job.Options = opts
Expand Down
113 changes: 89 additions & 24 deletions cron/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
Expand Down Expand Up @@ -46,41 +47,31 @@ func (c HTTPClient) Ping() error {
if res.StatusCode == http.StatusOK {
return nil
}
return resError(res)
return maybeErrorResponse(res)
}

// Jobs lists jobs by querying an HTTP server
func (c HTTPClient) Jobs(ctx context.Context, offset, limit int) ([]*Job, error) {
res, err := http.Get(fmt.Sprintf("http://%s/jobs?offset=%d&limit=&%d", c.Addr, offset, limit))
res, err := http.Get(fmt.Sprintf("http://%s/jobs?offset=%d&limit=%d", c.Addr, offset, limit))
if err != nil {
return nil, err
}

defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
return decodeJobsResponse(res)
}

// Job gets a job by querying an HTTP server
func (c HTTPClient) Job(ctx context.Context, name string) (*Job, error) {
res, err := http.Get(fmt.Sprintf("http://%s/job?name=%s", c.Addr, name))
if err != nil {
return nil, err
}

js := cronfb.GetRootAsJobs(data, 0)
dec := &cronfb.Job{}
jobs := make([]*Job, js.ListLength())

for i := 0; i < js.ListLength(); i++ {
js.List(dec, i)
decJob := &Job{}
if err := decJob.UnmarshalFlatbuffer(dec); err != nil {
return nil, err
}
jobs[i] = decJob
if res.StatusCode == 200 {
return decodeJobResponse(res)
}

return jobs, nil
}

// Job gets a job by querying an HTTP server
func (c HTTPClient) Job(ctx context.Context, name string) (*Job, error) {
return nil, fmt.Errorf("not finished")
return nil, maybeErrorResponse(res)
}

// Schedule adds a job to the cron scheduler via an HTTP request
Expand All @@ -100,7 +91,45 @@ func (c HTTPClient) Unschedule(ctx context.Context, name string) error {
return err
}

return resError(res)
return maybeErrorResponse(res)
}

// Logs gives a log of executed jobs
func (c HTTPClient) Logs(ctx context.Context, offset, limit int) ([]*Job, error) {
res, err := http.Get(fmt.Sprintf("http://%s/logs?offset=%d&limit=%d", c.Addr, offset, limit))
if err != nil {
return nil, err
}

return decodeJobsResponse(res)
}

// LoggedJob returns a single executed job by job.LogName
func (c HTTPClient) LoggedJob(ctx context.Context, logName string) (*Job, error) {
res, err := http.Get(fmt.Sprintf("http://%s/log?log_name=%s", c.Addr, logName))
if err != nil {
return nil, err
}

if res.StatusCode == 200 {
return decodeJobResponse(res)
}

return nil, maybeErrorResponse(res)
}

// LoggedJobFile returns a reader for a file at the given name
func (c HTTPClient) LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) {
res, err := http.Get(fmt.Sprintf("http://%s/log/output?log_name=%s", c.Addr, logName))
if err != nil {
return nil, err
}

if res.StatusCode == 200 {
return res.Body, nil
}

return nil, maybeErrorResponse(res)
}

func (c HTTPClient) postJob(job *Job) error {
Expand All @@ -114,10 +143,10 @@ func (c HTTPClient) postJob(job *Job) error {
return err
}

return resError(res)
return maybeErrorResponse(res)
}

func resError(res *http.Response) error {
func maybeErrorResponse(res *http.Response) error {
if res.StatusCode == 200 {
return nil
}
Expand All @@ -129,3 +158,39 @@ func resError(res *http.Response) error {

return fmt.Errorf(string(errData))
}

func decodeJobsResponse(res *http.Response) ([]*Job, error) {
defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}

js := cronfb.GetRootAsJobs(data, 0)
dec := &cronfb.Job{}
jobs := make([]*Job, js.ListLength())

for i := 0; i < js.ListLength(); i++ {
js.List(dec, i)
decJob := &Job{}
if err := decJob.UnmarshalFlatbuffer(dec); err != nil {
return nil, err
}
jobs[i] = decJob
}

return jobs, nil
}

func decodeJobResponse(res *http.Response) (*Job, error) {
defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}

js := cronfb.GetRootAsJob(data, 0)
dec := &Job{}
err = dec.UnmarshalFlatbuffer(js)
return dec, err
}
7 changes: 5 additions & 2 deletions cron/cron.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ table StringMapVal {
val:string;
}

// TODO (b5): I think it would be smarter to remove all knoweldge from cron
// TODO (b5): I think it would be smarter to remove all details from cron
// about what exactly is being scheduled, but we would need a go implementation
// of flexbuffers to do that properly, so let's leave this in for now
union Options { DatasetOptions, ShellScriptOptions }
Expand Down Expand Up @@ -38,11 +38,14 @@ table ShellScriptOptions {

table Job {
name:string;
path:string;
type:JobType;
periodicity:string;

lastRun:string;
lastRunStart:string;
lastRunStop:string;
lastError:string;
logFilePath:string;

options:Options;
}
Expand Down
Loading

0 comments on commit a9f3c52

Please sign in to comment.