Skip to content

Commit

Permalink
Merge pull request #1624 from qri-io/b5/xform-ds-preview
Browse files Browse the repository at this point in the history
feat(transform): transform step emits dataset event on completion, rework tf event constants
  • Loading branch information
b5 authored Jan 28, 2021
2 parents 73a8d83 + 28bb8b0 commit 552f07b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 21 deletions.
17 changes: 10 additions & 7 deletions event/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ package event
const (
// ETTransformStart signals the start a transform execution
// Payload will be a TransformLifecycle
ETTransformStart = Type("transform:Start")
ETTransformStart = Type("tf:Start")
// ETTransformStop signals the completion of a transform execution
// Payload will be a TransformLifecycle
ETTransformStop = Type("transform:Stop")
ETTransformStop = Type("tf:Stop")

// ETTransformStepStart signals a step is starting.
// Payload will be a StepDetail
ETTransformStepStart = Type("transform:StepStart")
ETTransformStepStart = Type("tf:StepStart")
// ETTransformStepStop signals a step has stopped.
// Payload will be a StepDetail
ETTransformStepStop = Type("transform:StepStop")
ETTransformStepStop = Type("tf:StepStop")
// ETTransformStepSkip signals a step was skipped.
// Payload will be a StepDetail
ETTransformStepSkip = Type("transform:StepSkip")
ETTransformStepSkip = Type("tf:StepSkip")

// ETTransformPrint is sent by print commands.
// Payload will be a Message
ETTransformPrint = Type("transform:Print")
ETTransformPrint = Type("tf:Print")
// ETTransformError is for when a tranform program execution error occurs.
// Payload will be a Message
ETTransformError = Type("transform:Error")
ETTransformError = Type("tf:Error")
// ETTransformDatasetPreview is an abbreviated dataset document in a transform
// Payload will be a *dataset.Dataset Preview
ETTransformDatasetPreview = Type("tf:DatasetPreview")
)

