diff --git a/event/transform.go b/event/transform.go
index 121cc7f65..4d6169dc0 100644
--- a/event/transform.go
+++ b/event/transform.go
@@ -3,27 +3,30 @@ package event
 const (
 	// ETTransformStart signals the start a transform execution
 	// Payload will be a TransformLifecycle
-	ETTransformStart = Type("transform:Start")
+	ETTransformStart = Type("tf:Start")
 	// ETTransformStop signals the completion of a transform execution
 	// Payload will be a TransformLifecycle
-	ETTransformStop = Type("transform:Stop")
+	ETTransformStop = Type("tf:Stop")
 
 	// ETTransformStepStart signals a step is starting.
 	// Payload will be a StepDetail
-	ETTransformStepStart = Type("transform:StepStart")
+	ETTransformStepStart = Type("tf:StepStart")
 	// ETTransformStepStop signals a step has stopped.
 	// Payload will be a StepDetail
-	ETTransformStepStop = Type("transform:StepStop")
+	ETTransformStepStop = Type("tf:StepStop")
 	// ETTransformStepSkip signals a step was skipped.
 	// Payload will be a StepDetail
-	ETTransformStepSkip = Type("transform:StepSkip")
+	ETTransformStepSkip = Type("tf:StepSkip")
 
 	// ETTransformPrint is sent by print commands.
 	// Payload will be a Message
-	ETTransformPrint = Type("transform:Print")
+	ETTransformPrint = Type("tf:Print")
 	// ETTransformError is for when a tranform program execution error occurs.
 	// Payload will be a Message
-	ETTransformError = Type("transform:Error")
+	ETTransformError = Type("tf:Error")
+	// ETTransformDatasetPreview carries an abbreviated dataset document from a transform
+	// Payload will be a *dataset.Dataset holding the preview
+	ETTransformDatasetPreview = Type("tf:DatasetPreview")
 )
 
 // TransformLifecycle captures state about the execution of an entire transform
diff --git a/lib/transform.go b/lib/transform.go
index 0373e0add..5980b62ac 100644
--- a/lib/transform.go
+++ b/lib/transform.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"time"
 
 	"github.com/qri-io/dataset"
 	"github.com/qri-io/qri/base"
