Skip to content

Commit

Permalink
feat(transform): execute transformations with skylark language
Browse files Browse the repository at this point in the history
HO SNAP IT EFFING WORKS
  • Loading branch information
b5 committed May 24, 2018
1 parent 31b1ad6 commit f684229
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
GOFILES = $(shell find . -name '*.go' -not -path './vendor/*')
GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sergi/go-diff/diffmatchpatch github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/theckman/go-flock github.com/ugorji/go/codec github.com/beme/abide
GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/skytf github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sergi/go-diff/diffmatchpatch github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/theckman/go-flock github.com/ugorji/go/codec github.com/beme/abide

default: build

Expand Down
6 changes: 3 additions & 3 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func TestServerRoutes(t *testing.T) {

// get dataset
{"GET", "/me/family_relationships", "", "getResponseFamilyRelationships.json", 200},
{"GET", "/me/family_relationships/at/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA", "", "getResponseFamilyRelationships.json", 200},
{"GET", "/at/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA", "", "getResponseFamilyRelationships.json", 200},
{"GET", "/me/family_relationships/at/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv", "", "getResponseFamilyRelationships.json", 200},
{"GET", "/at/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv", "", "getResponseFamilyRelationships.json", 200},

{"POST", "/rename", "renameRequest.json", "renameResponse.json", 200},

Expand All @@ -133,7 +133,7 @@ func TestServerRoutes(t *testing.T) {

// remove
{"POST", "/remove/me/cities/at/map/QmbYtNU53DDjYWKSdYHmRz1CUnJhqH5jZ4Aa1ovNm323ib", "", "removeResponseWithPath.json", 200},
{"POST", "/remove/at/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA", "", "removeResponseByPath.json", 200},
{"POST", "/remove/at/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv", "", "removeResponseByPath.json", 200},

{"GET", "/connect/", "", "", 400},

Expand Down
2 changes: 1 addition & 1 deletion api/testdata/addResponseFromURL.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"data":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","meta":{"accrualPeriodicity":"R/P1W","downloadPath":"http://insight.dev.schoolwires.com/HelpAssets/C2Assets/C2Files/C2ImportFamRelSample.csv","qri":"md:0"},"path":"/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}},"meta":{"code":200}}
{"data":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","path":"/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}},"meta":{"code":200}}
2 changes: 1 addition & 1 deletion api/testdata/getResponseFamilyRelationships.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"data":{"peername":"peer","profileID":"QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt","name":"family_relationships","path":"/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA","dataset":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","meta":{"accrualPeriodicity":"R/P1W","downloadPath":"http://insight.dev.schoolwires.com/HelpAssets/C2Assets/C2Files/C2ImportFamRelSample.csv","qri":"md:0"},"path":"/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}}},"meta":{"code":200}}
{"data":{"peername":"peer","profileID":"QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt","name":"family_relationships","path":"/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv","dataset":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","path":"/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}}},"meta":{"code":200}}
2 changes: 1 addition & 1 deletion api/testdata/removeResponseByPath.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"data":{"peername":"peer","profileID":"QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt","name":"family_relationships","path":"/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA","dataset":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","meta":{"accrualPeriodicity":"R/P1W","downloadPath":"http://insight.dev.schoolwires.com/HelpAssets/C2Assets/C2Files/C2ImportFamRelSample.csv","qri":"md:0"},"path":"/map/QmRvu8tNk1ChFmhpczooykvAM4BCLCb4Ss55orpyWYBcoA","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}}},"meta":{"code":200}}
{"data":{"peername":"peer","profileID":"QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt","name":"family_relationships","path":"/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv","dataset":{"commit":{"path":"/map/QmT6ypopcWyjNXRBrFD7kocyh6JoCfoqPVSTm96FJNhHZo","qri":"cm:0","signature":"jAYxX3k/1sCYtEDzvIl9C07cW51o/jfHVI8Q9pEzDghMjGq9XXsZ4Sn2ClBLoJqEPDYLAyEYw/NwQ7yX4vNkTXBsvLHYj/JkRrfQ2O8fOqXaotj/nb2E4IE5HizHvTMV8+S6OyeLu+b2iiYMeB1zFmQlwxBeMyvsHrsFJd+CLQmXJUizaXUYv0h2Z2P/x4+8jBn8SdWFSjvNJ2H1UmDDwRuLnBUofoTsQBe7jFwXWhT83jN0R3eAafmOiWxZ00/rYytLCztk7R/GFMUkCK4gtEs30JWJAr+xE5suPra20c60aTvz0ALFnwnxt/HuJ/z4iaJVaRPrQSE0kGNRNprKmQ==","timestamp":"2001-01-01T01:01:01.000000001Z","title":"created dataset"},"dataPath":"/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","path":"/map/QmeSBfrDgFShkfjHJevhSe3zo6L9beTxqrxruzFfUqpNVv","qri":"ds:0","structure":{"checksum":"QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG","errCount":0,"entries":2,"format":"csv","formatConfig":{"headerRow":true},"length":60,"path":"/map/QmeZTVqjXK4ypanPMKd5F84yFKBhMtcqCBhFWkyS8VjUD7","qri":"st:0","schema":{"items":{"items":[{"title":"parent_identifier","type":"integer"},{"title":"student_identifier","type":"integer"}],"type":"array"},"type":"array"}}}},"meta":{"code":200}}
18 changes: 9 additions & 9 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
func Execute() {
// Catch errors & pretty-print.
// comment this out to get stack traces back.
defer func() {
if r := recover(); r != nil {
if err, ok := r.(error); ok {
fmt.Println(err.Error())
} else {
fmt.Println(r)
}
}
}()
// defer func() {
// if r := recover(); r != nil {
// if err, ok := r.(error); ok {
// fmt.Println(err.Error())
// } else {
// fmt.Println(r)
// }
// }
// }()

if err := RootCmd.Execute(); err != nil {
printErr(err)
Expand Down
77 changes: 41 additions & 36 deletions core/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (r *DatasetRequests) Init(p *SaveParams, res *repo.DatasetRef) (err error)
return r.cli.Call("DatasetRequests.Init", p, res)
}

var dataFile cafs.File

if p.Private {
return fmt.Errorf("option to make dataset private not yet implimented, refer to https://github.com/qri-io/qri/issues/291 for updates")
}
Expand All @@ -280,16 +282,10 @@ func (r *DatasetRequests) Init(p *SaveParams, res *repo.DatasetRef) (err error)
return fmt.Errorf("dataset is required")
}

if dsp.DataPath == "" && dsp.DataBytes == nil {
return fmt.Errorf("either dataBytes or dataPath is required to create a dataset")
if dsp.DataPath == "" && dsp.DataBytes == nil && dsp.Transform == nil {
return fmt.Errorf("either dataBytes, dataPath, or a transform is required to create a dataset")
}

dataFile, err := repo.DatasetPodDataFile(dsp)
if err != nil {
return err
}
defer dataFile.Close()

ds := &dataset.Dataset{}
if err = ds.Decode(dsp); err != nil {
return fmt.Errorf("decoding dataset: %s", err.Error())
Expand All @@ -303,40 +299,49 @@ func (r *DatasetRequests) Init(p *SaveParams, res *repo.DatasetRef) (err error)
ds.Commit.Title = "created dataset"
}

// validate / generate dataset name
if dsp.Name == "" {
dsp.Name = varName.CreateVarNameFromString(dataFile.FileName())
}
if err := validate.ValidName(dsp.Name); err != nil {
return fmt.Errorf("invalid name: %s", err.Error())
}

// read structure from InitParams, or detect from data
if ds.Structure == nil {
// use a TeeReader that writes to a buffer to preserve data
buf := &bytes.Buffer{}
tr := io.TeeReader(dataFile, buf)
var df dataset.DataFormat
if dsp.Transform == nil {

df, err = detect.ExtensionDataFormat(dataFile.FileName())
dataFile, err = repo.DatasetPodDataFile(dsp)
if err != nil {
log.Debug(err.Error())
return fmt.Errorf("invalid data format: %s", err.Error())
return err
}
defer dataFile.Close()

ds.Structure, _, err = detect.FromReader(df, tr)
if err != nil {
log.Debug(err.Error())
return fmt.Errorf("determining dataset schema: %s", err.Error())
// validate / generate dataset name
if dsp.Name == "" {
dsp.Name = varName.CreateVarNameFromString(dataFile.FileName())
}
if err := validate.ValidName(dsp.Name); err != nil {
return fmt.Errorf("invalid name: %s", err.Error())
}
// glue whatever we just read back onto the reader
dataFile = cafs.NewMemfileReader(dataFile.FileName(), io.MultiReader(buf, dataFile))
}

// Ensure that dataset structure is valid
if err = validate.Dataset(ds); err != nil {
log.Debug(err.Error())
return fmt.Errorf("invalid dataset: %s", err.Error())
// read structure from InitParams, or detect from data
if ds.Structure == nil && ds.Transform == nil {
// use a TeeReader that writes to a buffer to preserve data
buf := &bytes.Buffer{}
tr := io.TeeReader(dataFile, buf)
var df dataset.DataFormat

df, err = detect.ExtensionDataFormat(dataFile.FileName())
if err != nil {
log.Debug(err.Error())
return fmt.Errorf("invalid data format: %s", err.Error())
}

ds.Structure, _, err = detect.FromReader(df, tr)
if err != nil {
log.Debug(err.Error())
return fmt.Errorf("determining dataset schema: %s", err.Error())
}
// glue whatever we just read back onto the reader
dataFile = cafs.NewMemfileReader(dataFile.FileName(), io.MultiReader(buf, dataFile))
}

// Ensure that dataset structure is valid
if err = validate.Dataset(ds); err != nil {
log.Debug(err.Error())
return fmt.Errorf("invalid dataset: %s", err.Error())
}
}

// TODO - this relies on repo graph calculations, which are temporarily disabled b/c bugs.
Expand Down
2 changes: 1 addition & 1 deletion core/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestDatasetRequestsInit(t *testing.T) {
err string
}{
{nil, nil, "dataset is required"},
{&dataset.DatasetPod{}, nil, "either dataBytes or dataPath is required to create a dataset"},
{&dataset.DatasetPod{}, nil, "either dataBytes, dataPath, or a transform is required to create a dataset"},
{&dataset.DatasetPod{DataPath: "/bad/path"}, nil, "opening file: open /bad/path: no such file or directory"},
{&dataset.DatasetPod{DataPath: jobsDataPath, Commit: &dataset.CommitPod{Qri: "qri:st"}}, nil, "decoding dataset: invalid commit 'qri' value: qri:st"},
{&dataset.DatasetPod{DataPath: "http://localhost:999999/bad/url"}, nil, "fetching data url: Get http://localhost:999999/bad/url: dial tcp: address 999999: invalid port"},
Expand Down
9 changes: 9 additions & 0 deletions repo/actions/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func (act Dataset) CreateDataset(name string, ds *dataset.Dataset, data cafs.Fil
return
}

if ds.Transform != nil {
log.Info("running transformation...")
data, err = act.ExecTransform(ds, ds.Transform.Data)
if err != nil {
return
}
log.Info("done")
}

path, err = dsfs.CreateDataset(act.Store(), ds, data, act.PrivateKey(), pin)
if err != nil {
return
Expand Down
81 changes: 81 additions & 0 deletions repo/actions/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package actions

import (
	"fmt"
	"io"
	"os"

	"github.com/qri-io/cafs"
	"github.com/qri-io/dataset"
	"github.com/qri-io/dataset/dsio"
	"github.com/qri-io/qri/repo"
	"github.com/qri-io/skytf"
)

// ExecTransform executes the skylark transformation script at filepath against
// ds and returns the entries it produces as an in-memory JSON data file.
//
// Besides producing the data file, a successful call:
//   - adds the script file to the repo's store (currently pinned, see TODO below),
//   - logs an ETTransformExecuted event referencing the stored script,
//   - replaces ds.Structure with a JSON-format structure describing the output,
//   - sets ds.Transform.Data to the store path of the script.
//
// ds.Structure and ds.Transform.Data are only assigned after every step has
// succeeded. ds.Transform must be non-nil.
func (act Dataset) ExecTransform(ds *dataset.Dataset, filepath string) (file cafs.File, err error) {
	rr, err := skytf.ExecFile(ds, filepath)
	if err != nil {
		return nil, err
	}

	// Output is always written as JSON. Reuse the caller's schema when one was
	// supplied; ds.Structure may be nil for a dataset that is defined only by
	// its transform, so it must not be dereferenced unconditionally.
	st := &dataset.Structure{Format: dataset.JSONDataFormat}
	if ds.Structure != nil {
		st.Schema = ds.Structure.Schema
	}

	buf, err := dsio.NewEntryBuffer(st)
	if err != nil {
		return nil, fmt.Errorf("error allocating result buffer: %s", err)
	}

	// Drain the transform's entry reader into the buffer until end of stream.
	for {
		val, err := rr.ReadEntry()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("row iteration error: %s", err.Error())
		}
		if err := buf.WriteEntry(val); err != nil {
			return nil, fmt.Errorf("error writing value to buffer: %s", err.Error())
		}
	}

	if err := buf.Close(); err != nil {
		return nil, fmt.Errorf("error closing row buffer: %s", err.Error())
	}

	// Add skylark file to store
	f, err := os.Open(filepath)
	if err != nil {
		return nil, err
	}
	// The store only reads from f; release the descriptor once we are done.
	defer f.Close()

	// TODO - currently this is just pinning to the repo, this should *not* be pinned,
	// instead it should be added to the dataset merkleDAG
	tfPath, err := act.Repo.Store().Put(cafs.NewMemfileReader("transform.sky", f), true)
	if err != nil {
		return nil, err
	}

	// TODO - clean up events to handle this situation
	ref := repo.DatasetRef{
		Dataset: &dataset.DatasetPod{
			Transform: &dataset.TransformPod{
				Syntax: "skylark",
				Data:   tfPath.String(),
			},
		},
	}

	if err = act.LogEvent(repo.ETTransformExecuted, ref); err != nil {
		return nil, err
	}

	// Everything succeeded: point the dataset at the generated structure and
	// at the stored copy of the script.
	ds.Structure = st
	ds.Transform.Data = tfPath.String()
	return cafs.NewMemfileBytes(fmt.Sprintf("data.%s", st.Format.String()), buf.Bytes()), nil
}
2 changes: 2 additions & 0 deletions repo/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
ETDsUnpinned = EventType("ds_unpinned")
// ETDsAdded represents adding a reference to another peer's dataset to their node
ETDsAdded = EventType("ds_added")
// ETTransformExecuted represents running a transformation
ETTransformExecuted = EventType("tf_executed")
)

// MemEventLog is an in-memory implementation of the
Expand Down

0 comments on commit f684229

Please sign in to comment.