Skip to content

Commit

Permalink
fix(cron): connect cron to log file, actually run returned command in…
Browse files Browse the repository at this point in the history
… lib
  • Loading branch information
b5 committed May 9, 2019
1 parent a9f3c52 commit 41c9366
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 9 deletions.
1 change: 0 additions & 1 deletion base/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func datasetSaveCmd(streams ioes.IOStreams, job *cron.Job) *exec.Cmd {
}

cmd := exec.Command("qri", args...)
// cmd.Dir = basepath
cmd.Stderr = streams.ErrOut
cmd.Stdout = streams.Out
cmd.Stdin = streams.In
Expand Down
9 changes: 6 additions & 3 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func (c *Cron) runJob(ctx context.Context, job *Job, runner RunJobFunc) {
streams := ioes.NewDiscardIOStreams()
if lfc, ok := c.log.(LogFileCreator); ok {
if file, logPath, err := lfc.CreateLogFile(job); err == nil {
log.Debugf("using log file: %s", logPath)
defer file.Close()
streams.Out = file
streams.ErrOut = file
streams = ioes.NewIOStreams(nil, file, file)
job.LogFilePath = logPath
}
}

streams.ErrOut.Write([]byte(fmt.Sprintf("%s %s\n", job.LastRunStart, job.Name)))

if err := runner(ctx, streams, job); err != nil {
log.Errorf("run job: %s error: %s", job.Name, err.Error())
job.LastError = err.Error()
Expand All @@ -188,6 +190,7 @@ func (c *Cron) runJob(ctx context.Context, job *Job, runner RunJobFunc) {
log.Error(err)
}

job.Name = job.LogName()
if err := c.log.PutJob(ctx, job); err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -344,7 +347,7 @@ func (c *Cron) loggedJobFileHandler(w http.ResponseWriter, r *http.Request) {

func (c *Cron) runHandler(w http.ResponseWriter, r *http.Request) {
// TODO (b5): implement an HTTP run handler
w.WriteHeader(http.StatusTooEarly)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("not finished"))
// c.runJob(r.Context(), nil)
}
5 changes: 5 additions & 0 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func TestCronHTTP(t *testing.T) {
t.Error("expected len of jobs to equal 1")
}

_, err = cli.Job(cliCtx, jobs[0].Name)
if err != nil {
t.Fatal(err.Error())
}

if err := cli.Unschedule(cliCtx, dsJob.Name); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cron/file_job_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (s *FlatbufferJobStore) CreateLogFile(j *Job) (f io.WriteCloser, path strin
if logsDir, err = s.logsDir(); err != nil {
return
}
path = filepath.Join(logsDir, j.LogName())
path = filepath.Join(logsDir, j.LogName()+".log")

f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
return
Expand Down
3 changes: 2 additions & 1 deletion cron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron

import (
"fmt"
"path/filepath"
"time"

flatbuffers "github.com/google/flatbuffers/go"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (job *Job) NextExec() time.Time {

// LogName returns a canonical name string from a timestamp and job pointer
func (job *Job) LogName() string {
return fmt.Sprintf("%d-%s", job.LastRunStart.Unix(), job.Name)
return fmt.Sprintf("%d-%s", job.LastRunStart.Unix(), filepath.Base(job.Name))
}

// Copy creates a deep copy of a job
Expand Down
5 changes: 5 additions & 0 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var (
const VersionNumber = "0.7.4-dev"

func init() {
// TODO (b5): for now this ensures that `qri update service start`
// actually prints something. `qri update service start` should print
// some basic details AND obey the config.log.levels.cron value
golog.SetLogLevel("cron", "debug")

// Fields like dataset.Structure.Schema contain data of arbitrary types,
// registering with the gob package prevents errors when sending them
// over net/rpc calls.
Expand Down
6 changes: 3 additions & 3 deletions lib/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func UpdateServiceStart(ctx context.Context, repoPath string, updateCfg *config.
var jobStore, logStore cron.JobStore
switch updateCfg.Type {
case "fs":
jobStore = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb")
logStore = cron.NewFlatbufferJobStore(repoPath + "/logs.qfb")
jobStore = cron.NewFlatbufferJobStore(repoPath + "/cron_jobs.qfb")
logStore = cron.NewFlatbufferJobStore(repoPath + "/cron_logs.qfb")
case "mem":
jobStore = &cron.MemJobStore{}
logStore = &cron.MemJobStore{}
Expand Down Expand Up @@ -356,6 +356,6 @@ func updateFactory(context.Context) cron.RunJobFunc {
if cmd == nil {
return fmt.Errorf("unrecognized update type: %s", job.Type)
}
return nil
return cmd.Run()
}
}

0 comments on commit 41c9366

Please sign in to comment.