@@ -89,8 +88,7 @@ func (m *TransformMethods) Apply(p *ApplyParams, res *ApplyResult) error {
 	// allocate an ID for the transform, for now just log the events it produces
 	runID := transform.NewRunID()
 	m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
-		when := time.Unix(e.Timestamp/1000000000, e.Timestamp%1000000000)
-		log.Infof("[%s] event %s: %s", when, e.Type, e.Payload)
+		log.Debugw("apply transform event", "type", e.Type, "payload", e.Payload)
 		if e.Type == event.ETTransformPrint {
 			if msg, ok := e.Payload.(event.TransformMessage); ok {
 				io.WriteString(p.ScriptOutput, msg.Msg)
diff --git a/transform/apply_test.go b/transform/apply_test.go
index 7bcd03d75..f2cc543b7 100644
--- a/transform/apply_test.go
+++ b/transform/apply_test.go
@@ -3,9 +3,11 @@ package transform
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/qri-io/dataset"
 	"github.com/qri-io/ioes"
 	"github.com/qri-io/qri/dsref"
@@ -35,6 +37,7 @@ func TestApply(t *testing.T) {
 				{Type: event.ETTransformStepStart, Payload: event.TransformStepLifecycle{Category: "download"}},
 				{Type: event.ETTransformStepStop, Payload: event.TransformStepLifecycle{Category: "download", Status: StatusSucceeded}},
 				{Type: event.ETTransformStepStart, Payload: event.TransformStepLifecycle{Category: "transform"}},
+				{Type: event.ETTransformDatasetPreview, Payload: threeStepDatasetPreview},
 				{Type: event.ETTransformStepStop, Payload: event.TransformStepLifecycle{Category: "transform", Status: StatusSucceeded}},
 				{Type: event.ETTransformStop, Payload: event.TransformLifecycle{Status: StatusSucceeded}},
 			},
@@ -111,7 +114,26 @@ func compareEventLogs(t *testing.T, expect, log []event.Event) {
 			return false
 		}
 	}
-	if diff := cmp.Diff(expect, log, cmp.FilterPath(ignorePaths, cmp.Ignore())); diff != "" {
+	ignoreUnexported := cmpopts.IgnoreUnexported(
+		dataset.Dataset{},
+		dataset.Transform{},
+	)
+	if diff := cmp.Diff(expect, log, cmp.FilterPath(ignorePaths, cmp.Ignore()), ignoreUnexported); diff != "" {
 		t.Errorf("result mismatch (-want +got):\n%s", diff)
 	}
 }
+
+var threeStepDatasetPreview = &dataset.Dataset{
+	Body: json.RawMessage(`[[1,2,3]]`),
+	Structure: &dataset.Structure{
+		Format: "json",
+		Schema: map[string]interface{}{"type": "array"},
+	},
+	Transform: &dataset.Transform{
+		Steps: []*dataset.TransformStep{
+			{Syntax: "starlark", Category: "setup", Script: `print("oh, hello!")`},
+			{Syntax: "starlark", Category: "download", Script: "def download(ctx):\n\treturn"},
+			{Syntax: "starlark", Category: "transform", Script: "def transform(ds, ctx):\n\tds.set_body([[1,2,3]])"},
+		},
+	},
+}
diff --git a/transform/startf/exec_step.go b/transform/startf/exec_step.go
index aba9ccb9b..92216b094 100644
--- a/transform/startf/exec_step.go
+++ b/transform/startf/exec_step.go
@@ -7,6 +7,7 @@ import (
 
 	golog "github.com/ipfs/go-log"
 	"github.com/qri-io/dataset"
+	"github.com/qri-io/dataset/preview"
 	"github.com/qri-io/qfs"
 	"github.com/qri-io/qri/base"
 	"github.com/qri-io/qri/dsref"
@@ -97,14 +98,14 @@ func (r *StepRunner) RunStep(ctx context.Context, ds *dataset.Dataset, st *datas
 		r.globals[key] = val
 	}
 
-	if err := r.callStepFunc(r.thread, st.Category, ds); err != nil {
+	if err := r.callStepFunc(ctx, r.thread, st.Category, ds); err != nil {
 		return err
 	}
 
 	return nil
 }
 
-func (r *StepRunner) callStepFunc(thread *starlark.Thread, stepType string, ds *dataset.Dataset) error {
+func (r *StepRunner) callStepFunc(ctx context.Context, thread *starlark.Thread, stepType string, ds *dataset.Dataset) error {
 	if stepType == "setup" {
 		return nil
 	}
@@ -118,7 +119,7 @@ func (r *StepRunner) callStepFunc(thread *starlark.Thread, stepType string, ds *
 	case "download":
 		return r.callDownloadFunc(thread, stepFunc)
 	case "transform":
-		return r.callTransformFunc(thread, stepFunc, ds)
+		return r.callTransformFunc(ctx, thread, stepFunc, ds)
 	default:
 		return fmt.Errorf("unrecognized starlark step type %q", stepType)
 	}
@@ -151,13 +152,14 @@ func (r *StepRunner) callDownloadFunc(thread *starlark.Thread, download *starlar
 	return nil
 }
 
-func (r *StepRunner) callTransformFunc(thread *starlark.Thread, transform *starlark.Function, ds *dataset.Dataset) (err error) {
+func (r *StepRunner) callTransformFunc(ctx context.Context, thread *starlark.Thread, transform *starlark.Function, ds *dataset.Dataset) (err error) {
 	d := skyds.NewDataset(r.prev, r.checkFunc)
 	d.SetMutable(ds)
 	if _, err = starlark.Call(thread, transform, starlark.Tuple{d.Methods(), r.starCtx.Struct()}, nil); err != nil {
 		return err
 	}
 
+	// TODO (b5) - this should happen in ds.set_body method call
 	if f := ds.BodyFile(); f != nil {
 		if ds.Structure == nil {
 			if err := base.InferStructure(ds); err != nil {
@@ -165,10 +167,19 @@ func (r *StepRunner) callTransformFunc(thread *starlark.Thread, transform *starl
 				return err
 			}
 		}
-		if err := base.InlineJSONBody(ds); err != nil {
-			log.Debugw("inlining resulting dataset JSON body", "err", err)
+	}
+
+	if r.eventsCh != nil {
+		pview, err := preview.CreatePreview(ctx, ds)
+		if err != nil {
+			return err
 		}
-		ds.SetBodyFile(qfs.NewMemfileBytes("body.json", ds.BodyBytes))
+		// respect cancellation: don't block forever if nothing is receiving events
+		select {
+		case r.eventsCh <- event.Event{Type: event.ETTransformDatasetPreview, Payload: pview}:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 	}
 
 	return nil
diff --git a/transform/startf/transform_test.go b/transform/startf/transform_test.go
index d1f6f7ed8..b2a21eb06 100644
--- a/transform/startf/transform_test.go
+++ b/transform/startf/transform_test.go
@@ -3,7 +3,6 @@ package startf
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net/http"
@@ -141,14 +140,14 @@ func TestExecStep(t *testing.T) {
 		t.Fatal(err)
 	}
 	// Check that body was set by the transform step.
-	data, err := json.Marshal(ds.Body)
+	data, err := ioutil.ReadAll(ds.BodyFile())
 	if err != nil {
 		t.Fatal(err)
 	}
 	actual := string(data)
 	expect := `[[1,2,3]]`
 	if actual != expect {
-		t.Errorf("expected: \"%s\", actual: \"%s\"", expect, actual)
+		t.Errorf("expected: %q, actual: %q", expect, actual)
 	}
 }
 