From 56c4840f3ac3f3a2ea93bfb7bd5414980c2a2e54 Mon Sep 17 00:00:00 2001 From: b5 Date: Sun, 5 May 2019 08:13:39 -0400 Subject: [PATCH] feat(update): add update log command and api endpoints --- api/api.go | 3 +- api/testdata/api.snapshot | Bin 180261 -> 180186 bytes api/update.go | 40 ++++++++++++++++++++-- api/update_test.go | 2 +- cmd/update.go | 69 +++++++++++++++++++++++++++++--------- cron/cron.go | 20 +++++++---- lib/lib.go | 4 +-- lib/update.go | 43 +++++++++++++++++++----- 8 files changed, 143 insertions(+), 38 deletions(-) diff --git a/api/api.go b/api/api.go index bfa627249..a6b64dd08 100644 --- a/api/api.go +++ b/api/api.go @@ -271,7 +271,8 @@ func NewServerRoutes(s Server) *http.ServeMux { } m.Handle("/update", s.middleware(uh.UpdatesHandler)) m.Handle("/update/run", s.middleware(uh.RunHandler)) - m.Handle("/update/log", s.middleware(uh.LogHandler)) + m.Handle("/update/logs", s.middleware(uh.LogsHandler)) + m.Handle("/update/logs/file", s.middleware(uh.LogFileHandler)) m.Handle("/update/service", s.middleware(uh.ServiceHandler)) renderh := NewRenderHandlers(node.Repo) diff --git a/api/testdata/api.snapshot b/api/testdata/api.snapshot index ad87538d14e83b8c6b803a1f80b4074d4e89a3ab..7c31ef1b6b1c2d0b3ec30b065352828c7ff7674f 100755 GIT binary patch delta 29 lcmZ45zM1 RK)C`Bmc0THw^zIZg9B;a6XpN_ diff --git a/api/update.go b/api/update.go index e6d8ccf76..3ecd10202 100644 --- a/api/update.go +++ b/api/update.go @@ -94,9 +94,43 @@ func (h UpdateHandlers) unscheduleUpdateHandler(w http.ResponseWriter, r *http.R } } -// LogHandler shows the log of previously run updates -func (h *UpdateHandlers) LogHandler(w http.ResponseWriter, r *http.Request) { - util.WriteErrResponse(w, http.StatusNotFound, fmt.Errorf("not finished")) +// LogsHandler shows the log of previously run updates +func (h *UpdateHandlers) LogsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "OPTIONS": + util.EmptyOkHandler(w, r) + case "GET": + h.logsHandler(w, r) + default: + util.NotFoundHandler(w, r) + } +} + +func (h *UpdateHandlers) logsHandler(w http.ResponseWriter, r *http.Request) { + args := lib.ListParamsFromRequest(r) + res := []*lib.Job{} + if err := h.Logs(&args, &res); err != nil { + log.Errorf("listing update logs: %s", err.Error()) + util.WriteErrResponse(w, http.StatusInternalServerError, err) + return + } + if err := util.WritePageResponse(w, res, r, args.Page()); err != nil { + log.Errorf("list jobs response: %s", err.Error()) + } +} + +// LogFileHandler fetches log output file data +func (h *UpdateHandlers) LogFileHandler(w http.ResponseWriter, r *http.Request) { + name := r.FormValue("log_name") + data := []byte{} + if err := h.LogFile(&name, &data); err != nil { + log.Errorf("getting update log file: %s", err.Error()) + util.WriteErrResponse(w, http.StatusInternalServerError, err) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(data) } // RunHandler brings a dataset to the latest version diff --git a/api/update_test.go b/api/update_test.go index 8c16f1fe6..7b232c4b8 100644 --- a/api/update_test.go +++ b/api/update_test.go @@ -37,7 +37,7 @@ func TestUpdateHandlers(t *testing.T) { logCases := []handlerTestCase{ {"OPTIONS", "/", nil}, } - runHandlerTestCases(t, "update log", h.LogHandler, logCases, false) + runHandlerTestCases(t, "update log", h.LogsHandler, logCases, false) runUpdateCases := []handlerMimeMultipartTestCase{ {"OPTIONS", "/update/run", nil, nil}, diff --git a/cmd/update.go b/cmd/update.go index 2b07423d6..77d7948f5 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" + util "github.com/datatogether/api/apiutil" "github.com/qri-io/ioes" "github.com/qri-io/qri/config" "github.com/qri-io/qri/lib" @@ -46,8 +47,9 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command { }, } listCmd := &cobra.Command{ - Use: "list", - Short: "list scheduled updates", + Use: "list", + Aliases: []string{"ls"}, + Short: "list scheduled updates", RunE: func(cmd *cobra.Command, args []string) error { if err := o.Complete(f, args); err != nil { return err @@ -56,20 +58,24 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command { }, } - listCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25") listCmd.Flags().IntVar(&o.Page, "page", 1, "page number results, default 1") + listCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25") - logCmd := &cobra.Command{ - Use: "log", - Short: "show log of dataset updates", + logsCmd := &cobra.Command{ + Use: "logs", + Aliases: []string{"log"}, + Short: "show log of dataset updates", RunE: func(cmd *cobra.Command, args []string) error { if err := o.Complete(f, args); err != nil { return err } - return o.Log() + return o.Logs(args) }, } + logsCmd.Flags().IntVar(&o.Page, "page", 1, "page number results, default 1") + logsCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25") + runCmd := &cobra.Command{ Use: "run", Short: "excute an update immideately", @@ -135,7 +141,7 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command { scheduleCmd, unscheduleCmd, listCmd, - logCmd, + logsCmd, runCmd, serviceCmd, ) @@ -221,11 +227,11 @@ func (o *UpdateOptions) Unschedule(args []string) (err error) { // List shows scheduled update jobs func (o *UpdateOptions) List() (err error) { - + // convert Page and PageSize to Limit and Offset + page := util.NewPage(o.Page, o.PageSize) p := &lib.ListParams{ - // TODO (b5) - finish - Limit: 100, - Offset: 0, + Offset: page.Offset(), + Limit: page.Limit(), } res := []*lib.Job{} if err = o.updateMethods.List(p, &res); err != nil { @@ -240,10 +246,41 @@ func (o *UpdateOptions) List() (err error) { return } -// Log shows a history of job events -func (o *UpdateOptions) Log() (err error) { - // TODO (b5): - return fmt.Errorf("not finished") +// Logs shows a history of job events +func (o *UpdateOptions) Logs(args []string) (err error) { + if len(args) == 1 { + return o.LogFile(args[0]) + } + + // convert Page and PageSize to Limit and Offset + page := util.NewPage(o.Page, o.PageSize) + p := &lib.ListParams{ + Offset: page.Offset(), + Limit: page.Limit(), + } + + res := []*lib.Job{} + if err = o.updateMethods.Logs(p, &res); err != nil { + return + } + + for i, j := range res { + num := p.Offset + i + 1 + printInfo(o.Out, "%d. %s\n %s | %s\n", num, j.Name, j.Type, j.NextExec()) + } + + return nil +} + +// LogFile prints a log output file +func (o *UpdateOptions) LogFile(logName string) error { + data := []byte{} + if err := o.updateMethods.LogFile(&logName, &data); err != nil { + return err + } + + o.Out.Write(data) + return nil } // ServiceStatus gets the current status of the update daemon diff --git a/cron/cron.go b/cron/cron.go index 5a6532a2b..9d56a6b1b 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -2,11 +2,12 @@ package cron import ( + "bytes" "context" - "fmt" "io" "io/ioutil" "net/http" + "os" "time" apiutil "github.com/datatogether/api/apiutil" @@ -103,9 +104,18 @@ func (c *Cron) LoggedJob(ctx context.Context, logName string) (*Job, error) { // LoggedJobFile returns a reader for a file at the given name func (c *Cron) LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) { - // reader := c.log. - // TODO (b5): - return nil, fmt.Errorf("not finished") + job, err := c.log.Job(ctx, logName) + if err != nil { + return nil, err + } + + if job.LogFilePath == "" { + return ioutil.NopCloser(&bytes.Buffer{}), nil + } + + // TODO (b5): if logs are being stored somewhere other than local this'll break + // we should add an OpenLogFile method to LogFileCreator & rename the interface + return os.Open(job.LogFilePath) } // Start initiates the check loop, looking for updates to execute once at every @@ -172,8 +182,6 @@ func (c *Cron) runJob(ctx context.Context, job *Job, runner RunJobFunc) { } } - streams.ErrOut.Write([]byte(fmt.Sprintf("%s %s\n", job.LastRunStart, job.Name))) - if err := runner(ctx, streams, job); err != nil { log.Errorf("run job: %s error: %s", job.Name, err.Error()) job.LastError = err.Error() diff --git a/lib/lib.go b/lib/lib.go index 71f0edafb..7d6000326 100644 --- a/lib/lib.go +++ b/lib/lib.go @@ -408,8 +408,8 @@ func newCron(cfg *config.Config, repoPath string) (cron.Scheduler, error) { var jobStore, logStore cron.JobStore switch updateCfg.Type { case "fs": - jobStore = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb") - logStore = cron.NewFlatbufferJobStore(repoPath + "/logs.qfb") + jobStore = cron.NewFlatbufferJobStore(repoPath + "/cron_jobs.qfb") + logStore = cron.NewFlatbufferJobStore(repoPath + "/cron_logs.qfb") case "mem": jobStore = &cron.MemJobStore{} logStore = &cron.MemJobStore{} diff --git a/lib/update.go b/lib/update.go index d73523b00..f76b00c09 100644 --- a/lib/update.go +++ b/lib/update.go @@ -3,6 +3,7 @@ package lib import ( "context" "fmt" + "io/ioutil" "os" "path/filepath" "time" @@ -154,19 +155,43 @@ func (m *UpdateMethods) Job(name *string, job *Job) (err error) { // TODO (b5): refactor RPC communication to use context var ctx = context.Background() - var res *Job - res, err = m.inst.cron.Job(ctx, *name) - if err == nil { - *job = *res + res, err := m.inst.cron.Job(ctx, *name) + if err != nil { + return err } - return + *job = *res + return nil } -// Log shows the history of job execution -func (m *UpdateMethods) Log(name *string, unscheduled *bool) error { - // TODO (b5) - return fmt.Errorf("not finished") +// Logs shows the history of job execution +func (m *UpdateMethods) Logs(p *ListParams, res *[]*Job) error { + // this context is scoped to the scheduling request. currently not cancellable + // because our lib methods don't accept a context themselves + // TODO (b5): refactor RPC communication to use context + var ctx = context.Background() + + jobs, err := m.inst.cron.Logs(ctx, p.Offset, p.Limit) + if err != nil { + return err + } + + *res = jobs + return nil +} + +// LogFile reads log file data for a given logName +func (m *UpdateMethods) LogFile(logName *string, data *[]byte) error { + f, err := m.inst.cron.LoggedJobFile(context.Background(), *logName) + if err != nil { + return err + } + + defer f.Close() + res, err := ioutil.ReadAll(f) + *data = res + + return err } // ServiceStatus describes the current state of a service