Skip to content

Commit

Permalink
fix(remote): always send progress completion on client push/pull events
Browse files Browse the repository at this point in the history
also, send completion events with a new Error field populated when things
go wrong
  • Loading branch information
b5 committed Dec 9, 2020
1 parent 3c979ed commit afcb2f8
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 55 deletions.
6 changes: 4 additions & 2 deletions base/dsfs/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ func commitFileAddFunc(privKey crypto.PrivKey, pub event.Publisher) addWriteFile
}

hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
if evtErr := pub.Publish(ctx, event.ETDatasetSaveProgress, event.DsSaveEvent{
Username: ds.Peername,
Name: ds.Name,
Message: "finalizing",
Completion: 0.9,
})
}); evtErr != nil {
log.Debugw("publish event errored", "error", evtErr)
}

if cff, ok := wfs.body.(*computeFieldsFile); ok {
updateScriptPaths(ds, added)
Expand Down
17 changes: 11 additions & 6 deletions base/dsfs/compute_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,17 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publishe
// publish here so we know that if the user sees the "processing body file"
// message, we know that a compute-fields-file has made it all the way through
// setup
go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
Username: cff.ds.Peername,
Name: cff.ds.Name,
Message: "processing body file",
Completion: 0.1,
})
go func() {
evtErr := pub.Publish(ctx, event.ETDatasetSaveProgress, event.DsSaveEvent{
Username: cff.ds.Peername,
Name: cff.ds.Name,
Message: "processing body file",
Completion: 0.1,
})
if evtErr != nil {
log.Debugw("ignored error while publishing save progress", "evtErr", evtErr)
}
}()

go func() {
err = dsio.EachEntry(r, func(i int, ent dsio.Entry, err error) error {
Expand Down
29 changes: 19 additions & 10 deletions base/dsfs/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,29 @@ func CreateDataset(
}
ds.SetBodyFile(bodyFile)

go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveStarted, event.DsSaveEvent{
Username: peername,
Name: name,
Message: "save started",
Completion: 0,
})
go func() {
evtErr := pub.Publish(ctx, event.ETDatasetSaveStarted, event.DsSaveEvent{
Username: peername,
Name: name,
Message: "save started",
Completion: 0,
})
if evtErr != nil {
log.Debugw("ignored error while publishing save start event", "evtErr", evtErr)
}
}()

path, err := WriteDataset(ctx, dsLk, destination, pub, ds, pk, sw)
if err != nil {
log.Debug(err.Error())
event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
if evtErr := pub.Publish(ctx, event.ETDatasetSaveCompleted, event.DsSaveEvent{
Username: peername,
Name: name,
Error: err,
Completion: 1.0,
})
}); evtErr != nil {
log.Debugw("ignored error while publishing save completed", "evtErr", evtErr)
}
return "", err
}

Expand All @@ -230,12 +237,14 @@ func CreateDataset(
// the caller doesn't use the ds arg afterward
// might make sense to have a wrapper function that writes and loads on success
if err := DerefDataset(ctx, destination, ds); err != nil {
event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
if evtErr := pub.Publish(ctx, event.ETDatasetSaveCompleted, event.DsSaveEvent{
Username: peername,
Name: name,
Error: err,
Completion: 1.0,
})
}); evtErr != nil {
log.Debugw("ignored error while publishing save completed", "evtErr", evtErr)
}
return path, err
}

Expand Down
2 changes: 0 additions & 2 deletions base/save_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package base

import (
"bytes"
"context"
"testing"

Expand Down Expand Up @@ -123,7 +122,6 @@ func TestCreateDataset(t *testing.T) {
},
}
ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte("[]")))
PrintProgressBarsOnSave(&bytes.Buffer{}, r.Bus())

