Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(execute): use random dataset ids instead of ones hashed from the name #3434

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions execute/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,22 @@ func (id DatasetID) IsZero() bool {
return id == ZeroDatasetID
}

// DatasetIDFromNodeID deterministically derives a DatasetID from a
// plan.NodeID.
//
// The mapping is stable: two nodes with equal ids always map to the
// same DatasetID, because the result is a version-5 (name-based) UUID
// computed in the zero-UUID namespace.
func DatasetIDFromNodeID(id plan.NodeID) DatasetID {
	namespace := uuid.UUID{}
	return DatasetID(uuid.NewV5(namespace, string(id)))
}

// RandomDatasetID constructs a new DatasetID from a randomly generated
// (version-4) UUID. A non-nil error indicates the entropy source failed
// and the returned id must not be used.
func RandomDatasetID() (DatasetID, error) {
	u, err := uuid.NewV4()
	if err != nil {
		return DatasetID(u), err
	}
	return DatasetID(u), nil
}

type dataset struct {
id DatasetID

Expand All @@ -85,6 +97,10 @@ func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *datase
}
}

// ID returns the identifier assigned to this dataset.
func (d *dataset) ID() DatasetID {
	return d.id
}

// AddTransformation registers t to receive the data this dataset produces.
func (d *dataset) AddTransformation(t Transformation) {
	d.ts = append(d.ts, t)
}
Expand Down Expand Up @@ -184,6 +200,10 @@ func NewPassthroughDataset(id DatasetID) *PassthroughDataset {
return &PassthroughDataset{id: id}
}

// ID returns the identifier assigned to this dataset.
func (d *PassthroughDataset) ID() DatasetID {
	return d.id
}

// AddTransformation registers t to receive the tables passed through this dataset.
func (d *PassthroughDataset) AddTransformation(t Transformation) {
	d.ts = append(d.ts, t)
}
Expand Down
10 changes: 7 additions & 3 deletions execute/executetest/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func RandomDatasetID() execute.DatasetID {
}

