Skip to content

Commit

Permalink
[#23893] Support composite scope transform metadata (#29204)
Browse files Browse the repository at this point in the history
* [#23893] Do Annotation plumbing to the graph.

* fmt

* Groundwork for DisplayData and Deps support

* rm env change

* Add plumbing and simpler handling and test!

* Docs and details.

* Missed a rename.

* pipeline doc

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Nov 2, 2023
1 parent 9333648 commit edbeda7
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 3 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Graph struct {

// New returns an empty graph whose root scope has id 0, the label "root",
// and no parent scope.
func New() *Graph {
// NOTE(review): the two `root :=` lines below look like the removed and added
// sides of a one-line diff hunk whose +/- markers were lost; presumably only
// the keyed-field form is present in the real file. Confirm against the repo.
root := &Scope{0, "root", nil}
root := &Scope{id: 0, Label: "root", Parent: nil}
return &Graph{root: root}
}

Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/graph/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package graph

import "context"

// Scope is a syntactic Scope, such as arising from a composite Transform. It
// has no semantic meaning at execution time. Used by monitoring.
type Scope struct {
Expand All @@ -24,6 +26,8 @@ type Scope struct {
Label string
// Parent is the parent scope, if nested.
Parent *Scope
// Context contains optional metadata associated with this scope.
Context context.Context
}

// ID returns the graph-local identifier for the scope.
Expand Down
120 changes: 120 additions & 0 deletions sdks/go/pkg/beam/core/runtime/contextreg/contextreg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package contextreg contains the global registrations of functions for extracting
// ptransform annotations or environment resource hints from context.Context attached to
// scopes.
//
// For beam internal use only. API subject to change.
package contextreg

import (
"context"
"maps"
"sync"
)

// defaultReg is the package-level registry handed out by Default.
var defaultReg = new(Registry)

// Default returns the package-wide registry of context extractors.
// Every call returns the same *Registry.
func Default() *Registry {
	return defaultReg
}

// Registry contains a set of registrations for extracting annotations and hints from a context.Context.
//
// This type is exported to allow simpler testing of new extractors, and their interaction with the registry.
//
// The zero value is an empty registry that is ready to use. Because it holds a
// sync.Mutex, a Registry must not be copied after first use; pass *Registry.
type Registry struct {
// mu guards access to the transforms and envs slices.
mu sync.Mutex
// transforms holds the registered transform metadata extractors, in registration order.
transforms []func(context.Context) TransformMetadata
// envs holds the registered environment metadata extractors, in registration order.
envs []func(context.Context) EnvironmentMetadata
}

// TransformMetadata represents additional information on transforms to be added to the Pipeline proto graph.
type TransformMetadata struct {
// Annotations maps an annotation key to its raw payload bytes.
Annotations map[string][]byte
// DisplayData is not supported yet; the field is left commented out as a placeholder.
// DisplayData []*pipepb.DisplayData
}

// EnvironmentMetadata represents additional information on environmental requirements to be added to the Pipeline
// proto graph.
type EnvironmentMetadata struct {
// ResourceHints maps a resource hint key to its raw payload bytes.
ResourceHints map[string][]byte
// DisplayData and Dependencies are not supported yet; the fields are left
// commented out as placeholders.
// DisplayData []*pipepb.DisplayData
// Dependencies []*pipepb.ArtifactInformation
}

// TransformExtractor appends ext to this registry's list of transform
// metadata extractors.
//
// The extracted metadata is intended to be set on the current composite
// transform scope, where runners can reach it via the transform hypergraph.
// Registration is guarded by the registry's mutex.
func (r *Registry) TransformExtractor(ext func(context.Context) TransformMetadata) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.transforms = append(r.transforms, ext)
}

// EnvExtrator appends ext to this registry's list of environment metadata
// extractors. (The method name is spelled "Extrator"; it is kept as-is so
// existing callers continue to compile.)
//
// The intent is that a non-empty extraction derives a new environment from
// the parent scope's environment; that wiring lives outside this method.
// Registration is guarded by the registry's mutex.
func (r *Registry) EnvExtrator(ext func(context.Context) EnvironmentMetadata) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.envs = append(r.envs, ext)
}

// ExtractTransformMetadata runs all registered transform extractors on the
// provided context, in registration order, and returns the merged metadata.
// When several extractors produce the same annotation key, the value from the
// extractor registered last wins.
//
// A metadata field will be nil if there's no data. A nil context bypasses
// extractor execution (and the registry lock) entirely.
//
// Extractors are invoked without the registry lock held, so an extractor may
// safely call back into the registry (for example to register another
// extractor) without deadlocking on the non-reentrant mutex.
func (r *Registry) ExtractTransformMetadata(ctx context.Context) TransformMetadata {
	if ctx == nil {
		return TransformMetadata{}
	}

	// Snapshot the extractor list under the lock. Registration only ever
	// appends, so the elements visible through this slice header are never
	// modified afterwards and can be read without holding the lock.
	r.mu.Lock()
	exts := r.transforms
	r.mu.Unlock()

	// Allocate the result map lazily so that "no data" is reported as nil.
	var annotations map[string][]byte
	for _, ext := range exts {
		md := ext(ctx)
		if len(md.Annotations) == 0 {
			continue
		}
		if annotations == nil {
			annotations = make(map[string][]byte, len(md.Annotations))
		}
		maps.Copy(annotations, md.Annotations)
	}
	return TransformMetadata{Annotations: annotations}
}

// ExtractEnvironmentMetadata runs all registered environment extractors on
// the provided context, in registration order, and returns the merged
// metadata. When several extractors produce the same resource hint key, the
// value from the extractor registered last wins.
//
// A metadata field will be nil if there's no data. A nil context bypasses
// extractor execution (and the registry lock) entirely.
//
// Extractors are invoked without the registry lock held, so an extractor may
// safely call back into the registry (for example to register another
// extractor) without deadlocking on the non-reentrant mutex.
func (r *Registry) ExtractEnvironmentMetadata(ctx context.Context) EnvironmentMetadata {
	if ctx == nil {
		return EnvironmentMetadata{}
	}

	// Snapshot the extractor list under the lock. Registration only ever
	// appends, so the elements visible through this slice header are never
	// modified afterwards and can be read without holding the lock.
	r.mu.Lock()
	exts := r.envs
	r.mu.Unlock()

	// Allocate the result map lazily so that "no data" is reported as nil.
	var hints map[string][]byte
	for _, ext := range exts {
		md := ext(ctx)
		if len(md.ResourceHints) == 0 {
			continue
		}
		if hints == nil {
			hints = make(map[string][]byte, len(md.ResourceHints))
		}
		maps.Copy(hints, md.ResourceHints)
	}
	return EnvironmentMetadata{ResourceHints: hints}
}
108 changes: 108 additions & 0 deletions sdks/go/pkg/beam/core/runtime/contextreg/contextreg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package contextreg

import (
"context"
"testing"
)

// TestPTransformExtractor checks that transform extractors run in
// registration order and that a later extractor overrides an earlier one when
// both emit the same annotation key.
func TestPTransformExtractor(t *testing.T) {
	reg := &Registry{}

	type keyType string
	key1 := keyType("annotation1")
	key2 := keyType("annotation2")
	key3 := keyType("annotation3")

	reg.TransformExtractor(func(ctx context.Context) TransformMetadata {
		v := ctx.Value(key1).(string)
		return TransformMetadata{
			Annotations: map[string][]byte{
				"beam:test:annotation": []byte(v),
			},
		}
	})
	reg.TransformExtractor(func(ctx context.Context) TransformMetadata {
		v := ctx.Value(key2).(string)
		return TransformMetadata{
			Annotations: map[string][]byte{
				"beam:test:annotation2": []byte(v),
			},
		}
	})
	// Override the extraction for the first annotation so the last registered
	// extractor's value is the one that survives.
	reg.TransformExtractor(func(ctx context.Context) TransformMetadata {
		v := ctx.Value(key3).(string)
		return TransformMetadata{
			Annotations: map[string][]byte{
				"beam:test:annotation": []byte(v),
			},
		}
	})

	ctx := context.Background()
	// Set all 3 distinct context values.
	ctx = context.WithValue(ctx, key1, "never seen")
	want2 := "want_value2"
	ctx = context.WithValue(ctx, key2, want2)
	want3 := "want_value3"
	ctx = context.WithValue(ctx, key3, want3)

	ptrans := reg.ExtractTransformMetadata(ctx)

	key := "beam:test:annotation"
	if got, want := string(ptrans.Annotations[key]), want3; got != want {
		t.Errorf("extracted annotation %q = %q, want %q", key, got, want)
	}
	key = "beam:test:annotation2"
	if got, want := string(ptrans.Annotations[key]), want2; got != want {
		t.Errorf("extracted annotation %q = %q, want %q", key, got, want)
	}
	// Lengths are ints: report them with %d (the %q verb would print them as
	// quoted runes) and don't mention a stale annotation key.
	if got, want := len(ptrans.Annotations), 2; got != want {
		t.Errorf("len(annotations) = %d, want %d - have %v", got, want, ptrans)
	}
}

// TestHintExtractor checks that a registered environment extractor's resource
// hint is returned by ExtractEnvironmentMetadata, and that nothing else is.
func TestHintExtractor(t *testing.T) {
	reg := &Registry{}

	type keyType string
	hintKey := keyType("hint")

	reg.EnvExtrator(func(ctx context.Context) EnvironmentMetadata {
		v := ctx.Value(hintKey).(string)
		return EnvironmentMetadata{
			ResourceHints: map[string][]byte{
				"beam:test:hint": []byte(v),
			},
		}
	})

	ctx := context.Background()
	wantedHint := "hint"
	ctx = context.WithValue(ctx, hintKey, wantedHint)

	env := reg.ExtractEnvironmentMetadata(ctx)

	key := "beam:test:hint"
	if got, want := string(env.ResourceHints[key]), wantedHint; got != want {
		t.Errorf("extracted resource hint %q = %q, want %q", key, got, want)
	}
	// Lengths are ints: report them with %d (the %q verb would print them as
	// quoted runes).
	if got, want := len(env.ResourceHints), 1; got != want {
		t.Errorf("len(resource hints) = %d, want %d - have %v", got, want, env)
	}
}
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/contextreg"
v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
Expand Down Expand Up @@ -154,6 +155,18 @@ type Options struct {

// PipelineResourceHints for setting defaults across the whole pipeline.
PipelineResourceHints resource.Hints

// ContextReg is an override for the context extractor registry for testing.
ContextReg *contextreg.Registry
}

// GetContextReg returns the registry to use for context extraction: the
// ContextReg override when one is set, and contextreg.Default() otherwise.
func (opts *Options) GetContextReg() *contextreg.Registry {
	if reg := opts.ContextReg; reg != nil {
		return reg
	}
	return contextreg.Default()
}

// Marshal converts a graph to a model pipeline.
Expand Down Expand Up @@ -273,10 +286,14 @@ func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
subtransforms = append(subtransforms, id)
}

metadata := m.opt.GetContextReg().ExtractTransformMetadata(s.Scope.Scope.Context)

transform := &pipepb.PTransform{
UniqueName: s.Scope.Name,
Subtransforms: subtransforms,
EnvironmentId: m.addDefaultEnv(),
Annotations: metadata.Annotations,
// DisplayData: metadata.DisplayData,
}

if err := m.updateIfCombineComposite(s, transform); err != nil {
Expand Down
78 changes: 76 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/contextreg"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
Expand Down Expand Up @@ -165,8 +166,8 @@ func TestMarshal(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(edges) != test.edges {
t.Fatal("expected a single edge")
if got, want := len(edges), test.edges; got != want {
t.Fatalf("got %v edges, want %v", got, want)
}

payload, err := proto.Marshal(&pipepb.DockerPayload{ContainerImage: "foo"})
Expand All @@ -192,6 +193,79 @@ func TestMarshal(t *testing.T) {
}
}

// TestMarshal_PTransformAnnotations marshals a small graph using a test-only
// context registry and checks that the extracted annotation lands on
// composite transforms (those with subtransforms) and never on leaves.
func TestMarshal_PTransformAnnotations(t *testing.T) {
	const annotationKey = "myAnnotation"

	// A misused ptransform extractor: it ignores the context's contents, so any
	// scope that has a context attached gets this annotation.
	var creg contextreg.Registry
	creg.TransformExtractor(func(context.Context) contextreg.TransformMetadata {
		return contextreg.TransformMetadata{
			Annotations: map[string][]byte{annotationKey: {42, 42, 42}},
		}
	})

	type testCase struct {
		name      string
		makeGraph func(t *testing.T, g *graph.Graph)

		transforms int
	}
	tests := []testCase{
		{
			name: "AnnotationSetOnComposite",
			makeGraph: func(t *testing.T, g *graph.Graph) {
				in := newIntInput(g)
				side := newIntInput(g)
				s := g.NewScope(g.Root(), "sub")
				// Attaching any non-nil context lets the extractor above fire.
				s.Context = context.Background()
				addDoFn(t, g, pickSideFn, s, []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()}, nil)
			},
			transforms: 2,
		},
	}

	for _, test := range tests {
		test := test
		t.Run(test.name, func(t *testing.T) {
			g := graph.New()
			test.makeGraph(t, g)

			edges, _, err := g.Build()
			if err != nil {
				t.Fatal(err)
			}

			payload, err := proto.Marshal(&pipepb.DockerPayload{ContainerImage: "foo"})
			if err != nil {
				t.Fatal(err)
			}
			opts := &graphx.Options{
				Environment: &pipepb.Environment{Urn: "beam:env:docker:v1", Payload: payload},
				ContextReg:  &creg,
			}
			p, err := graphx.Marshal(edges, opts)
			if err != nil {
				t.Fatal(err)
			}

			pts := p.GetComponents().GetTransforms()
			if got, want := len(pts), test.transforms; got != want {
				t.Errorf("got %d transforms, want %d : %v", got, want, proto.MarshalTextString(p))
			}
			for _, pt := range pts {
				annotations := pt.GetAnnotations()
				_, annotated := annotations[annotationKey]
				isLeaf := len(pt.GetSubtransforms()) == 0
				// Context annotations only apply to composites, and are not
				// duplicated to leaves.
				switch {
				case isLeaf && annotated:
					t.Errorf("unexpected annotation %v on leaf transform: %v", annotationKey, annotations)
				case !isLeaf && !annotated:
					t.Errorf("expected %q annotation, but wasn't present: %v", annotationKey, annotations)
				}
			}
		})
	}
}

// testRT's methods can all be no-ops, we just need it to implement sdf.RTracker.
type testRT struct {
}
Expand Down
Loading

0 comments on commit edbeda7

Please sign in to comment.