if _, err := CreateDataset(ctx, r, r.Filesystem().DefaultWriteFS(), &dataset.Dataset{}, &dataset.Dataset{}, SaveSwitches{Pin: true, ShouldRender: true}); err == nil {
t.Error("expected bad dataset to error")
Expand Down
10 changes: 6 additions & 4 deletions cmd/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,14 @@ func renderTable(writer io.Writer, header []string, data [][]string) {
func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
var lock sync.Mutex
// initialize progress container, with custom width
p := mpb.New(mpb.WithWidth(80))
p := mpb.New(mpb.WithWidth(80), mpb.WithOutput(w))
progress := map[string]*mpb.Bar{}

// wire up a subscription to print download progress to streams
bus.Subscribe(func(_ context.Context, typ event.Type, payload interface{}) error {
lock.Lock()
defer lock.Unlock()
log.Debugw("handle event", "type", typ, "payload", payload)

switch evt := payload.(type) {
case event.DsSaveEvent:
Expand Down Expand Up @@ -286,7 +287,7 @@ func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
bar.SetCurrent(int64(evt.Progress.CompletedBlocks()))
case event.ETRemoteClientPushVersionCompleted:
if bar, exists := progress[evt.Ref.String()]; exists {
bar.SetCurrent(int64(len(evt.Progress)))
bar.SetTotal(int64(len(evt.Progress)), true)
delete(progress, evt.Ref.String())
}

Expand All @@ -299,15 +300,15 @@ func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {
bar.SetCurrent(int64(evt.Progress.CompletedBlocks()))
case event.ETRemoteClientPullVersionCompleted:
if bar, exists := progress[evt.Ref.String()]; exists {
bar.SetCurrent(int64(len(evt.Progress)))
bar.SetTotal(int64(len(evt.Progress)), true)
delete(progress, evt.Ref.String())
}
}
}

if len(progress) == 0 {
p.Wait()
p = mpb.New(mpb.WithWidth(80))
p = mpb.New(mpb.WithWidth(80), mpb.WithOutput(w))
}
return nil
},
Expand All @@ -317,6 +318,7 @@ func PrintProgressBarsOnEvents(w io.Writer, bus event.Bus) {

event.ETRemoteClientPushVersionProgress,
event.ETRemoteClientPushVersionCompleted,

event.ETRemoteClientPullVersionProgress,
event.ETRemoteClientPullVersionCompleted,
)
Expand Down
38 changes: 38 additions & 0 deletions cmd/print_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package cmd

import (
"bytes"
"context"
"fmt"
"runtime"
"testing"

"github.com/qri-io/dag"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
)

func TestDoesCommandExist(t *testing.T) {
Expand All @@ -15,3 +22,34 @@ func TestDoesCommandExist(t *testing.T) {
}
}
}

func TestProgressBars(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := event.NewBus(ctx)

buf := &bytes.Buffer{}
PrintProgressBarsOnEvents(buf, bus)

ref := dsref.MustParse("c/d")

events := []struct {
t event.Type
p interface{}
}{
{event.ETDatasetSaveStarted, event.DsSaveEvent{Username: "a", Name: "b", Completion: 0.1}},
{event.ETDatasetSaveProgress, event.DsSaveEvent{Username: "a", Name: "b", Completion: 0.2}},
{event.ETDatasetSaveProgress, event.DsSaveEvent{Username: "a", Name: "b", Completion: 0.3}},
{event.ETDatasetSaveCompleted, event.DsSaveEvent{Username: "a", Name: "b", Completion: 0.3, Error: fmt.Errorf("oh noes")}},

{event.ETRemoteClientPullVersionProgress, event.RemoteEvent{Ref: ref, Progress: dag.Completion{0, 1, 1}}},
{event.ETRemoteClientPushVersionProgress, event.RemoteEvent{Ref: ref, Progress: dag.Completion{0, 1, 1}}},
{event.ETRemoteClientPullVersionCompleted, event.RemoteEvent{Ref: ref, Progress: dag.Completion{0, 1, 1}, Error: fmt.Errorf("ooooh noes")}},
{event.ETRemoteClientPushVersionCompleted, event.RemoteEvent{Ref: ref, Progress: dag.Completion{0, 1, 1}, Error: fmt.Errorf("ooooh noes")}},
}

for _, e := range events {
bus.Publish(ctx, e.t, e.p)
}
}
10 changes: 0 additions & 10 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ type Publisher interface {
Publish(ctx context.Context, t Type, payload interface{}) error
}

// PublishLogError is a convenience function that fires an event to a publisher
// and logs any error using an external logger. Exists to make publishing in a
// goroutine a one-liner, eg:
// go event.PublishLogErr(ctx, pub, log, event.ETSaveStarted, payload)
func PublishLogError(ctx context.Context, pub Publisher, logger golog.StandardLogger, t Type, payload interface{}) {
if err := pub.Publish(ctx, t, payload); err != nil {
logger.Debug(err)
}
}

// Bus is a central coordination point for event publication and subscription
// zero or more subscribers register eventTypes to be notified of, a publisher
// writes a topic event to the bus, which broadcasts to all subscribers of that
Expand Down
1 change: 1 addition & 0 deletions event/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ type RemoteEvent struct {
Ref dsref.Ref `json:"ref"`
RemoteAddr string `json:"remoteAddr"`
Progress dag.Completion `json:"progress"`
Error error `json:"error,omitempty"`
}
51 changes: 31 additions & 20 deletions remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,19 @@ func (c *client) pushDatasetVersion(ctx context.Context, ref dsref.Ref, remoteAd
}
push.SetMeta(params)

progEvt := event.RemoteEvent{
Ref: ref,
RemoteAddr: remoteAddr,
}

go func() {
updates := push.Updates()
for {
select {
case update := <-updates:
go func() {
prog := event.RemoteEvent{
Ref: ref,
RemoteAddr: remoteAddr,
Progress: update,
}
if err := c.events.Publish(ctx, event.ETRemoteClientPushVersionProgress, prog); err != nil {
progEvt.Progress = update
if err := c.events.Publish(ctx, event.ETRemoteClientPushVersionProgress, progEvt); err != nil {
log.Debugf("publishing eventType=%q error=%q", event.ETRemoteClientPushVersionProgress, err)
}
}()
Expand All @@ -464,13 +465,18 @@ func (c *client) pushDatasetVersion(ctx context.Context, ref dsref.Ref, remoteAd
}()

if err := push.Do(ctx); err != nil {
progEvt.Error = err
if evtErr := c.events.Publish(ctx, event.ETRemoteClientPushVersionCompleted, progEvt); evtErr != nil {
log.Debugw("ignored error while publishing pushVersionCompleted", "evtErr", evtErr)
}
return err
}

return c.events.Publish(ctx, event.ETRemoteClientPushVersionCompleted, event.RemoteEvent{
Ref: ref,
RemoteAddr: remoteAddr,
})
for i := range progEvt.Progress {
progEvt.Progress[i] = 100
}

return c.events.Publish(ctx, event.ETRemoteClientPushVersionCompleted, progEvt)
}

// PullDataset fetches & pins a dataset to the store, adding it to the list of
Expand Down Expand Up @@ -585,18 +591,19 @@ func (c *client) pullDatasetVersion(ctx context.Context, ref *dsref.Ref, remoteA
return err
}

progEvt := event.RemoteEvent{
Ref: *ref,
RemoteAddr: remoteAddr,
}

go func() {
updates := pull.Updates()
for {
select {
case update := <-updates:
go func() {
prog := event.RemoteEvent{
Ref: *ref,
RemoteAddr: remoteAddr,
Progress: update,
}
if err := c.events.Publish(ctx, event.ETRemoteClientPullVersionProgress, prog); err != nil {
progEvt.Progress = update
if err := c.events.Publish(ctx, event.ETRemoteClientPullVersionProgress, progEvt); err != nil {
log.Error("publishing %q event: %q", event.ETRemoteClientPullVersionProgress, err)
}
}()
Expand All @@ -617,10 +624,14 @@ func (c *client) pullDatasetVersion(ctx context.Context, ref *dsref.Ref, remoteA
}
}

return c.events.Publish(ctx, event.ETRemoteClientPullVersionCompleted, event.RemoteEvent{
Ref: *ref,
RemoteAddr: remoteAddr,
})
// set progress to 100%
// TODO (b5) - it'd be great if the dag package had a convenience function for
// this
for i := range progEvt.Progress {
progEvt.Progress[i] = 100
}

return c.events.Publish(ctx, event.ETRemoteClientPullVersionCompleted, progEvt)
}

// RemoveDataset requests a remote remove logbook data from an address
Expand Down
1 change: 0 additions & 1 deletion remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestDatasetPullPushDeleteFeedsPreviewHTTP(t *testing.T) {
)

progBuf := &bytes.Buffer{}
PrintProgressBarsOnPushPull(progBuf, tr.NodeB.Repo.Bus())

relRef := &dsref.Ref{Username: wbp.Username, Name: wbp.Name}
if _, err := cli.NewRemoteRefResolver(server.URL).ResolveRef(tr.Ctx, relRef); err != nil {
Expand Down

0 comments on commit afcb2f8

Please sign in to comment.