// TransformLifecycle captures state about the execution of an entire transform
Expand Down
4 changes: 1 addition & 3 deletions lib/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/qri-io/dataset"
"github.com/qri-io/qri/base"
Expand Down Expand Up @@ -89,8 +88,7 @@ func (m *TransformMethods) Apply(p *ApplyParams, res *ApplyResult) error {
// allocate an ID for the transform, for now just log the events it produces
runID := transform.NewRunID()
m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
when := time.Unix(e.Timestamp/1000000000, e.Timestamp%1000000000)
log.Infof("[%s] event %s: %s", when, e.Type, e.Payload)
log.Debugw("apply transform event", "type", e.Type, "payload", e.Payload)
if e.Type == event.ETTransformPrint {
if msg, ok := e.Payload.(event.TransformMessage); ok {
io.WriteString(p.ScriptOutput, msg.Msg)
Expand Down
24 changes: 23 additions & 1 deletion transform/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package transform
import (
"bytes"
"context"
"encoding/json"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/qri-io/dataset"
"github.com/qri-io/ioes"
"github.com/qri-io/qri/dsref"
Expand Down Expand Up @@ -35,6 +37,7 @@ func TestApply(t *testing.T) {
{Type: event.ETTransformStepStart, Payload: event.TransformStepLifecycle{Category: "download"}},
{Type: event.ETTransformStepStop, Payload: event.TransformStepLifecycle{Category: "download", Status: StatusSucceeded}},
{Type: event.ETTransformStepStart, Payload: event.TransformStepLifecycle{Category: "transform"}},
{Type: event.ETTransformDatasetPreview, Payload: threeStepDatasetPreview},
{Type: event.ETTransformStepStop, Payload: event.TransformStepLifecycle{Category: "transform", Status: StatusSucceeded}},
{Type: event.ETTransformStop, Payload: event.TransformLifecycle{Status: StatusSucceeded}},
},
Expand Down Expand Up @@ -111,7 +114,26 @@ func compareEventLogs(t *testing.T, expect, log []event.Event) {
return false
}
}
if diff := cmp.Diff(expect, log, cmp.FilterPath(ignorePaths, cmp.Ignore())); diff != "" {
ignoreUnexported := cmpopts.IgnoreUnexported(
dataset.Dataset{},
dataset.Transform{},
)
if diff := cmp.Diff(expect, log, cmp.FilterPath(ignorePaths, cmp.Ignore()), ignoreUnexported); diff != "" {
t.Errorf("result mismatch (-want +got):\n%s", diff)
}
}

var threeStepDatasetPreview = &dataset.Dataset{
Body: json.RawMessage(`[[1,2,3]]`),
Structure: &dataset.Structure{
Format: "json",
Schema: map[string]interface{}{"type": "array"},
},
Transform: &dataset.Transform{
Steps: []*dataset.TransformStep{
{Syntax: "starlark", Category: "setup", Script: `print("oh, hello!")`},
{Syntax: "starlark", Category: "download", Script: "def download(ctx):\n\treturn"},
{Syntax: "starlark", Category: "transform", Script: "def transform(ds, ctx):\n\tds.set_body([[1,2,3]])"},
},
},
}
20 changes: 13 additions & 7 deletions transform/startf/exec_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/preview"
"github.com/qri-io/qfs"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/dsref"
Expand Down Expand Up @@ -97,14 +98,14 @@ func (r *StepRunner) RunStep(ctx context.Context, ds *dataset.Dataset, st *datas
r.globals[key] = val
}

if err := r.callStepFunc(r.thread, st.Category, ds); err != nil {
if err := r.callStepFunc(ctx, r.thread, st.Category, ds); err != nil {
return err
}

return nil
}

func (r *StepRunner) callStepFunc(thread *starlark.Thread, stepType string, ds *dataset.Dataset) error {
func (r *StepRunner) callStepFunc(ctx context.Context, thread *starlark.Thread, stepType string, ds *dataset.Dataset) error {
if stepType == "setup" {
return nil
}
Expand All @@ -118,7 +119,7 @@ func (r *StepRunner) callStepFunc(thread *starlark.Thread, stepType string, ds *
case "download":
return r.callDownloadFunc(thread, stepFunc)
case "transform":
return r.callTransformFunc(thread, stepFunc, ds)
return r.callTransformFunc(ctx, thread, stepFunc, ds)
default:
return fmt.Errorf("unrecognized starlark step type %q", stepType)
}
Expand Down Expand Up @@ -151,24 +152,29 @@ func (r *StepRunner) callDownloadFunc(thread *starlark.Thread, download *starlar
return nil
}

func (r *StepRunner) callTransformFunc(thread *starlark.Thread, transform *starlark.Function, ds *dataset.Dataset) (err error) {
func (r *StepRunner) callTransformFunc(ctx context.Context, thread *starlark.Thread, transform *starlark.Function, ds *dataset.Dataset) (err error) {
d := skyds.NewDataset(r.prev, r.checkFunc)
d.SetMutable(ds)
if _, err = starlark.Call(thread, transform, starlark.Tuple{d.Methods(), r.starCtx.Struct()}, nil); err != nil {
return err
}

// TODO (b5) - this should happen in ds.set_body method call
if f := ds.BodyFile(); f != nil {
if ds.Structure == nil {
if err := base.InferStructure(ds); err != nil {
log.Debugw("inferring structure", "err", err)
return err
}
}
if err := base.InlineJSONBody(ds); err != nil {
log.Debugw("inlining resulting dataset JSON body", "err", err)
}

if r.eventsCh != nil {
pview, err := preview.CreatePreview(ctx, ds)
if err != nil {
return err
}
ds.SetBodyFile(qfs.NewMemfileBytes("body.json", ds.BodyBytes))
r.eventsCh <- event.Event{Type: event.ETTransformDatasetPreview, Payload: pview}
}

return nil
Expand Down
5 changes: 2 additions & 3 deletions transform/startf/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package startf
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -141,14 +140,14 @@ func TestExecStep(t *testing.T) {
t.Fatal(err)
}
// Check that body was set by the transform step.
data, err := json.Marshal(ds.Body)
data, err := ioutil.ReadAll(ds.BodyFile())
if err != nil {
t.Fatal(err)
}
actual := string(data)
expect := `[[1,2,3]]`
if actual != expect {
t.Errorf("expected: \"%s\", actual: \"%s\"", expect, actual)
t.Errorf("expected: %q, actual: %q", expect, actual)
}
}

Expand Down

0 comments on commit 552f07b

Please sign in to comment.