refactor(save): large refactoring for saving datasets
Merge pull request #592 from qri-io/refactor-save
b5 authored Oct 30, 2018
2 parents 070766f + b93b9bd commit 8764ddf
Showing 19 changed files with 469 additions and 402 deletions.
10 changes: 8 additions & 2 deletions actions/actions_test.go
@@ -65,8 +65,11 @@ func addCitiesDataset(t *testing.T, node *p2p.QriNode) repo.DatasetRef {
if err != nil {
t.Fatal(err.Error())
}
dsp := tc.Input.Encode()
dsp.Name = tc.Name
dsp.BodyBytes = tc.Body

ref, _, err := SaveDataset(node, tc.Name, tc.Input, tc.BodyFile(), nil, false, true)
ref, _, err := SaveDataset(node, dsp, false, true)
if err != nil {
t.Fatal(err.Error())
}
@@ -78,8 +81,11 @@ func addFlourinatedCompoundsDataset(t *testing.T, node *p2p.QriNode) repo.Datase
if err != nil {
t.Fatal(err.Error())
}
dsp := tc.Input.Encode()
dsp.Name = tc.Name
dsp.BodyBytes = tc.Body

ref, _, err := SaveDataset(node, tc.Name, tc.Input, tc.BodyFile(), nil, false, true)
ref, _, err := SaveDataset(node, dsp, false, true)
if err != nil {
t.Fatal(err.Error())
}
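
Taken together, the two hunks above show the new calling convention: the dataset name and body now travel on a dataset.DatasetPod instead of being passed to SaveDataset as separate arguments. A minimal sketch of a hypothetical helper written against that convention (not part of this commit; it assumes the imports already present in actions_test.go, and that Encode() returns a *dataset.DatasetPod, as the hunks above imply):

// addTestDataset is a hypothetical helper sketching the new convention:
// pack the name and body onto a DatasetPod, then call SaveDataset.
func addTestDataset(t *testing.T, node *p2p.QriNode, ds *dataset.Dataset, name string, body []byte) repo.DatasetRef {
	dsp := ds.Encode() // *dataset.DatasetPod
	dsp.Name = name
	dsp.BodyBytes = body

	// dryRun=false, pin=true, matching the call sites above
	ref, _, err := SaveDataset(node, dsp, false, true)
	if err != nil {
		t.Fatal(err.Error())
	}
	return ref
}
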
291 changes: 20 additions & 271 deletions actions/dataset.go
@@ -1,244 +1,27 @@
package actions

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"strings"

"github.com/ipfs/go-datastore"
"github.com/qri-io/cafs"
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/detect"
"github.com/qri-io/dataset/dsfs"
"github.com/qri-io/dataset/validate"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/varName"
)

// TODO: Move this to core.PrepareDatasetNew

// NewDataset processes dataset input into its necessary components for creation
func NewDataset(dsp *dataset.DatasetPod) (ds *dataset.Dataset, body cafs.File, secrets map[string]string, err error) {
if dsp == nil {
err = fmt.Errorf("dataset is required")
return
}

if dsp.BodyPath == "" && dsp.BodyBytes == nil && dsp.Transform == nil {
err = fmt.Errorf("either dataBytes, bodyPath, or a transform is required to create a dataset")
return
}

if dsp.Transform != nil {
secrets = dsp.Transform.Secrets
}

ds = &dataset.Dataset{}
if err = ds.Decode(dsp); err != nil {
err = fmt.Errorf("decoding dataset: %s", err.Error())
return
}

if ds.Commit == nil {
ds.Commit = &dataset.Commit{
Title: "created dataset",
}
} else if ds.Commit.Title == "" {
ds.Commit.Title = "created dataset"
}

// open a data file if we can
if body, err = base.DatasetPodBodyFile(dsp); err == nil {
// defer body.Close()

// validate / generate dataset name
if dsp.Name == "" {
dsp.Name = varName.CreateVarNameFromString(body.FileName())
}
if e := validate.ValidName(dsp.Name); e != nil {
err = fmt.Errorf("invalid name: %s", e.Error())
return
}

// read structure from InitParams, or detect from data
if ds.Structure == nil && ds.Transform == nil {
// use a TeeReader that writes to a buffer to preserve data
buf := &bytes.Buffer{}
tr := io.TeeReader(body, buf)
var df dataset.DataFormat

df, err = detect.ExtensionDataFormat(body.FileName())
if err != nil {
log.Debug(err.Error())
err = fmt.Errorf("invalid data format: %s", err.Error())
return
}

ds.Structure, _, err = detect.FromReader(df, tr)
if err != nil {
log.Debug(err.Error())
err = fmt.Errorf("determining dataset schema: %s", err.Error())
return
}
// glue whatever we just read back onto the reader
body = cafs.NewMemfileReader(body.FileName(), io.MultiReader(buf, body))
}

// Ensure that dataset structure is valid
if err = validate.Dataset(ds); err != nil {
log.Debug(err.Error())
err = fmt.Errorf("invalid dataset: %s", err.Error())
return
}

// NOTE - if we have a data file, this overrides any transformation,
// so we need to remove the transform to avoid having the data appear to be
// the result of a transform process
ds.Transform = nil

} else if err.Error() == "not found" {
err = nil
} else {
return
}

return
}
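
The structure-detection logic removed above relies on a peek-then-restore pattern: whatever detection consumes from the body is captured in a buffer via io.TeeReader, then glued back onto the front of the reader with io.MultiReader so downstream code still sees the full body. A self-contained sketch of that pattern using only the standard library (an illustration of the technique, not the qri code; the JSON input and 8-byte peek are arbitrary):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	body := strings.NewReader(`{"city":"toronto"}`)

	// Tee everything read during "detection" into a buffer so it isn't lost.
	buf := &bytes.Buffer{}
	tr := io.TeeReader(body, buf)

	// Peek at a prefix; the removed code ran detect.FromReader against tr instead.
	prefix := make([]byte, 8)
	n, _ := io.ReadFull(tr, prefix)
	fmt.Printf("detected from prefix: %q\n", prefix[:n])

	// Glue the consumed bytes back onto the front of the remaining reader.
	restored := io.MultiReader(buf, body)
	rest, _ := io.ReadAll(restored)
	fmt.Printf("full body: %s\n", rest)
}
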

// TODO: Move this to core.PrepareDatasetUpdate

// UpdateDataset prepares a set of changes for submission to SaveDataset
func UpdateDataset(node *p2p.QriNode, dsp *dataset.DatasetPod) (ds *dataset.Dataset, body cafs.File, secrets map[string]string, err error) {
ds = &dataset.Dataset{}
updates := &dataset.Dataset{}

if dsp == nil {
err = fmt.Errorf("dataset is required")
return
}
if dsp.Name == "" || dsp.Peername == "" {
err = fmt.Errorf("peername & name are required to update dataset")
return
}

if dsp.Transform != nil {
secrets = dsp.Transform.Secrets
}

if err = updates.Decode(dsp); err != nil {
err = fmt.Errorf("decoding dataset: %s", err.Error())
return
}

prev := &repo.DatasetRef{Name: dsp.Name, Peername: dsp.Peername}
if err = repo.CanonicalizeDatasetRef(node.Repo, prev); err != nil {
err = fmt.Errorf("error with previous reference: %s", err.Error())
return
}

if err = DatasetHead(node, prev); err != nil {
err = fmt.Errorf("error getting previous dataset: %s", err.Error())
return
}

if dsp.BodyBytes != nil || dsp.BodyPath != "" {
if body, err = base.DatasetPodBodyFile(dsp); err != nil {
return
}
} else {
// load data cause we need something to compare the structure to
// prevDs := &dataset.Dataset{}
// if err = prevDs.Decode(prev.Dataset); err != nil {
// err = fmt.Errorf("error decoding previous dataset: %s", err)
// return
// }
}

prevds, err := prev.DecodeDataset()
if err != nil {
err = fmt.Errorf("error decoding dataset: %s", err.Error())
return
}

// add all previous fields and any changes
ds.Assign(prevds, updates)
ds.PreviousPath = prev.Path

// ds.Assign clobbers empty commit messages with the previous
// commit message, reassign with updates
if updates.Commit == nil {
updates.Commit = &dataset.Commit{}
}
ds.Commit.Title = updates.Commit.Title
ds.Commit.Message = updates.Commit.Message

// TODO - this is so bad. fix. currently createDataset expects paths to
// local files, so we're just making them up on the spot.
if ds.Transform != nil && ds.Transform.ScriptPath[:len("/ipfs/")] == "/ipfs/" {
tfScript, e := node.Repo.Store().Get(datastore.NewKey(ds.Transform.ScriptPath))
if e != nil {
err = e
return
}

f, e := ioutil.TempFile("", "transform.sky")
if e != nil {
err = e
return
}
if _, e := io.Copy(f, tfScript); err != nil {
err = e
return
}
ds.Transform.ScriptPath = f.Name()
}
if ds.Viz != nil && ds.Viz.ScriptPath[:len("/ipfs/")] == "/ipfs/" {
vizScript, e := node.Repo.Store().Get(datastore.NewKey(ds.Viz.ScriptPath))
if e != nil {
err = e
return
}

f, e := ioutil.TempFile("", "viz.html")
if e != nil {
err = e
return
}
if _, e := io.Copy(f, vizScript); err != nil {
err = e
return
}
ds.Viz.ScriptPath = f.Name()
}

// Assign will assign any previous paths to the current paths
// the dsdiff (called in dsfs.CreateDataset), will compare the paths
// see that they are the same, and claim there are no differences
// since we will potentially have changes in the Meta and Structure
// we want the differ to have to compare each field
// so we reset the paths
if ds.Meta != nil {
ds.Meta.SetPath("")
}
if ds.Structure != nil {
ds.Structure.SetPath("")
}

return
}

// TODO: Move this to core.CreateDataset

// SaveDataset initializes a dataset from a dataset pointer and data file
func SaveDataset(node *p2p.QriNode, name string, ds *dataset.Dataset, data cafs.File, secrets map[string]string, dryRun, pin bool) (ref repo.DatasetRef, body cafs.File, err error) {
func SaveDataset(node *p2p.QriNode, dsp *dataset.DatasetPod, dryRun, pin bool) (ref repo.DatasetRef, body cafs.File, err error) {
var (
r = node.Repo
pro *profile.Profile
ds *dataset.Dataset
bodyFile cafs.File
secrets map[string]string
// NOTE - struct fields need to be instantiated to make assign set to
// new pointer values
userSet = &dataset.Dataset{
@@ -250,69 +33,35 @@ func SaveDataset(node *p2p.QriNode, name string, ds *dataset.Dataset, data cafs.
}
)

if dryRun {
// dry-runs store to an in-memory repo
node.LocalStreams.Print("🏃🏽‍♀️ dry run\n")
}

pro, err = r.Profile()
if err != nil {
return
// Determine if the save is creating a new dataset or updating an existing dataset by
// seeing if the name can canonicalize to a repo that we know about
lookup := &repo.DatasetRef{Name: dsp.Name, Peername: dsp.Peername}
err = repo.CanonicalizeDatasetRef(node.Repo, lookup)
if err == repo.ErrNotFound {
ds, bodyFile, secrets, err = base.PrepareDatasetNew(dsp)
if err != nil {
return
}
} else {
ds, bodyFile, secrets, err = base.PrepareDatasetSave(node.Repo, dsp)
if err != nil {
return
}
}

userSet.Assign(ds)

if ds.Commit != nil {
// NOTE: add author ProfileID here to keep the dataset package agnostic to
// all identity stuff except keypair crypto
ds.Commit.Author = &dataset.User{ID: pro.ID.String()}
}

if ds.Transform != nil {
node.LocalStreams.Print("🤖 executing transform\n")
data, err = ExecTransform(node, ds, data, secrets)
bodyFile, err = ExecTransform(node, ds, bodyFile, secrets)
if err != nil {
return
}
node.LocalStreams.Print("✅ transform complete\n")
ds.Assign(userSet)
}

if err = base.PrepareViz(ds); err != nil {
return
}

if dryRun {
r, err = repo.NewMemRepo(pro, cafs.NewMapstore(), profile.NewMemStore(), nil)
if err != nil {
return
}
// TODO - memRepo needs to be able to load a previous dataset from our actual repo
// memRepo should be able to wrap another repo & check that before returning not found
}

if ref, err = base.CreateDataset(r, name, ds, data, pin); err != nil {
return
}

if err = r.LogEvent(repo.ETDsCreated, ref); err != nil {
return
}

_, storeIsPinner := r.Store().(cafs.Pinner)
if pin && storeIsPinner {
r.LogEvent(repo.ETDsPinned, ref)
}

if err = base.ReadDataset(r, &ref); err != nil {
return
}

if body, err = r.Store().Get(datastore.NewKey(ref.Dataset.BodyPath)); err != nil {
fmt.Println("error getting from store:", err.Error())
}

return
return base.CreateDataset(node.Repo, node.LocalStreams, dsp.Name, ds, bodyFile, dryRun, pin)
}
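
With this refactor, SaveDataset is the single entry point for both creating and updating: it canonicalizes dsp.Peername/dsp.Name and branches on repo.ErrNotFound to choose between base.PrepareDatasetNew and base.PrepareDatasetSave. A hedged sketch of a hypothetical caller (the peername, name, and body path are illustrative values, not from this commit):

// Hypothetical caller of the refactored API; field values are made up.
dsp := &dataset.DatasetPod{
	Peername: "me",
	Name:     "cities",
	BodyPath: "/path/to/cities.csv",
}

// Creates me/cities if the ref isn't in the repo yet, otherwise saves a
// new version on top of the existing dataset. dryRun=false, pin=true.
ref, body, err := SaveDataset(node, dsp, false, true)
if err != nil {
	// handle the error
}
_, _ = ref, body
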

// AddDataset fetches & pins a dataset to the store, adding it to the list of stored refs