feat(update): add update log command and api endpoints
b5 committed May 9, 2019
1 parent 41c9366 commit 56c4840
Showing 8 changed files with 143 additions and 38 deletions.
3 changes: 2 additions & 1 deletion api/api.go
@@ -271,7 +271,8 @@ func NewServerRoutes(s Server) *http.ServeMux {
}
m.Handle("/update", s.middleware(uh.UpdatesHandler))
m.Handle("/update/run", s.middleware(uh.RunHandler))
m.Handle("/update/log", s.middleware(uh.LogHandler))
m.Handle("/update/logs", s.middleware(uh.LogsHandler))
m.Handle("/update/logs/file", s.middleware(uh.LogFileHandler))
m.Handle("/update/service", s.middleware(uh.ServiceHandler))

renderh := NewRenderHandlers(node.Repo)
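
For context, a minimal sketch of how a client might call the two new routes. The base address, the example log name, and the page/pageSize query parameter names are placeholders and assumptions here, not values taken from this commit:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	// placeholder address: wherever the qri API server is listening
	base := "http://localhost:2503"

	// list logged update runs, paged
	resp, err := http.Get(base + "/update/logs?page=1&pageSize=25")
	if err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))

	// fetch the raw output file of a single logged run
	v := url.Values{"log_name": {"example-log-name"}}
	resp, err = http.Get(base + "/update/logs/file?" + v.Encode())
	if err != nil {
		panic(err)
	}
	body, _ = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))
}
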
Binary file modified api/testdata/api.snapshot
Binary file not shown.
40 changes: 37 additions & 3 deletions api/update.go
@@ -94,9 +94,43 @@ func (h UpdateHandlers) unscheduleUpdateHandler(w http.ResponseWriter, r *http.R
}
}

// LogHandler shows the log of previously run updates
func (h *UpdateHandlers) LogHandler(w http.ResponseWriter, r *http.Request) {
util.WriteErrResponse(w, http.StatusNotFound, fmt.Errorf("not finished"))
// LogsHandler shows the log of previously run updates
func (h *UpdateHandlers) LogsHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "OPTIONS":
util.EmptyOkHandler(w, r)
case "GET":
h.logsHandler(w, r)
default:
util.NotFoundHandler(w, r)
}
}

func (h *UpdateHandlers) logsHandler(w http.ResponseWriter, r *http.Request) {
args := lib.ListParamsFromRequest(r)
res := []*lib.Job{}
if err := h.Logs(&args, &res); err != nil {
log.Errorf("listing update logs: %s", err.Error())
util.WriteErrResponse(w, http.StatusInternalServerError, err)
return
}
if err := util.WritePageResponse(w, res, r, args.Page()); err != nil {
log.Errorf("list jobs response: %s", err.Error())
}
}

// LogFileHandler fetches log output file data
func (h *UpdateHandlers) LogFileHandler(w http.ResponseWriter, r *http.Request) {
name := r.FormValue("log_name")
data := []byte{}
if err := h.LogFile(&name, &data); err != nil {
log.Errorf("getting update log file: %s", err.Error())
util.WriteErrResponse(w, http.StatusInternalServerError, err)
return
}

w.WriteHeader(http.StatusOK)
w.Write(data)
}

// RunHandler brings a dataset to the latest version
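
A sketch of exercising the new handlers directly with net/http/httptest. newTestUpdateHandlers is a hypothetical stand-in for however the package's tests construct an UpdateHandlers value, and the log name and query parameter naming are assumptions:

package api

import (
	"net/http/httptest"
	"testing"
)

func TestLogHandlersSketch(t *testing.T) {
	h := newTestUpdateHandlers(t) // hypothetical helper, not part of this commit

	// paged list of logged runs
	req := httptest.NewRequest("GET", "/update/logs?page=1", nil)
	w := httptest.NewRecorder()
	h.LogsHandler(w, req)
	t.Logf("logs status: %d body: %s", w.Code, w.Body.String())

	// raw output file for one run; log_name is read via r.FormValue
	req = httptest.NewRequest("GET", "/update/logs/file?log_name=example-log-name", nil)
	w = httptest.NewRecorder()
	h.LogFileHandler(w, req)
	t.Logf("log file status: %d", w.Code)
}
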
2 changes: 1 addition & 1 deletion api/update_test.go
@@ -37,7 +37,7 @@ func TestUpdateHandlers(t *testing.T) {
logCases := []handlerTestCase{
{"OPTIONS", "/", nil},
}
runHandlerTestCases(t, "update log", h.LogHandler, logCases, false)
runHandlerTestCases(t, "update log", h.LogsHandler, logCases, false)

runUpdateCases := []handlerMimeMultipartTestCase{
{"OPTIONS", "/update/run", nil, nil},
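
The table above still only covers the OPTIONS preflight. A GET case could be added in the same style once a fixture with logged jobs exists — a sketch following the handlerTestCase shape shown above, not part of this commit:

logCases := []handlerTestCase{
	{"OPTIONS", "/", nil},
	{"GET", "/", nil}, // exercises logsHandler; expected to return an empty page when no jobs are logged
}
runHandlerTestCases(t, "update log", h.LogsHandler, logCases, false)
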
69 changes: 53 additions & 16 deletions cmd/update.go
@@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"

util "github.com/datatogether/api/apiutil"
"github.com/qri-io/ioes"
"github.com/qri-io/qri/config"
"github.com/qri-io/qri/lib"
@@ -46,8 +47,9 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command {
},
}
listCmd := &cobra.Command{
Use: "list",
Short: "list scheduled updates",
Use: "list",
Aliases: []string{"ls"},
Short: "list scheduled updates",
RunE: func(cmd *cobra.Command, args []string) error {
if err := o.Complete(f, args); err != nil {
return err
@@ -56,20 +58,24 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command {
},
}

listCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25")
listCmd.Flags().IntVar(&o.Page, "page", 1, "page number results, default 1")
listCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25")

logCmd := &cobra.Command{
Use: "log",
Short: "show log of dataset updates",
logsCmd := &cobra.Command{
Use: "logs",
Aliases: []string{"log"},
Short: "show log of dataset updates",
RunE: func(cmd *cobra.Command, args []string) error {
if err := o.Complete(f, args); err != nil {
return err
}
return o.Log()
return o.Logs(args)
},
}

logsCmd.Flags().IntVar(&o.Page, "page", 1, "page number results, default 1")
logsCmd.Flags().IntVar(&o.PageSize, "page-size", 25, "page size of results, default 25")

runCmd := &cobra.Command{
Use: "run",
Short: "execute an update immediately",
@@ -135,7 +141,7 @@ func NewUpdateCommand(f Factory, ioStreams ioes.IOStreams) *cobra.Command {
scheduleCmd,
unscheduleCmd,
listCmd,
logCmd,
logsCmd,
runCmd,
serviceCmd,
)
@@ -221,11 +227,11 @@ func (o *UpdateOptions) Unschedule(args []string) (err error) {

// List shows scheduled update jobs
func (o *UpdateOptions) List() (err error) {

// convert Page and PageSize to Limit and Offset
page := util.NewPage(o.Page, o.PageSize)
p := &lib.ListParams{
// TODO (b5) - finish
Limit: 100,
Offset: 0,
Offset: page.Offset(),
Limit: page.Limit(),
}
res := []*lib.Job{}
if err = o.updateMethods.List(p, &res); err != nil {
@@ -240,10 +246,41 @@ func (o *UpdateOptions) List() (err error) {
return
}

// Log shows a history of job events
func (o *UpdateOptions) Log() (err error) {
// TODO (b5):
return fmt.Errorf("not finished")
// Logs shows a history of job events
func (o *UpdateOptions) Logs(args []string) (err error) {
if len(args) == 1 {
return o.LogFile(args[0])
}

// convert Page and PageSize to Limit and Offset
page := util.NewPage(o.Page, o.PageSize)
p := &lib.ListParams{
Offset: page.Offset(),
Limit: page.Limit(),
}

res := []*lib.Job{}
if err = o.updateMethods.Logs(p, &res); err != nil {
return
}

for i, j := range res {
num := p.Offset + i + 1
printInfo(o.Out, "%d. %s\n %s | %s\n", num, j.Name, j.Type, j.NextExec())
}

return nil
}

// LogFile prints a log output file
func (o *UpdateOptions) LogFile(logName string) error {
data := []byte{}
if err := o.updateMethods.LogFile(&logName, &data); err != nil {
return err
}

o.Out.Write(data)
return nil
}

// ServiceStatus gets the current status of the update daemon
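
With these changes, `qri update logs` lists logged runs (paged via --page and --page-size) and `qri update logs <log_name>` prints the stored output of a single run. The page flags are translated to offset/limit through apiutil; the conversion amounts to the following equivalent sketch (an assumption about NewPage's semantics, not the library's code):

// equivalent of util.NewPage(page, pageSize).Offset() / .Limit(),
// with page numbers starting at 1
func pageToOffsetLimit(page, pageSize int) (offset, limit int) {
	if page < 1 {
		page = 1
	}
	return (page - 1) * pageSize, pageSize
}
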
20 changes: 14 additions & 6 deletions cron/cron.go
@@ -2,11 +2,12 @@
package cron

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"

apiutil "github.com/datatogether/api/apiutil"
@@ -103,9 +104,18 @@ func (c *Cron) LoggedJob(ctx context.Context, logName string) (*Job, error) {

// LoggedJobFile returns a reader for a file at the given name
func (c *Cron) LoggedJobFile(ctx context.Context, logName string) (io.ReadCloser, error) {
// reader := c.log.
// TODO (b5):
return nil, fmt.Errorf("not finished")
job, err := c.log.Job(ctx, logName)
if err != nil {
return nil, err
}

if job.LogFilePath == "" {
return ioutil.NopCloser(&bytes.Buffer{}), nil
}

// TODO (b5): if logs are being stored somewhere other than local this'll break
// we should add an OpenLogFile method to LogFileCreator & rename the interface
return os.Open(job.LogFilePath)
}

// Start initiates the check loop, looking for updates to execute once at every
@@ -172,8 +182,6 @@ func (c *Cron) runJob(ctx context.Context, job *Job, runner RunJobFunc) {
}
}

streams.ErrOut.Write([]byte(fmt.Sprintf("%s %s\n", job.LastRunStart, job.Name)))

if err := runner(ctx, streams, job); err != nil {
log.Errorf("run job: %s error: %s", job.Name, err.Error())
job.LastError = err.Error()
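
A caller-side sketch of the now-implemented LoggedJobFile. The log name and the helper function are illustrative only, and the snippet assumes the context, io, and os imports from the standard library:

// sketch: stream a logged job's output to stdout
func printLoggedJobOutput(ctx context.Context, c *Cron, logName string) error {
	f, err := c.LoggedJobFile(ctx, logName)
	if err != nil {
		return err
	}
	defer f.Close()
	// an empty reader is returned when the job never wrote a log file
	_, err = io.Copy(os.Stdout, f)
	return err
}
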
4 changes: 2 additions & 2 deletions lib/lib.go
@@ -408,8 +408,8 @@ func newCron(cfg *config.Config, repoPath string) (cron.Scheduler, error) {
var jobStore, logStore cron.JobStore
switch updateCfg.Type {
case "fs":
jobStore = cron.NewFlatbufferJobStore(repoPath + "/jobs.qfb")
logStore = cron.NewFlatbufferJobStore(repoPath + "/logs.qfb")
jobStore = cron.NewFlatbufferJobStore(repoPath + "/cron_jobs.qfb")
logStore = cron.NewFlatbufferJobStore(repoPath + "/cron_logs.qfb")
case "mem":
jobStore = &cron.MemJobStore{}
logStore = &cron.MemJobStore{}
43 changes: 34 additions & 9 deletions lib/update.go
@@ -3,6 +3,7 @@ package lib
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
@@ -154,19 +155,43 @@ func (m *UpdateMethods) Job(name *string, job *Job) (err error) {
// TODO (b5): refactor RPC communication to use context
var ctx = context.Background()

var res *Job
res, err = m.inst.cron.Job(ctx, *name)
if err == nil {
*job = *res
res, err := m.inst.cron.Job(ctx, *name)
if err != nil {
return err
}

return
*job = *res
return nil
}

// Log shows the history of job execution
func (m *UpdateMethods) Log(name *string, unscheduled *bool) error {
// TODO (b5)
return fmt.Errorf("not finished")
// Logs shows the history of job execution
func (m *UpdateMethods) Logs(p *ListParams, res *[]*Job) error {
// this context is scoped to the scheduling request. currently not cancellable
// because our lib methods don't accept a context themselves
// TODO (b5): refactor RPC communication to use context
var ctx = context.Background()

jobs, err := m.inst.cron.Logs(ctx, p.Offset, p.Limit)
if err != nil {
return err
}

*res = jobs
return nil
}

// LogFile reads log file data for a given logName
func (m *UpdateMethods) LogFile(logName *string, data *[]byte) error {
f, err := m.inst.cron.LoggedJobFile(context.Background(), *logName)
if err != nil {
return err
}

defer f.Close()
res, err := ioutil.ReadAll(f)
*data = res

return err
}

// ServiceStatus describes the current state of a service
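
Both new lib methods keep the package's RPC-friendly shape of pointer arguments and out-parameters. A caller-side sketch, assuming the fmt import; the surrounding function and the log name are placeholders, not part of this commit:

func showUpdateLogs(m *UpdateMethods) error {
	p := &ListParams{Offset: 0, Limit: 25}
	jobs := []*Job{}
	if err := m.Logs(p, &jobs); err != nil {
		return err
	}
	for i, j := range jobs {
		fmt.Printf("%d. %s\n", i+1, j.Name)
	}

	// fetch the stored output of one run; the name here is a placeholder
	logName := "example-log-name"
	data := []byte{}
	if err := m.LogFile(&logName, &data); err != nil {
		return err
	}
	fmt.Println(string(data))
	return nil
}
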
