diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go index 938ee40fb4fb..3a0552e53eba 100644 --- a/sdks/go/pkg/beam/coder.go +++ b/sdks/go/pkg/beam/coder.go @@ -152,7 +152,23 @@ func NewCoder(t FullType) Coder { func inferCoder(t FullType) (*coder.Coder, error) { switch t.Class() { - case typex.Concrete, typex.Container: + case typex.Container: + switch t.Type() { + case reflectx.ByteSlice: + return &coder.Coder{Kind: coder.Bytes, T: t}, nil + } + switch t.Type().Kind() { + case reflect.Slice: + c, err := inferCoder(t.Components()[0]) + if err != nil { + return nil, err + } + return &coder.Coder{Kind: coder.Iterable, T: t, Components: []*coder.Coder{c}}, nil + + default: + panic(fmt.Sprintf("inferCoder: unknown container kind %v", t)) + } + case typex.Concrete: switch t.Type() { case reflectx.Int64: // use the beam varint coder. @@ -183,9 +199,6 @@ func inferCoder(t FullType) (*coder.Coder, error) { case reflectx.String: return &coder.Coder{Kind: coder.String, T: t}, nil - case reflectx.ByteSlice: - return &coder.Coder{Kind: coder.Bytes, T: t}, nil - case reflectx.Bool: return &coder.Coder{Kind: coder.Bool, T: t}, nil diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 3e5084e857d5..9fa8df7500a7 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error { } func (n *DataSource) String() string { - return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID()) + return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Out.ID(), n.Coder) } // incrementIndexAndCheckSplit increments DataSource.index by one and checks if diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 489878defa02..42de145db808 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -403,5 +403,5 @@ func (n *ParDo) fail(err error) error { } func (n *ParDo) String() string { - return fmt.Sprintf("ParDo[%v] Out:%v", path.Base(n.Fn.Name()), IDs(n.Out...)) + return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type()) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 478f37791d3b..c98e48bf54d9 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" @@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) { b.units = b.units[:len(b.units)-1] } + mayFixDataSourceCoder(u) b.units = append(b.units, u) } return b.build() } +// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes +// a KV> coder to a CoGBK. This requires knowledge of the downstream node because +// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right +// at execution time with knowledge of both. +func mayFixDataSourceCoder(u *DataSource) { + if !coder.IsKV(coder.SkipW(u.Coder)) { + return // If it's not a KV, there's nothing to do here. + } + if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable { + return // If the V is not an iterable, we don't care. + } + out := u.Out + if mp, ok := out.(*Multiplex); ok { + // Here we trust that the Multiplex Outs are all the same signature, since we've validated + // that at construction time. + out = mp.Out[0] + } + + switch n := out.(type) { + // These nodes always expect CoGBK behavior. + case *Expand, *MergeAccumulators, *ReshuffleOutput, *Combine: + u.Coder = convertToCoGBK(u.Coder) + return + case *ParDo: + // So we now know we have a KV>. So we need to validate whether the DoFn has an + // iter function in the value slot. If it does, we need to use a CoGBK coder. + sig := n.Fn.ProcessElementFn() + // Get all valid inputs and side inputs. + in := sig.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter) + + if len(in) < 2 { + return // Somehow there's only a single value, so we're done. (Defense against generic KVs) + } + // It's an iterator, so we can assume it's a GBK, due to previous pre-conditions. + if sig.Param[in[1]].Kind == funcx.FnIter { + u.Coder = convertToCoGBK(u.Coder) + return + } + } +} + +func convertToCoGBK(oc *coder.Coder) *coder.Coder { + ocnw := coder.SkipW(oc) + // Validate that all values from the coder are iterables. + comps := make([]*coder.Coder, 0, len(ocnw.Components)) + comps = append(comps, ocnw.Components[0]) + for _, c := range ocnw.Components[1:] { + if c.Kind != coder.Iterable { + panic(fmt.Sprintf("want all values to be iterables: %v", oc)) + } + comps = append(comps, c.Components[0]) + } + return coder.NewW(coder.NewCoGBK(comps), oc.Window) +} + type builder struct { desc *fnpb.ProcessBundleDescriptor coders *graphx.CoderUnmarshaller diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate_test.go b/sdks/go/pkg/beam/core/runtime/exec/translate_test.go index 3867b9425161..17d33caf8b1a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" @@ -90,6 +91,99 @@ func TestUnmarshalReshuffleCoders(t *testing.T) { } } +func TestMayFixDataSourceCoder(t *testing.T) { + knownStart := coder.NewW( + coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewI(coder.NewString())}), + coder.NewGlobalWindow()) + knownWant := coder.NewW( + coder.NewCoGBK([]*coder.Coder{coder.NewBytes(), coder.NewString()}), + coder.NewGlobalWindow()) + + makeParDo := func(t *testing.T, fn any) *ParDo { + t.Helper() + dfn, err := graph.NewDoFn(fn) + if err != nil { + t.Fatalf("couldn't construct ParDo with Sig: %T %v", fn, err) + } + return &ParDo{Fn: dfn} + } + + tests := []struct { + name string + start, want *coder.Coder + out Node + }{ + { + name: "bytes", + start: coder.NewBytes(), + }, { + name: "W", + start: coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()), + }, { + name: "W", + start: coder.NewW( + coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewBool()}), + coder.NewGlobalWindow()), + }, { + name: "W>_nil", + start: knownStart, + }, { + name: "W>_Expand", + out: &Expand{}, + start: knownStart, + want: knownWant, + }, { + name: "W>_Combine", + out: &Combine{}, + start: knownStart, + want: knownWant, + }, { + name: "W>_ReshuffleOutput", + out: &ReshuffleOutput{}, + start: knownStart, + want: knownWant, + }, { + name: "W>_MergeAccumulators", + out: &MergeAccumulators{}, + start: knownStart, + want: knownWant, + }, { + name: "W>_Multiplex_Expand", + out: &Multiplex{Out: []Node{&Expand{}}}, + start: knownStart, + want: knownWant, + }, { + name: "W>_Multiplex_ParDo_KV", + out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, []string) {})}}, + start: knownStart, + }, { + name: "W>_Multiplex_ParDo_GBK", + out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, func(*string) bool) {})}}, + start: knownStart, + want: knownWant, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // If want is nil, we expect no changes. + if test.want == nil { + test.want = test.start + } + + u := &DataSource{ + Coder: test.start, + Out: test.out, + } + mayFixDataSourceCoder(u) + if !test.want.Equals(u.Coder) { + t.Errorf("mayFixDataSourceCoder(Datasource[Coder: %v, Out: %T]), got %v, want %v", test.start, test.out, u.Coder, test.want) + } + + }) + } +} + func TestUnmarshallWindowFn(t *testing.T) { tests := []struct { name string diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index cecbea86ed7c..3af820e8a2bc 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -216,9 +216,6 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, } id := components[1] - kind := coder.KV - root := typex.KVType - elm, err := b.peek(id) if err != nil { return nil, err @@ -226,15 +223,15 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, switch elm.GetSpec().GetUrn() { case urnIterableCoder, urnStateBackedIterableCoder: - id = elm.GetComponentCoderIds()[0] - kind = coder.CoGBK - root = typex.CoGBKType + iterElmID := elm.GetComponentCoderIds()[0] // TODO(https://github.com/apache/beam/issues/18032): If CoGBK with > 1 input, handle as special GBK. We expect // it to be encoded as CoGBK>>. Remove this handling once // CoGBK has a first-class representation. - if ids, ok := b.isCoGBKList(id); ok { + // If the value is an iterable, and a special CoGBK type, then expand it to the real + // CoGBK signature, instead of the special type. + if ids, ok := b.isCoGBKList(iterElmID); ok { // CoGBK values, err := b.Coders(ids) @@ -242,9 +239,11 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, return nil, err } - t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...) - return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil + t := typex.New(typex.CoGBKType, append([]typex.FullType{key.T}, coder.Types(values)...)...) + return &coder.Coder{Kind: coder.CoGBK, T: t, Components: append([]*coder.Coder{key}, values...)}, nil } + // It's valid to have a KV> without being a CoGBK, and validating if we need to change to + // a CoGBK is done at the DataSource, since that's when we can check against the downstream nodes. } value, err := b.Coder(id) @@ -252,8 +251,8 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, return nil, err } - t := typex.New(root, key.T, value.T) - return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil + t := typex.New(typex.KVType, key.T, value.T) + return &coder.Coder{Kind: coder.KV, T: t, Components: []*coder.Coder{key, value}}, nil case urnLengthPrefixCoder: if len(components) != 1 { @@ -338,7 +337,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, } return c, nil - case urnIterableCoder: + case urnIterableCoder, urnStateBackedIterableCoder: if len(components) != 1 { return nil, errors.Errorf("could not unmarshal iterable coder from %v, expected one component but got %d", c, len(components)) } @@ -553,6 +552,13 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) { return b.internBuiltInCoder(urnTimerCoder, comp...), nil + case coder.Iterable: + comp, err := b.AddMulti(c.Components) + if err != nil { + return "", errors.Wrapf(err, "failed to marshal iterable coder %v", c) + } + return b.internBuiltInCoder(urnIterableCoder, comp...), nil + default: err := errors.Errorf("unexpected coder kind: %v", c.Kind) return "", errors.WithContextf(err, "failed to marshal coder %v", c) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go index aad15df0f23f..01c70181ce89 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go @@ -49,60 +49,70 @@ func TestMarshalUnmarshalCoders(t *testing.T) { baz := custom("baz", reflectx.Int) tests := []struct { - name string - c *coder.Coder + name string + c *coder.Coder + equivalent *coder.Coder }{ { - "bytes", - coder.NewBytes(), + name: "bytes", + c: coder.NewBytes(), }, { - "bool", - coder.NewBool(), + name: "bool", + c: coder.NewBool(), }, { - "varint", - coder.NewVarInt(), + name: "varint", + c: coder.NewVarInt(), }, { - "double", - coder.NewDouble(), + name: "double", + c: coder.NewDouble(), }, { - "string", - coder.NewString(), + name: "string", + c: coder.NewString(), }, { - "foo", - foo, + name: "foo", + c: foo, }, { - "bar", - bar, + name: "bar", + c: bar, }, { - "baz", - baz, + name: "baz", + c: baz, }, { - "W", - coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()), + name: "W", + c: coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()), }, { - "N", - coder.NewN(coder.NewBytes()), + name: "N", + c: coder.NewN(coder.NewBytes()), }, { - "KV", - coder.NewKV([]*coder.Coder{foo, bar}), + name: "I", + c: coder.NewI(foo), }, { - "CoGBK", - coder.NewCoGBK([]*coder.Coder{foo, bar}), + name: "KV", + c: coder.NewKV([]*coder.Coder{foo, bar}), }, { - "CoGBK", - coder.NewCoGBK([]*coder.Coder{foo, bar, baz}), + name: "KV>", + c: coder.NewKV([]*coder.Coder{foo, coder.NewI(bar)}), + }, + { + name: "CoGBK", + c: coder.NewCoGBK([]*coder.Coder{foo, bar}), + equivalent: coder.NewKV([]*coder.Coder{foo, coder.NewI(bar)}), + }, + { + name: "CoGBK", + c: coder.NewCoGBK([]*coder.Coder{foo, bar, baz}), }, { name: "R[graphx.registeredNamedTypeForTest]", @@ -124,7 +134,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) { if err != nil { t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", test.c, err) } - if len(coders) != 1 || !test.c.Equals(coders[0]) { + if test.equivalent != nil && !test.equivalent.Equals(coders[0]) { + t.Errorf("Unmarshal(Marshal(%v)) = %v, want equivalent", test.equivalent, coders) + } + if test.equivalent == nil && !test.c.Equals(coders[0]) { t.Errorf("Unmarshal(Marshal(%v)) = %v, want identity", test.c, coders) } }) @@ -149,7 +162,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) { if err != nil { t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", test.c, err) } - if len(coders) != 1 || !test.c.Equals(coders[0]) { + if test.equivalent != nil && !test.equivalent.Equals(coders[0]) { + t.Errorf("Unmarshal(Marshal(%v)) = %v, want equivalent", test.equivalent, coders) + } + if test.equivalent == nil && !test.c.Equals(coders[0]) { t.Errorf("Unmarshal(Marshal(%v)) = %v, want identity", test.c, coders) } }) @@ -166,8 +182,11 @@ func TestMarshalUnmarshalCoders(t *testing.T) { if err != nil { t.Fatalf("DecodeCoderRef(EncodeCoderRef(%v)) failed: %v", test.c, err) } - if !test.c.Equals(got) { - t.Errorf("DecodeCoderRef(EncodeCoderRef(%v)) = %v, want identity", test.c, got) + if test.equivalent != nil && !test.equivalent.Equals(got) { + t.Errorf("DecodeCoderRef(EncodeCoderRef(%v)) = %v want equivalent", test.equivalent, got) + } + if test.equivalent == nil && !test.c.Equals(got) { + t.Errorf("DecodeCoderRef(EncodeCoderRef(%v)) = %v want identity", test.c, got) } }) } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go index efdf4ef140f4..1bda90aa9e1c 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go @@ -16,6 +16,8 @@ package graphx import ( + "reflect" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" @@ -128,6 +130,16 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { } return &CoderRef{Type: nullableType, Components: []*CoderRef{innerref}}, nil + case coder.Iterable: + if len(c.Components) != 1 { + return nil, errors.Errorf("bad I: %v", c) + } + innerref, err := EncodeCoderRef(c.Components[0]) + if err != nil { + return nil, err + } + return &CoderRef{Type: streamType, Components: []*CoderRef{innerref}}, nil + case coder.CoGBK: if len(c.Components) < 2 { return nil, errors.Errorf("bad CoGBK: %v", c) @@ -243,27 +255,19 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { } elm := c.Components[1] - kind := coder.KV - root := typex.KVType - - isGBK := elm.Type == streamType - if isGBK { - elm = elm.Components[0] - kind = coder.CoGBK - root = typex.CoGBKType - + if elm.Type == streamType { // TODO(https://github.com/apache/beam/issues/18032): If CoGBK with > 1 input, handle as special GBK. We expect // it to be encoded as CoGBK>. Remove this handling once // CoGBK has a first-class representation. - if refs, ok := isCoGBKList(elm); ok { + if refs, ok := isCoGBKList(elm.Components[0]); ok { values, err := DecodeCoderRefs(refs) if err != nil { return nil, err } - t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...) - return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil + t := typex.New(typex.CoGBKType, append([]typex.FullType{key.T}, coder.Types(values)...)...) + return &coder.Coder{Kind: coder.CoGBK, T: t, Components: append([]*coder.Coder{key}, values...)}, nil } } @@ -272,8 +276,8 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { return nil, err } - t := typex.New(root, key.T, value.T) - return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil + t := typex.New(typex.KVType, key.T, value.T) + return &coder.Coder{Kind: coder.KV, T: t, Components: []*coder.Coder{key, value}}, nil case nullableType: if len(c.Components) != 1 { @@ -319,7 +323,17 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil case streamType: - return nil, errors.Errorf("stream must be pair value: %+v", c) + if len(c.Components) != 1 { + return nil, errors.Errorf("bad iterable/stream: %+v", c) + } + + inner, err := DecodeCoderRef(c.Components[0]) + if err != nil { + return nil, err + } + + t := typex.New(reflect.SliceOf(inner.T.Type()), inner.T) + return &coder.Coder{Kind: coder.Iterable, T: t, Components: []*coder.Coder{inner}}, nil case rowType: subC := c.Components[0] diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go index 386acad38185..41ef0ab09d22 100644 --- a/sdks/go/pkg/beam/core/typex/fulltype.go +++ b/sdks/go/pkg/beam/core/typex/fulltype.go @@ -108,8 +108,13 @@ func New(t reflect.Type, components ...FullType) FullType { case Container: switch t.Kind() { case reflect.Slice: - // We include the child type as a component for convenience. - return &tree{class, t, []FullType{New(t.Elem())}} + if len(components) == 0 { + // For elements without sub components, we just create with the type, this handles vanilla slices. + // We include the child type as a component for convenience. + return &tree{class, t, []FullType{New(t.Elem())}} + } + // For elements which themselves have components, we need to go deeper. + return &tree{class, t, []FullType{New(t.Elem(), components[0].Components()...)}} default: panic(fmt.Sprintf("Unexpected aggregate type: %v", t)) } @@ -117,10 +122,10 @@ func New(t reflect.Type, components ...FullType) FullType { switch t { case KVType: if len(components) != 2 { - panic("Invalid number of components for KV") + panic(fmt.Sprintf("Invalid number of components for KV: %v, %v", t, components)) } if isAnyNonKVComposite(components) { - panic("Invalid to nest composites inside KV") + panic(fmt.Sprintf("Invalid to nest composite composites inside KV: %v, %v", t, components)) } return &tree{class, t, components} case WindowedValueType: @@ -133,10 +138,10 @@ func New(t reflect.Type, components ...FullType) FullType { return &tree{class, t, components} case CoGBKType: if len(components) < 2 { - panic("Invalid number of components for CoGBK") + panic(fmt.Sprintf("Invalid number of components for CoGBK: %v", t)) } if isAnyNonKVComposite(components) { - panic("Invalid to nest composites inside CoGBK") + panic(fmt.Sprintf("Invalid to nest composites inside CoGBK: %v", t)) } return &tree{class, t, components} case TimersType: @@ -221,15 +226,14 @@ func NewCoGBK(components ...FullType) FullType { // // For example: // -// SA: KV := KV -// SA: KV := KV // X bound to string by assignment -// SA: KV := KV // Assignable only if X is already bound to string -// SA: KV := KV // Not assignable under any binding -// -// Not SA: KV := KV -// Not SA: X := KV -// Not SA: GBK(X,Y) := KV +// SA: KV := KV +// SA: KV := KV // X bound to string by assignment +// SA: KV := KV // Assignable only if X is already bound to string +// SA: KV := KV // Not assignable under any binding // +// Not SA: KV := KV +// Not SA: X := KV +// Not SA: GBK(X,Y) := KV func IsStructurallyAssignable(from, to FullType) bool { switch from.Class() { case Concrete: @@ -423,6 +427,5 @@ func checkTypesNotNil(list []FullType) { // NoFiringPane return PaneInfo assigned as NoFiringPane(0x0f) func NoFiringPane() PaneInfo { - pn := PaneInfo{IsFirst: true, IsLast: true, Timing: PaneUnknown} - return pn + return PaneInfo{IsFirst: true, IsLast: true, Timing: PaneUnknown} } diff --git a/sdks/go/pkg/beam/core/typex/fulltype_test.go b/sdks/go/pkg/beam/core/typex/fulltype_test.go index 8c7c161b7010..060404ad9115 100644 --- a/sdks/go/pkg/beam/core/typex/fulltype_test.go +++ b/sdks/go/pkg/beam/core/typex/fulltype_test.go @@ -34,6 +34,10 @@ func TestIsBound(t *testing.T) { {NewKV(New(reflectx.String), New(reflect.SliceOf(reflectx.Int))), true}, {NewKV(New(reflectx.String), New(reflect.SliceOf(XType))), false}, {NewKV(New(reflectx.String), New(reflectx.String)), true}, + {NewKV(New(reflectx.String), New(reflect.SliceOf(reflectx.String))), true}, + {NewW(NewKV(New(reflectx.ByteSlice), New(reflectx.Int))), true}, + {New(reflect.SliceOf(KVType), NewKV(New(reflectx.ByteSlice), New(reflectx.Int))), true}, + {New(TimersType, New(reflectx.ByteSlice)), true}, } for _, test := range tests { @@ -44,6 +48,9 @@ func TestIsBound(t *testing.T) { } func TestIsStructurallyAssignable(t *testing.T) { + type foo int + var f foo + fooT := reflect.TypeOf(f) tests := []struct { A, B FullType Exp bool @@ -52,6 +59,7 @@ func TestIsStructurallyAssignable(t *testing.T) { {New(reflectx.Int32), New(reflectx.Int64), false}, // from Go assignability {New(reflectx.Int64), New(reflectx.Int32), false}, // from Go assignability {New(reflectx.Int), New(TType), true}, + {New(reflectx.Int), New(fooT), false}, {New(XType), New(TType), true}, {NewKV(New(XType), New(YType)), New(TType), false}, // T cannot match composites {NewKV(New(reflectx.Int), New(reflectx.Int)), NewCoGBK(New(reflectx.Int), New(reflectx.Int)), false}, // structural mismatch @@ -60,6 +68,8 @@ func TestIsStructurallyAssignable(t *testing.T) { {NewKV(New(reflectx.String), New(reflectx.Int)), NewKV(New(TType), New(TType)), true}, {NewKV(New(reflectx.Int), New(reflectx.Int)), NewKV(New(TType), New(TType)), true}, {NewKV(New(reflectx.Int), New(reflectx.String)), NewKV(New(TType), New(reflectx.String)), true}, + {New(reflect.SliceOf(reflectx.Int)), New(reflect.SliceOf(fooT)), false}, + {New(reflect.SliceOf(reflectx.Int)), New(TType), true}, } for _, test := range tests { @@ -103,6 +113,18 @@ func TestBindSubstitute(t *testing.T) { NewCoGBK(New(YType), New(XType)), NewCoGBK(New(XType), New(ZType)), }, + { + New(reflect.SliceOf(reflectx.String)), + New(XType), + NewKV(New(reflectx.Int), New(XType)), + NewKV(New(reflectx.Int), New(reflect.SliceOf(reflectx.String))), + }, + { + New(reflectx.String), + New(XType), + NewKV(New(reflectx.Int), New(reflect.SliceOf(XType))), + NewKV(New(reflectx.Int), New(reflect.SliceOf(reflectx.String))), + }, } for _, test := range tests {