type Dataset struct {
ID execute.DatasetID
DatasetID execute.DatasetID
Retractions []flux.GroupKey
ProcessingTimeUpdates []execute.Time
WatermarkUpdates []execute.Time
Expand All @@ -27,10 +27,14 @@ type Dataset struct {

// NewDataset constructs a test Dataset with the given dataset id.
func NewDataset(id execute.DatasetID) *Dataset {
	return &Dataset{
		DatasetID: id,
	}
}

// ID returns the dataset id this test Dataset was created with.
func (d *Dataset) ID() execute.DatasetID {
	return d.DatasetID
}

// AddTransformation is not supported by the test Dataset and always panics.
func (d *Dataset) AddTransformation(t execute.Transformation) {
	panic("not implemented")
}
Expand Down Expand Up @@ -83,7 +87,7 @@ func TransformationPassThroughTestHelper(t *testing.T, newTr NewTransformation)
tr.Finish(parentID, nil)

exp := &Dataset{
ID: d.ID,
DatasetID: d.DatasetID,
ProcessingTimeUpdates: []execute.Time{now},
WatermarkUpdates: []execute.Time{now},
Finished: true,
Expand Down
35 changes: 23 additions & 12 deletions execute/executetest/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"testing"

uuid "github.com/gofrs/uuid"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/dependenciestest"
Expand All @@ -24,17 +23,29 @@ const FromTestKind = "from-test"
// It simulates the execution of a basic physical scan operation.
type FromProcedureSpec struct {
execute.ExecutionNode
id execute.DatasetID
data []*Table
ts []execute.Transformation
}

// NewFromProcedureSpec specifies a from-test procedure with source data
func NewFromProcedureSpec(data []*Table) *FromProcedureSpec {
	// uuid.NewV4 can return an error because of entropy. We will stick with the previous
	// behavior of panicking on errors when creating new uuids.
	id, err := execute.RandomDatasetID()
	if err != nil {
		panic(err)
	}

	// Normalize data before anything can read it
	for _, tbl := range data {
		tbl.Normalize()
	}
	return &FromProcedureSpec{id: id, data: data}
}

// ID returns the dataset id this source produces data under.
func (src *FromProcedureSpec) ID() execute.DatasetID {
	return src.id
}

func (src *FromProcedureSpec) Kind() plan.ProcedureKind {
Expand All @@ -54,27 +65,23 @@ func (src *FromProcedureSpec) AddTransformation(t execute.Transformation) {
}

func (src *FromProcedureSpec) Run(ctx context.Context) {
// uuid.NewV4 can return an error because of enthropy. We will stick with the previous
// behavior of panicing on errors when creating new uuid's
id := execute.DatasetID(uuid.Must(uuid.NewV4()))

if len(src.ts) == 0 {
return
} else if len(src.ts) == 1 {
t := src.ts[0]

var max execute.Time
for _, tbl := range src.data {
t.Process(id, tbl)
t.Process(src.id, tbl)
stopIdx := execute.ColIdx(execute.DefaultStopColLabel, tbl.Cols())
if stopIdx >= 0 {
if s := tbl.Key().ValueTime(stopIdx); s > max {
max = s
}
}
}
t.UpdateWatermark(id, max)
t.Finish(id, nil)
t.UpdateWatermark(src.id, max)
t.Finish(src.id, nil)
return
}

Expand All @@ -94,16 +101,16 @@ func (src *FromProcedureSpec) Run(ctx context.Context) {
for _, t := range src.ts {
var max execute.Time
for _, tbl := range buffers {
t.Process(id, tbl.Copy())
t.Process(src.id, tbl.Copy())
stopIdx := execute.ColIdx(execute.DefaultStopColLabel, tbl.Cols())
if stopIdx >= 0 {
if s := tbl.Key().ValueTime(stopIdx); s > max {
max = s
}
}
}
t.UpdateWatermark(id, max)
t.Finish(id, nil)
t.UpdateWatermark(src.id, max)
t.Finish(src.id, nil)
}
}

Expand All @@ -124,6 +131,10 @@ type AllocatingFromProcedureSpec struct {

const AllocatingFromTestKind = "allocating-from-test"

// ID returns the dataset id this source produces data under.
func (s *AllocatingFromProcedureSpec) ID() execute.DatasetID {
	return s.id
}

// Kind returns the procedure kind for this spec.
func (AllocatingFromProcedureSpec) Kind() plan.ProcedureKind {
	return AllocatingFromTestKind
}
Expand Down
7 changes: 5 additions & 2 deletions execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (v *createExecutionNodeVisitor) Visit(node plan.Node) error {
}
spec := node.ProcedureSpec()
kind := spec.Kind()
id := DatasetIDFromNodeID(node.ID())
id, err := RandomDatasetID()
if err != nil {
return err
}

if yieldSpec, ok := spec.(plan.YieldProcedureSpec); ok {
r := newResult(yieldSpec.YieldName())
Expand All @@ -180,7 +183,7 @@ func (v *createExecutionNodeVisitor) Visit(node plan.Node) error {
}

for i, pred := range nonYieldPredecessors(node) {
ec.parents[i] = DatasetIDFromNodeID(pred.ID())
ec.parents[i] = v.nodes[pred].ID()
}

// If node is a leaf, create a source
Expand Down
1 change: 1 addition & 0 deletions execute/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

// Node is a participant in the execution graph that produces data under
// a dataset id and forwards it to downstream transformations.
type Node interface {
	// ID reports the id of the dataset this node produces.
	ID() DatasetID
	// AddTransformation registers t to receive this node's output.
	AddTransformation(t Transformation)
}

Expand Down
8 changes: 8 additions & 0 deletions execute/source_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type sourceDecoder struct {
ts []Transformation
}

// ID returns the dataset id assigned to this source.
func (c *sourceDecoder) ID() DatasetID {
	return c.id
}

func (c *sourceDecoder) Do(ctx context.Context, f func(flux.Table) error) error {
err := c.decoder.Connect(ctx)
if err != nil {
Expand Down Expand Up @@ -110,6 +114,10 @@ type sourceIterator struct {
iterator SourceIterator
}

// ID returns the dataset id assigned to this source.
func (s *sourceIterator) ID() DatasetID {
	return s.id
}

// AddTransformation registers t to receive this source's output.
func (s *sourceIterator) AddTransformation(t Transformation) {
	s.ts = append(s.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/execute/table/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func NewDataset(id execute.DatasetID, cache *BuilderCache) execute.Dataset {
}
}

// ID returns the identifier assigned to this dataset.
func (d *dataset) ID() execute.DatasetID {
	return d.id
}

// AddTransformation registers t to receive the tables this dataset produces.
func (d *dataset) AddTransformation(t execute.Transformation) {
	d.ts = append(d.ts, t)
}
Expand Down
7 changes: 6 additions & 1 deletion mock/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ import (
// By default it does nothing.
type Source struct {
	execute.ExecutionNode
	// id is the dataset id reported by ID.
	id execute.DatasetID
	// AddTransformationFn, if set, is invoked by AddTransformation.
	AddTransformationFn func(transformation execute.Transformation)
	// RunFn, if set, is invoked by Run.
	RunFn func(ctx context.Context)
}

// ID returns the dataset id this mock source was created with.
func (s *Source) ID() execute.DatasetID {
	return s.id
}

func (s *Source) AddTransformation(t execute.Transformation) {
if s.AddTransformationFn != nil {
s.AddTransformationFn(t)
Expand All @@ -30,5 +35,5 @@ func (s *Source) Run(ctx context.Context) {
// of your test:
// execute.RegisterSource(influxdb.FromKind, mock.CreateMockFromSource)
func CreateMockFromSource(spec plan.ProcedureSpec, id execute.DatasetID, ctx execute.Administration) (execute.Source, error) {
	// The returned mock reports the provided dataset id from its ID method;
	// spec and ctx are ignored.
	return &Source{id: id}, nil
}
4 changes: 4 additions & 0 deletions stdlib/csv/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type CSVSource struct {
alloc *memory.Allocator
}

// ID returns the dataset id assigned to this source.
func (c *CSVSource) ID() execute.DatasetID {
	return c.id
}

// AddTransformation registers t to receive this source's output.
func (c *CSVSource) AddTransformation(t execute.Transformation) {
	c.ts = append(c.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/experimental/array/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type tableSource struct {
ts execute.TransformationSet
}

// ID returns the dataset id assigned to this source.
func (s *tableSource) ID() execute.DatasetID {
	return s.id
}

// AddTransformation registers t to receive this source's output.
func (s *tableSource) AddTransformation(t execute.Transformation) {
	s.ts = append(s.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/influxdata/influxdb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func CreateSource(id execute.DatasetID, spec RemoteProcedureSpec, a execute.Admi
return s, nil
}

// ID returns the dataset id assigned to this source.
func (s *source) ID() execute.DatasetID {
	return s.id
}

// AddTransformation registers t to receive this source's output.
func (s *source) AddTransformation(t execute.Transformation) {
	s.ts = append(s.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/influxdata/influxdb/v1/from_influx_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ type JSONSource struct {
ts []execute.Transformation
}

// ID returns the dataset id assigned to this source.
func (c *JSONSource) ID() execute.DatasetID {
	return c.id
}

// AddTransformation registers t to receive this source's output.
func (c *JSONSource) AddTransformation(t execute.Transformation) {
	c.ts = append(c.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/internal/gen/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ type Source struct {
alloc *memory.Allocator
}

// ID returns the dataset id assigned to this source.
func (s *Source) ID() execute.DatasetID {
	return s.id
}

// AddTransformation registers t to receive this source's output.
func (s *Source) AddTransformation(t execute.Transformation) {
	s.ts = append(s.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/internal/promql/empty_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type EmptyTableSource struct {
ts []execute.Transformation
}

// ID returns the dataset id assigned to this source.
func (s *EmptyTableSource) ID() execute.DatasetID {
	return s.id
}

// AddTransformation registers t to receive this source's output.
func (s *EmptyTableSource) AddTransformation(t execute.Transformation) {
	s.ts = append(s.ts, t)
}
Expand Down
4 changes: 4 additions & 0 deletions stdlib/socket/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ type socketSource struct {
ts []execute.Transformation
}

// ID returns the dataset id assigned to this source.
// NOTE(review): this reads ss.d rather than an `id` field like the other
// sources touched by this change — confirm that the struct's `d` field
// (not visible here) holds the execute.DatasetID.
func (ss *socketSource) ID() execute.DatasetID {
	return ss.d
}

// AddTransformation registers t to receive this source's output.
func (ss *socketSource) AddTransformation(t execute.Transformation) {
	ss.ts = append(ss.ts, t)
}
Expand Down