feat(save): emit save events. Print progress bars on save
Add save event publication to dsfs, move all progress bar output up into the cmd
package, and replace the progress bar package with "mpb", which has explicit support
for displaying multiple progress bars at once. To pull this off, lib.Instance gets a
new .Bus() accessor method.

This setup brings progress bar output closer to the way desktop and API-driven
consumers display progress from events.
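
For context, here is a minimal sketch of the consumer side this commit enables: a cmd-layer caller subscribes to the new save events on the instance's event bus and drives an mpb progress bar from each DsSaveEvent's Completion value. The bar options, decorator choices, and the mpb import path and major version are illustrative assumptions; the actual cmd-package wiring is not part of the diff shown below.

package cmd

import (
	"context"

	"github.com/qri-io/qri/event"
	"github.com/vbauerster/mpb/v5"
	"github.com/vbauerster/mpb/v5/decor"
)

// renderSaveProgress mirrors dataset-save events onto a single mpb progress bar.
// It subscribes to the three save event types added in this commit and scales
// the 0.0 to 1.0 Completion fraction onto a 100-unit bar.
func renderSaveProgress(bus event.Bus) *mpb.Progress {
	p := mpb.New(mpb.WithWidth(40))
	bar := p.AddBar(100,
		mpb.PrependDecorators(decor.Name("saving dataset ")),
		mpb.AppendDecorators(decor.Percentage()),
	)

	bus.Subscribe(func(ctx context.Context, t event.Type, payload interface{}) error {
		if evt, ok := payload.(event.DsSaveEvent); ok {
			bar.SetCurrent(int64(evt.Completion * 100))
			if t == event.ETDatasetSaveCompleted {
				// mark the bar complete so Progress.Wait can return
				bar.SetTotal(100, true)
			}
		}
		return nil
	},
		event.ETDatasetSaveStarted,
		event.ETDatasetSaveProgress,
		event.ETDatasetSaveCompleted,
	)

	return p
}

A command would pass in the bus obtained from the new lib.Instance Bus() accessor before kicking off the save, then call p.Wait() once the save returns so the bar finishes rendering. The event.Bus parameter type here assumes the bus satisfies the same Subscribe signature used in the tests below.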
b5 committed Dec 8, 2020
1 parent 63c3d44 commit 3c979ed
Showing 21 changed files with 337 additions and 121 deletions.
5 changes: 3 additions & 2 deletions base/archive/archive_test.go
@@ -12,6 +12,7 @@ import (
"github.com/qri-io/qfs"
"github.com/qri-io/qri/base/dsfs"
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/event"
)

func TestGenerateFilename(t *testing.T) {
@@ -132,7 +133,7 @@ func testFS() (qfs.Filesystem, map[string]string, error) {
ds.SetBodyFile(dataf)

fs := qfs.NewMemFS()
dskey, err := dsfs.WriteDataset(ctx, nil, fs, ds, pk, dsfs.SaveSwitches{})
dskey, err := dsfs.WriteDataset(ctx, nil, fs, event.NilBus, ds, pk, dsfs.SaveSwitches{})
if err != nil {
return fs, ns, err
}
@@ -179,7 +180,7 @@ func testFSWithVizAndTransform() (qfs.Filesystem, map[string]string, error) {
privKey := testPeers.GetTestPeerInfo(10).PrivKey

var dsLk sync.Mutex
dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, ds, privKey, dsfs.SaveSwitches{Pin: true})
dskey, err := dsfs.WriteDataset(ctx, &dsLk, st, event.NilBus, ds, privKey, dsfs.SaveSwitches{Pin: true})
if err != nil {
return st, ns, err
}
9 changes: 8 additions & 1 deletion base/dsfs/commit.go
@@ -16,6 +16,7 @@ import (
"github.com/qri-io/qfs"
"github.com/qri-io/qri/base/friendly"
"github.com/qri-io/qri/base/toqtype"
"github.com/qri-io/qri/event"
)

// Timestamp is an function for getting commit timestamps
@@ -62,13 +63,19 @@ func loadCommit(ctx context.Context, fs qfs.Filesystem, path string) (st *datase
return dataset.UnmarshalCommit(data)
}

func commitFileAddFunc(privKey crypto.PrivKey) addWriteFileFunc {
func commitFileAddFunc(privKey crypto.PrivKey, pub event.Publisher) addWriteFileFunc {
return func(ds *dataset.Dataset, wfs *writeFiles) error {
if ds.Commit == nil {
return nil
}

hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
Username: ds.Peername,
Name: ds.Name,
Message: "finalizing",
Completion: 0.9,
})

if cff, ok := wfs.body.(*computeFieldsFile); ok {
updateScriptPaths(ds, added)
25 changes: 22 additions & 3 deletions base/dsfs/compute_fields.go
@@ -14,6 +14,7 @@ import (
"github.com/qri-io/dataset/dsstats"
"github.com/qri-io/jsonschema"
"github.com/qri-io/qfs"
"github.com/qri-io/qri/event"
)

type computeFieldsFile struct {
@@ -48,7 +49,15 @@ var (
_ statsComponentFile = (*computeFieldsFile)(nil)
)

func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesystem, pk crypto.PrivKey, ds, prev *dataset.Dataset, sw SaveSwitches) (qfs.File, error) {
func newComputeFieldsFile(
ctx context.Context,
dsLk *sync.Mutex,
fs qfs.Filesystem,
pub event.Publisher,
pk crypto.PrivKey,
ds *dataset.Dataset,
prev *dataset.Dataset,
sw SaveSwitches) (qfs.File, error) {
var (
bf = ds.BodyFile()
bfPrev qfs.File
@@ -84,7 +93,7 @@ func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesyst
done: make(chan error),
}

go cff.handleRows(ctx)
go cff.handleRows(ctx, pub)

return cff, nil
}
@@ -151,7 +160,7 @@ func (cff *computeFieldsFile) StatsComponent() (*dataset.Stats, error) {
}, nil
}

func (cff *computeFieldsFile) handleRows(ctx context.Context) {
func (cff *computeFieldsFile) handleRows(ctx context.Context, pub event.Publisher) {
var (
batchBuf *dsio.EntryBuffer
st = cff.ds.Structure
@@ -201,6 +210,16 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context) {
return
}

// publish here so that if the user sees the "processing body file" message,
// we know a compute-fields-file has made it all the way through setup
go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveProgress, event.DsSaveEvent{
Username: cff.ds.Peername,
Name: cff.ds.Name,
Message: "processing body file",
Completion: 0.1,
})

go func() {
err = dsio.EachEntry(r, func(i int, ent dsio.Entry, err error) error {
if err != nil {
43 changes: 38 additions & 5 deletions base/dsfs/dataset.go
@@ -11,6 +11,7 @@ import (
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/validate"
"github.com/qri-io/qfs"
"github.com/qri-io/qri/event"
)

// number of entries per batch when processing body data in WriteDataset
@@ -142,6 +143,7 @@ func CreateDataset(
ctx context.Context,
source qfs.Filesystem,
destination qfs.Filesystem,
pub event.Publisher,
ds *dataset.Dataset,
prev *dataset.Dataset,
pk crypto.PrivKey,
@@ -192,17 +194,34 @@
}

// lock for editing dataset pointer
var dsLk = &sync.Mutex{}
var (
dsLk = &sync.Mutex{}
peername = ds.Peername
name = ds.Name
)

bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pk, ds, prev, sw)
bodyFile, err := newComputeFieldsFile(ctx, dsLk, source, pub, pk, ds, prev, sw)
if err != nil {
return "", err
}
ds.SetBodyFile(bodyFile)

path, err := WriteDataset(ctx, dsLk, destination, ds, pk, sw)
go event.PublishLogError(ctx, pub, log, event.ETDatasetSaveStarted, event.DsSaveEvent{
Username: peername,
Name: name,
Message: "save started",
Completion: 0,
})

path, err := WriteDataset(ctx, dsLk, destination, pub, ds, pk, sw)
if err != nil {
log.Debug(err.Error())
event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
Username: peername,
Name: name,
Error: err,
Completion: 1.0,
})
return "", err
}

@@ -211,16 +230,30 @@ func CreateDataset(
// the caller doesn't use the ds arg afterward
// might make sense to have a wrapper function that writes and loads on success
if err := DerefDataset(ctx, destination, ds); err != nil {
event.PublishLogError(ctx, pub, log, event.ETDatasetSaveCompleted, event.DsSaveEvent{
Username: peername,
Name: name,
Error: err,
Completion: 1.0,
})
return path, err
}
return path, nil

return path, pub.Publish(ctx, event.ETDatasetSaveCompleted, event.DsSaveEvent{
Username: peername,
Name: name,
Message: "dataset saved",
Path: path,
Completion: 1.0,
})
}

// WriteDataset persists a datasets to a destination filesystem
func WriteDataset(
ctx context.Context,
dsLk *sync.Mutex,
destination qfs.Filesystem,
pub event.Publisher,
ds *dataset.Dataset,
pk crypto.PrivKey,
sw SaveSwitches,
@@ -240,7 +273,7 @@ func WriteDataset(
addStatsFile,
addReadmeFile,
vizFilesAddFunc(destination, sw),
commitFileAddFunc(pk),
commitFileAddFunc(pk, pub),
addDatasetFile,
}

65 changes: 54 additions & 11 deletions base/dsfs/dataset_test.go
@@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/dsio"
"github.com/qri-io/dataset/dstest"
@@ -20,6 +21,7 @@ import (
"github.com/qri-io/qfs"
"github.com/qri-io/qri/base/toqtype"
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/event"
)

func TestLoadDataset(t *testing.T) {
@@ -47,7 +49,7 @@ func TestLoadDataset(t *testing.T) {
info := testPeers.GetTestPeerInfo(10)
pk := info.PrivKey

apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{})
apath, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{})
if err != nil {
t.Errorf(err.Error())
return
@@ -156,7 +158,7 @@ func TestCreateDataset(t *testing.T) {
t.Fatalf("creating test case: %s", err)
}

_, err = CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true})
_, err = CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true})
if err == nil {
t.Fatalf("CreateDataset expected error. got nil")
}
@@ -189,7 +191,7 @@ func TestCreateDataset(t *testing.T) {
t.Fatalf("creating test case: %s", err)
}

path, err := CreateDataset(ctx, fs, fs, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true})
path, err := CreateDataset(ctx, fs, fs, event.NilBus, tc.Input, c.prev, privKey, SaveSwitches{ShouldRender: true})
if err != nil {
t.Fatalf("CreateDataset: %s", err)
}
@@ -218,7 +220,7 @@ func TestCreateDataset(t *testing.T) {
}

t.Run("no_priv_key", func(t *testing.T) {
_, err := CreateDataset(ctx, fs, fs, nil, nil, nil, SaveSwitches{ShouldRender: true})
_, err := CreateDataset(ctx, fs, fs, event.NilBus, nil, nil, nil, SaveSwitches{ShouldRender: true})
if err == nil {
t.Fatal("expected call without prvate key to error")
}
@@ -242,7 +244,7 @@ func TestCreateDataset(t *testing.T) {
t.Errorf("case nil body and previous body files, error reading data file: %s", err.Error())
}
expectedErr := "bodyfile or previous bodyfile needed"
_, err = CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{ShouldRender: true})
_, err = CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{ShouldRender: true})
if err.Error() != expectedErr {
t.Errorf("case nil body and previous body files, error mismatch: expected '%s', got '%s'", expectedErr, err.Error())
}
@@ -272,7 +274,7 @@ func TestCreateDataset(t *testing.T) {
}
ds.SetBodyFile(qfs.NewMemfileBytes("body.csv", bodyBytes))

path, err := CreateDataset(ctx, fs, fs, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true})
path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, dsPrev, privKey, SaveSwitches{ShouldRender: true})
if err != nil && err.Error() != expectedErr {
t.Fatalf("mismatch: expected %q, got %q", expectedErr, err.Error())
} else if err == nil {
@@ -312,7 +314,7 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) {
}
ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`)))

path, err := CreateDataset(ctx, fs, fs, ds, nil, privKey, SaveSwitches{})
path, err := CreateDataset(ctx, fs, fs, event.NilBus, ds, nil, privKey, SaveSwitches{})
if err != nil {
t.Fatal(err)
}
@@ -324,6 +326,47 @@ func TestDatasetSaveCustomTimestamp(t *testing.T) {
}
}

func TestDatasetSaveEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fs := qfs.NewMemFS()
privKey := testPeers.GetTestPeerInfo(10).PrivKey
bus := event.NewBus(ctx)

fired := map[event.Type]int{}
bus.Subscribe(func(ctx context.Context, t event.Type, payload interface{}) error {
fired[t]++
return nil
},
event.ETDatasetSaveStarted,
event.ETDatasetSaveProgress,
event.ETDatasetSaveCompleted,
)

ds := &dataset.Dataset{
Commit: &dataset.Commit{
Timestamp: time.Date(2100, 1, 2, 3, 4, 5, 6, time.Local),
},
Structure: &dataset.Structure{Format: "json", Schema: dataset.BaseSchemaArray},
}
ds.SetBodyFile(qfs.NewMemfileBytes("/body.json", []byte(`[]`)))

if _, err := CreateDataset(ctx, fs, fs, bus, ds, nil, privKey, SaveSwitches{}); err != nil {
t.Fatal(err)
}

expect := map[event.Type]int{
event.ETDatasetSaveStarted: 1,
event.ETDatasetSaveProgress: 2,
event.ETDatasetSaveCompleted: 1,
}

if diff := cmp.Diff(expect, fired); diff != "" {
t.Errorf("fired event count mismatch. (-want +got):%s\n", diff)
}
}

// BaseTabularSchema is the base schema for tabular data
// NOTE: Do not use if possible, prefer github.com/qri-io/dataset/tabular
// TODO(dustmop): Possibly move this to tabular package
@@ -374,7 +417,7 @@ func TestCreateDatasetBodyTooLarge(t *testing.T) {
}
nextDs.SetBodyFile(qfs.NewMemfileBytes(testBodyPath, testBodyBytes))

path, err := CreateDataset(ctx, fs, fs, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true})
path, err := CreateDataset(ctx, fs, fs, event.NilBus, &nextDs, &prevDs, privKey, SaveSwitches{ShouldRender: true})
if err != nil {
t.Fatalf("CreateDataset: %s", err)
}
@@ -403,7 +446,7 @@ func TestWriteDataset(t *testing.T) {
info := testPeers.GetTestPeerInfo(10)
pk := info.PrivKey

if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" {
if _, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, &dataset.Dataset{}, pk, SaveSwitches{Pin: true}); err == nil || err.Error() != "cannot save empty dataset" {
t.Errorf("didn't reject empty dataset: %s", err)
}

@@ -426,7 +469,7 @@

ds := tc.Input

got, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true})
got, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true})
if !(err == nil && c.err == "" || err != nil && err.Error() == c.err) {
t.Errorf("case %d error mismatch. expected: '%s', got: '%s'", i, c.err, err)
continue
@@ -1011,7 +1054,7 @@ func BenchmarkCreateDatasetCSV(b *testing.B) {
_, dataset := GenerateDataset(b, sampleSize, "csv")

b.StartTimer()
_, err := CreateDataset(ctx, fs, fs, dataset, nil, privKey, SaveSwitches{ShouldRender: true})
_, err := CreateDataset(ctx, fs, fs, event.NilBus, dataset, nil, privKey, SaveSwitches{ShouldRender: true})
if err != nil {
b.Errorf("error creating dataset: %s", err.Error())
}
3 changes: 2 additions & 1 deletion base/dsfs/testdata_test.go
@@ -10,6 +10,7 @@ import (
"github.com/qri-io/dataset"
"github.com/qri-io/qfs"
testPeers "github.com/qri-io/qri/config/test"
"github.com/qri-io/qri/event"
)

var AirportCodes = &dataset.Dataset{
@@ -178,7 +179,7 @@ func makeFilestore() (map[string]string, qfs.Filesystem, error) {

ds.SetBodyFile(qfs.NewMemfileBytes(fmt.Sprintf("/body.%s", ds.Structure.Format), data))

dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, ds, pk, SaveSwitches{Pin: true})
dskey, err := WriteDataset(ctx, &sync.Mutex{}, fs, event.NilBus, ds, pk, SaveSwitches{Pin: true})
if err != nil {
return datasets, nil, fmt.Errorf("dataset: %s write error: %s", k, err.Error())
}