feat(dsfs): compute & store stats component at save time
b5 committed Nov 17, 2020
1 parent a8f2977 commit 3ff3b75
Showing 17 changed files with 378 additions and 136 deletions.
14 changes: 12 additions & 2 deletions base/body_test.go
@@ -9,7 +9,6 @@ import (

	"github.com/qri-io/dataset"
	"github.com/qri-io/qfs"
-	"github.com/qri-io/qri/base/dsfs"
)

func TestReadBody(t *testing.T) {
@@ -39,9 +38,20 @@ func TestReadBody(t *testing.T) {
	}
}

+// BaseTabularSchema is the base schema for tabular data
+// NOTE: Do not use if possible, prefer github.com/qri-io/dataset/tabular
+// TODO(dustmop): Possibly move this to tabular package
+var BaseTabularSchema = map[string]interface{}{
+	"type": "array",
+	"items": map[string]interface{}{
+		"type": "array",
+		"items": []interface{}{},
+	},
+}
+
func TestConvertBodyFormat(t *testing.T) {
	jsonStructure := &dataset.Structure{Format: "json", Schema: dataset.BaseSchemaArray}
-	csvStructure := &dataset.Structure{Format: "csv", Schema: dsfs.BaseTabularSchema}
+	csvStructure := &dataset.Structure{Format: "csv", Schema: BaseTabularSchema}

	// CSV -> JSON
	body := qfs.NewMemfileBytes("", []byte("a,b,c"))
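
For orientation, BaseTabularSchema, now defined locally instead of imported from dsfs, is a JSON Schema for an array of row-arrays; the empty inner "items" list leaves individual cells unconstrained. A hypothetical body that validates against it:

// rows are arrays; cell types are left unconstrained by the schema
var exampleTabularBody = []interface{}{
	[]interface{}{"chatham", 35000, true},
	[]interface{}{"new york", 8500000, false},
}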
19 changes: 17 additions & 2 deletions base/dsfs/commit.go
@@ -35,12 +35,27 @@ const (
	BodyTooBig = BodyAction("too_big")
)

+// DerefCommit dereferences a dataset's Commit element if required
+// should be a no-op if ds.Commit is nil or isn't a reference
+func DerefCommit(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
+	if ds.Commit != nil && ds.Commit.IsEmpty() && ds.Commit.Path != "" {
+		cm, err := loadCommit(ctx, store, ds.Commit.Path)
+		if err != nil {
+			log.Debug(err.Error())
+			return fmt.Errorf("loading dataset commit: %w", err)
+		}
+		cm.Path = ds.Commit.Path
+		ds.Commit = cm
+	}
+	return nil
+}
+
// loadCommit assumes the provided path is valid
func loadCommit(ctx context.Context, fs qfs.Filesystem, path string) (st *dataset.Commit, err error) {
	data, err := fileBytes(fs.Get(ctx, path))
	if err != nil {
		log.Debug(err.Error())
-		return nil, fmt.Errorf("error loading commit file: %s", err.Error())
+		return nil, fmt.Errorf("loading commit file: %s", err.Error())
	}
	return dataset.UnmarshalCommit(data)
}
@@ -51,7 +66,7 @@ func generateCommitTitleAndMessage(ctx context.Context, fs qfs.Filesystem, privKey crypto.PrivKey, …
	shortTitle, longMessage, err := generateCommitDescriptions(ctx, fs, ds, prev, bodyAct, forceIfNoChanges)
	if err != nil {
		log.Debugf("generateCommitDescriptions err: %s", err)
-		return fmt.Errorf("error saving: %s", err)
+		return fmt.Errorf("error saving: %w", err)
	}

	if shortTitle == defaultCreatedDescription && fileHint != "" {
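
Worth noting in the hunk above: switching fmt.Errorf's verb from %s to %w wraps the underlying error instead of flattening it to text, so callers can still match it with errors.Is or errors.As. A minimal standalone illustration, separate from this commit:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("path not found")

// wrapped keeps errNotFound matchable in the chain; flattened reduces it to text
func wrapped() error   { return fmt.Errorf("error saving: %w", errNotFound) }
func flattened() error { return fmt.Errorf("error saving: %s", errNotFound) }

func main() {
	fmt.Println(errors.Is(wrapped(), errNotFound))   // true
	fmt.Println(errors.Is(flattened(), errNotFound)) // false
}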
38 changes: 31 additions & 7 deletions base/dsfs/compute_fields.go
@@ -11,6 +11,7 @@ import (
	crypto "github.com/libp2p/go-libp2p-core/crypto"
	"github.com/qri-io/dataset"
	"github.com/qri-io/dataset/dsio"
+	"github.com/qri-io/dataset/dsstats"
	"github.com/qri-io/jsonschema"
	"github.com/qri-io/qfs"
)
@@ -24,6 +25,9 @@ type computeFieldsFile struct {

	ds, prev *dataset.Dataset

+	// body statistics accumulator
+	acc *dsstats.Accumulator
+
	// buffer of entries for diffing small datasets. will be set to nil if
	// body reads more than BodySizeSmallEnoughToDiff bytes
	diffMessageBuf *dsio.EntryBuffer
@@ -39,6 +43,11 @@ type computeFieldsFile struct {
	bytesRead int
}

+var (
+	_ doneProcessingFile = (*computeFieldsFile)(nil)
+	_ statsComponentFile = (*computeFieldsFile)(nil)
+)
+
func newComputeFieldsFile(ctx context.Context, dsLk *sync.Mutex, fs qfs.Filesystem, pk crypto.PrivKey, ds, prev *dataset.Dataset, sw SaveSwitches) (qfs.File, error) {
	var (
		bf = ds.BodyFile()
@@ -123,12 +132,23 @@ func (cff *computeFieldsFile) Close() error {
	return nil
}

+type doneProcessingFile interface {
+	DoneProcessing() <-chan error
+}
+
func (cff *computeFieldsFile) DoneProcessing() <-chan error {
	return cff.done
}

-type doneProcessingFile interface {
-	DoneProcessing() <-chan error
+type statsComponentFile interface {
+	StatsComponent() (*dataset.Stats, error)
}

+func (cff *computeFieldsFile) StatsComponent() (*dataset.Stats, error) {
+	return &dataset.Stats{
+		Qri:   dataset.KindStats.String(),
+		Stats: dsstats.ToMap(cff.acc),
+	}, nil
+}
+
// , store cafs.Filestore, ds, prev *dataset.Dataset, bodyR io.Reader, pk crypto.PrivKey, sw SaveSwitches, done chan error
@@ -141,18 +161,19 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context) {
		depth = 0
	)

-	cff.Lock()
-	// assign timestamp early. saving process on large files can take many minutes
-	cff.ds.Commit.Timestamp = Timestamp()
-	cff.Unlock()
-
	r, err := dsio.NewEntryReader(st, cff.pipeReader)
	if err != nil {
		log.Debugf("creating entry reader: %s", err)
		cff.done <- fmt.Errorf("creating entry reader: %w", err)
		return
	}

+	cff.Lock()
+	// assign timestamp early. saving process on large files can take many minutes
+	cff.ds.Commit.Timestamp = Timestamp()
+	cff.acc = dsstats.NewAccumulator(st)
+	cff.Unlock()
+
	jsch, err := st.JSONSchema()
	if err != nil {
		cff.done <- err
@@ -188,6 +209,9 @@ func (cff *computeFieldsFile) handleRows(ctx context.Context) {
			depth = d
		}
		entries++
+		if err := cff.acc.WriteEntry(ent); err != nil {
+			return err
+		}

		if i%batchSize == 0 && i != 0 {
			numValErrs, flushErr := cff.flushBatch(ctx, batchBuf, st, jsch)
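
Taken together, the compute_fields.go hunks thread a dsstats.Accumulator through the single streaming pass over the body: the accumulator is created beside the entry reader, fed each entry as rows are validated, and snapshotted into a dataset.Stats component by StatsComponent once reading finishes. Below is a condensed sketch of that flow; statsForBody is an invented helper, and the acc.Close() finalization step is an assumption that does not appear in the hunks shown here.

import (
	"io"

	"github.com/qri-io/dataset"
	"github.com/qri-io/dataset/dsio"
	"github.com/qri-io/dataset/dsstats"
)

// statsForBody mirrors, in one batch, what computeFieldsFile does
// incrementally: stream entries once, accumulate stats, emit a component
func statsForBody(st *dataset.Structure, body io.Reader) (*dataset.Stats, error) {
	r, err := dsio.NewEntryReader(st, body)
	if err != nil {
		return nil, err
	}
	acc := dsstats.NewAccumulator(st)
	if err := dsio.EachEntry(r, func(i int, ent dsio.Entry, e error) error {
		if e != nil {
			return e
		}
		return acc.WriteEntry(ent)
	}); err != nil {
		return nil, err
	}
	// assumption: Close finalizes derived values before the snapshot
	if err := acc.Close(); err != nil {
		return nil, err
	}
	return &dataset.Stats{
		Qri:   dataset.KindStats.String(),
		Stats: dsstats.ToMap(acc),
	}, nil
}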
126 changes: 25 additions & 101 deletions base/dsfs/dataset.go
@@ -91,118 +91,25 @@ func LoadDatasetRefs(ctx context.Context, fs qfs.Filesystem, path string) (*dataset.Dataset, error) {
// DerefDataset attempts to fully dereference a dataset
func DerefDataset(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
	log.Debugf("DerefDataset path=%q", ds.Path)
-	if err := DerefDatasetMeta(ctx, store, ds); err != nil {
+	if err := DerefMeta(ctx, store, ds); err != nil {
		return err
	}
-	if err := DerefDatasetStructure(ctx, store, ds); err != nil {
+	if err := DerefStructure(ctx, store, ds); err != nil {
		return err
	}
-	if err := DerefDatasetTransform(ctx, store, ds); err != nil {
+	if err := DerefTransform(ctx, store, ds); err != nil {
		return err
	}
-	if err := DerefDatasetViz(ctx, store, ds); err != nil {
+	if err := DerefViz(ctx, store, ds); err != nil {
		return err
	}
-	if err := DerefDatasetReadme(ctx, store, ds); err != nil {
+	if err := DerefReadme(ctx, store, ds); err != nil {
		return err
	}
-	return DerefDatasetCommit(ctx, store, ds)
-}
-
-// DerefDatasetStructure derferences a dataset's structure element if required
-// should be a no-op if ds.Structure is nil or isn't a reference
-func DerefDatasetStructure(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Structure != nil && ds.Structure.IsEmpty() && ds.Structure.Path != "" {
-		st, err := loadStructure(ctx, store, ds.Structure.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset structure: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		// st.Assign(dataset.NewStructureRef(ds.Structure.Path))
-		ds.Structure = st
-	}
-	return nil
-}
-
-// DerefDatasetViz dereferences a dataset's Viz element if required
-// should be a no-op if ds.Viz is nil or isn't a reference
-func DerefDatasetViz(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Viz != nil && ds.Viz.IsEmpty() && ds.Viz.Path != "" {
-		vz, err := loadViz(ctx, store, ds.Viz.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset viz: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		// vz.Assign(dataset.NewVizRef(ds.Viz.Path))
-		ds.Viz = vz
-	}
-	return nil
-}
-
-// DerefDatasetReadme dereferences a dataset's Readme element if required
-// should be a no-op if ds.Readme is nil or isn't a reference
-func DerefDatasetReadme(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Readme != nil && ds.Readme.IsEmpty() && ds.Readme.Path != "" {
-		rm, err := loadReadme(ctx, store, ds.Readme.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset readme: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		// rm.Assign(dataset.NewVizRef(ds.Readme.Path))
-		ds.Readme = rm
-	}
-	return nil
-}
-
-// DerefDatasetTransform derferences a dataset's transform element if required
-// should be a no-op if ds.Structure is nil or isn't a reference
-func DerefDatasetTransform(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Transform != nil && ds.Transform.IsEmpty() && ds.Transform.Path != "" {
-		t, err := loadTransform(ctx, store, ds.Transform.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset transform: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		// t.Assign(dataset.NewTransformRef(ds.Transform.Path))
-		ds.Transform = t
-	}
-	return nil
-}
-
-// DerefDatasetMeta derferences a dataset's transform element if required
-// should be a no-op if ds.Structure is nil or isn't a reference
-func DerefDatasetMeta(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Meta != nil && ds.Meta.IsEmpty() && ds.Meta.Path != "" {
-		md, err := loadMeta(ctx, store, ds.Meta.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset metadata: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		// md.Assign(dataset.NewMetaRef(ds.Meta.Path))
-		ds.Meta = md
-	}
-	return nil
-}
-
-// DerefDatasetCommit derferences a dataset's Commit element if required
-// should be a no-op if ds.Structure is nil or isn't a reference
-func DerefDatasetCommit(ctx context.Context, store qfs.Filesystem, ds *dataset.Dataset) error {
-	if ds.Commit != nil && ds.Commit.IsEmpty() && ds.Commit.Path != "" {
-		cm, err := loadCommit(ctx, store, ds.Commit.Path)
-		if err != nil {
-			log.Debug(err.Error())
-			return fmt.Errorf("error loading dataset commit: %s", err.Error())
-		}
-		// assign path to retain internal reference to path
-		cm.Assign(dataset.NewCommitRef(ds.Commit.Path))
-		ds.Commit = cm
+	if err := DerefStats(ctx, store, ds); err != nil {
+		return err
	}
-	return nil
+	return DerefCommit(ctx, store, ds)
}

// SaveSwitches represents options for saving a dataset
@@ -349,6 +256,21 @@ func buildFileGraph(fs qfs.Filesystem, ds *dataset.Dataset, privKey crypto.PrivKey, …
		files = append(files, stf)
	}

+	// stats relies on a structure component & a body file
+	if statsCompFile, ok := bdf.(statsComponentFile); ok {
+		hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
+			sa, err := statsCompFile.StatsComponent()
+			if err != nil {
+				return nil, err
+			}
+			ds.Stats = sa
+			return JSONFile(f.FullPath(), sa)
+		}
+
+		hookFile := qfs.NewWriteHookFile(qfs.NewMemfileBytes(PackageFileStats.Filename(), []byte{}), hook, PackageFileStructure.Filename())
+		files = append(files, hookFile)
+	}
+
	if ds.Meta != nil {
		ds.Meta.DropTransientValues()
		mdf, err := JSONFile(PackageFileMeta.Filename(), ds.Meta)
@@ -454,6 +376,8 @@ func buildFileGraph(fs qfs.Filesystem, ds *dataset.Dataset, privKey crypto.PrivKey, …
			ds.Viz = dataset.NewVizRef(pathMap[comp])
		case PackageFileMeta.Filename():
			ds.Meta = dataset.NewMetaRef(pathMap[comp])
+		case PackageFileStats.Filename():
+			ds.Stats = dataset.NewStatsRef(pathMap[comp])
		case bdf.FullPath():
			ds.BodyPath = pathMap[comp]

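
The hook wiring above is the crux of the save-side change: stats bytes cannot be produced until the body has been fully streamed, so buildFileGraph registers an empty placeholder file for the stats component plus a callback that runs only after the file it depends on has been written. A rough sketch of the qfs pattern in isolation, under stated assumptions: the filenames and payload are invented, and the meaning of the added map is inferred from the pathMap handling above.

import (
	"context"
	"io"
	"strings"

	"github.com/qri-io/qfs"
)

func exampleStatsHookFile() qfs.File {
	// placeholder whose real contents are decided at write time
	placeholder := qfs.NewMemfileBytes("stats.json", []byte{})

	hook := func(ctx context.Context, f qfs.File, added map[string]string) (io.Reader, error) {
		// added is assumed to map already-written filenames to their
		// content-addressed paths, e.g. added["structure.json"]
		return strings.NewReader(`{"example":true}`), nil
	}

	// trailing arguments name files that must be written before the hook runs
	return qfs.NewWriteHookFile(placeholder, hook, "structure.json")
}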
33 changes: 22 additions & 11 deletions base/dsfs/dataset_test.go
@@ -136,7 +136,7 @@ func TestCreateDataset(t *testing.T) {
		err string
	}{
		{"invalid_reference",
-			"", nil, "error loading dataset commit: error loading commit file: path not found"},
+			"", nil, "loading dataset commit: loading commit file: path not found"},
		{"invalid",
			"", nil, "commit is required"},
		{"strict_fail",
@@ -175,13 +175,13 @@ func TestCreateDataset(t *testing.T) {
		repoFiles int // expected total count of files in repo after test execution
	}{
		{"cities",
-			"/mem/QmYUqpRqrxUvtXgJ3NnPafUmyShERR9WaqLojxsumvrYpo", nil, 7},
+			"/mem/QmPUMUixUxM1e6SKtgMaV7U5kvuy25W8yV4KvkcURd6LFg", nil, 8},
		{"all_fields",
-			"/mem/QmVFJmziXeSsjByztA62dPpeGjLykAerP5uFC26Yj1o5CN", nil, 16},
+			"/mem/Qmcf46vxtuCsbMV4i9d2ifCJnMjEBHXturUyD2xUD6qrn9", nil, 18},
		{"cities_no_commit_title",
-			"/mem/QmULA7AoxdWjEfrsdCNZgXRNXKJQfsQVrUHKWp1s1K1R6i", nil, 19},
+			"/mem/QmXFRBAWTBQZVJGZxtaCAsEYKRRQLcKZJhn5UPsQ2LoJLu", nil, 21},
		{"craigslist",
-			"/mem/QmXVLv5BKuP1C5TgmFjxF51q6kbqd75CGrFcUMGutaDENQ", nil, 23},
+			"/mem/QmWm6rGimuUFXgw9CQ9p3fT3h9mCnAXkPr8PHM1dhJRASm", nil, 26},
	}

	for _, c := range good {
@@ -204,10 +204,10 @@ func TestCreateDataset(t *testing.T) {

			if tc.Expect != nil {
				if err := dataset.CompareDatasets(tc.Expect, ds); err != nil {
-					// expb, _ := json.Marshal(tc.Expect)
-					// fmt.Println(string(expb))
-					// dsb, _ := json.Marshal(ds)
-					// fmt.Println(string(dsb))
+					expb, _ := json.Marshal(tc.Expect)
+					fmt.Println(string(expb))
+					dsb, _ := json.Marshal(ds)
+					fmt.Println(string(dsb))
					t.Errorf("dataset comparison error: %s", err.Error())
				}
			}
@@ -289,8 +289,8 @@ func TestCreateDataset(t *testing.T) {
		t.Fatalf("CreateDataset expected error got 'nil'. commit: %v", ds.Commit)
	}

-	if len(fs.Files) != 23 {
-		t.Errorf("invalid number of entries: %d != %d", 23, len(fs.Files))
+	if len(fs.Files) != 26 {
+		t.Errorf("invalid number of entries: %d != %d", 26, len(fs.Files))
		_, err := fs.Print()
		if err != nil {
			panic(err)
@@ -301,6 +301,17 @@ func TestCreateDataset(t *testing.T) {
	// case: previous dataset isn't valid
}

+// BaseTabularSchema is the base schema for tabular data
+// NOTE: Do not use if possible, prefer github.com/qri-io/dataset/tabular
+// TODO(dustmop): Possibly move this to tabular package
+var BaseTabularSchema = map[string]interface{}{
+	"type": "array",
+	"items": map[string]interface{}{
+		"type": "array",
+		"items": []interface{}{},
+	},
+}
+
// Test that if the body is too large, the commit message just assumes the body changed
func TestCreateDatasetBodyTooLarge(t *testing.T) {
	ctx := context.Background()
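
The updated /mem/Qm… fixtures and higher repo file counts in these tests reflect the extra stats component file now written with each dataset version. On the read side the component round-trips like any other; here is a hedged sketch using only the two functions visible in the dataset.go hunks, where loadWithStats is an invented package-local helper:

// loadWithStats loads a dataset's component references, then dereferences
// them; after this commit DerefDataset also resolves ds.Stats via DerefStats
func loadWithStats(ctx context.Context, fs qfs.Filesystem, path string) (*dataset.Dataset, error) {
	ds, err := LoadDatasetRefs(ctx, fs, path)
	if err != nil {
		return nil, err
	}
	if err := DerefDataset(ctx, fs, ds); err != nil {
		return nil, err
	}
	return ds, nil
}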
[diffs for the remaining 12 changed files are not shown here]
