From ccde1f8c20bb5ad571404a104cf4fde6cd7e3d3c Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 30 Sep 2022 12:45:45 -0400 Subject: [PATCH 1/6] dataframe api wrapper for GO SDK --- sdks/go/pkg/beam/schema.go | 17 +++ sdks/go/pkg/beam/transforms/xlang/external.go | 129 ++++++++++++++++++ .../xlang/python/dataframe/dataframe.go | 89 ++++++++++++ sdks/go/test/integration/integration.go | 1 + .../test/integration/xlang/dataframe_test.go | 68 +++++++++ 5 files changed, 304 insertions(+) create mode 100644 sdks/go/pkg/beam/transforms/xlang/external.go create mode 100644 sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go create mode 100644 sdks/go/test/integration/xlang/dataframe_test.go diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go index b25a3e285f71..2a3134455d96 100644 --- a/sdks/go/pkg/beam/schema.go +++ b/sdks/go/pkg/beam/schema.go @@ -77,6 +77,23 @@ func RegisterSchemaProvider(rt reflect.Type, provider interface{}) { coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder) } +// RegisterSchemaProviderWithURN is for internal use only. Users are recommended to use +// beam.RegisterSchemaProvider() instead. +// RegisterSchemaProviderWithURN registers a new schema provider for a new logical type defined +// in pkg/beam/model/pipeline_v1/schema.pb.go +// +// RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally +// is called in a package init() function. +func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn string) { + p := provider.(SchemaProvider) + st, err := p.FromLogicalType(rt) + if err != nil { + panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type provider for %v, doesn't support that type", rt)) + } + schema.RegisterLogicalType(schema.ToLogicalType(urn, rt, st)) + coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder) +} + // SchemaProvider specializes schema handling for complex types, including conversion to a // valid schema base type, // diff --git a/sdks/go/pkg/beam/transforms/xlang/external.go b/sdks/go/pkg/beam/transforms/xlang/external.go new file mode 100644 index 000000000000..4c7c8111192f --- /dev/null +++ b/sdks/go/pkg/beam/transforms/xlang/external.go @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package python contains data structures required for python external transforms in a multilanguage pipeline. +package python + +import ( + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" +) + +const ( + pythonCallableUrn = "beam:logical_type:python_callable:v1" +) + +var ( + pcsType = reflect.TypeOf((*PythonCallableSource)(nil)).Elem() + pcsStorageType = reflectx.String +) + +func init() { + beam.RegisterType(pcsType) + beam.RegisterSchemaProviderWithURN(pcsType, &PythonCallableSourceProvider{}, pythonCallableUrn) +} + +// PythonCallableSource is a wrapper object storing a Python function definition +// that can be evaluated to Python callables in Python SDK. +// +// The snippet of Python code can be a valid Python expression such as +// lambda x: x * x +// str.upper +// a fully qualified name such as +// math.sin +// or a complete multi-line function or class definition such as +// def foo(x): +// ... +// class Foo: +// ... +// +// Any lines preceding the function definition are first evaluated to provide context in which to +// define the function which can be useful to declare imports or any other needed values, e.g. +// import math +// +// def helper(x): +// return x * x +// +// def func(y): +// return helper(y) + y +// in which case `func` would get applied to each element. +type PythonCallableSource string + +// PythonCallableSourceProvider implement the SchemaProvider interface for logical types +type PythonCallableSourceProvider struct{} + +// FromLogicalType returns the goType of the logical type +func (p *PythonCallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { + if rt != pcsType { + return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, pcsType) + } + return pcsStorageType, nil +} + +// BuildEncoder encodes the PythonCallableSource logical type +func (p *PythonCallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { + if _, err := p.FromLogicalType(rt); err != nil { + return nil, err + } + + return func(iface interface{}, w io.Writer) error { + v := iface.(PythonCallableSource) + return coder.EncodeStringUTF8(string(v), w) + }, nil +} + +// BuildDecoder decodes the PythonCallableSource logical type +func (p *PythonCallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { + if _, err := p.FromLogicalType(rt); err != nil { + return nil, err + } + + return func(r io.Reader) (interface{}, error) { + s, err := coder.DecodeStringUTF8(r) + if err != nil { + return nil, err + } + return PythonCallableSource(s), nil + }, nil +} + +// NewPythonExternalTransform creates a new instance of PythonExternalTransform. It accepts two types: +// A: used for normal arguments +// K: used for keyword arguments +func NewPythonExternalTransform[A, K any](constructor string) *pythonExternalTransform[A, K] { + return &pythonExternalTransform[A, K]{Constructor: constructor} +} + +// PythonExternalTransform holds the details required for an External Python Transform. +type pythonExternalTransform[A, K any] struct { + Constructor string `beam:"constructor"` + Args A `beam:"args"` + Kwargs K `beam:"kwargs"` +} + +// WithArgs adds arguments to the External Python Transform. +func (p *pythonExternalTransform[A, K]) WithArgs(args any) { + p.Args = args.(A) +} + +// WithKwargs adds keyword arguments to the External Python Transform. +func (p *pythonExternalTransform[A, K]) WithKwargs(kwargs any) { + p.Kwargs = kwargs.(K) +} diff --git a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go new file mode 100644 index 000000000000..50c914948c8e --- /dev/null +++ b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK. +// An exapnsion service for python external transforms can be started by running +// $ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE +package dataframe + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + python "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*kwargs)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*argStruct)(nil)).Elem()) +} + +type kwargs struct { + Fn python.PythonCallableSource `beam:"func"` + IncludeIndexes bool `beam:"include_indexes"` +} + +type argStruct struct { + args []string +} + +type config struct { + dpl kwargs + expansionAddr string +} + +type configOption func(*config) + +// WithExpansionAddr sets an URL for a Python expansion service. +func WithExpansionAddr(expansionAddr string) configOption { + return func(c *config) { + c.expansionAddr = expansionAddr + } +} + +// WithIndexes sets include_indexes option for DataframeTransform. +func WithIndexes() configOption { + return func(c *config) { + c.dpl.IncludeIndexes = true + } +} + +// Transform is a multi-language wrapper for a Python DataframeTransform with a given lambda function. +// lambda function is a required parameter. +// Additional option for including indexes in dataframe can be provided by using +// dataframe.WithIndexes(). +func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, opts ...configOption) beam.PCollection { + s.Scope("xlang.python.DataframeTransform") + cfg := config{ + dpl: kwargs{Fn: python.PythonCallableSource(fn)}, + } + for _, opt := range opts { + opt(&cfg) + } + + // TODO: load automatic expansion service here + if cfg.expansionAddr == "" { + panic("no expansion service address provided for xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.") + } + + pet := python.NewPythonExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform") + pet.WithKwargs(cfg.dpl) + pl := beam.CrossLanguagePayload(pet) + result := beam.CrossLanguage(s, "beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT))) + return result[beam.UnnamedOutputTag()] + +} diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index b70a861064bc..26acc5137e60 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -71,6 +71,7 @@ var directFilters = []string{ "TestDebeziumIO_BasicRead", "TestJDBCIO_BasicReadWrite", "TestJDBCIO_PostgresReadWrite", + "TestDataframe", // Triggers, Panes are not yet supported "TestTrigger.*", "TestPanes", diff --git a/sdks/go/test/integration/xlang/dataframe_test.go b/sdks/go/test/integration/xlang/dataframe_test.go new file mode 100644 index 000000000000..6872f50a1153 --- /dev/null +++ b/sdks/go/test/integration/xlang/dataframe_test.go @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xlang + +import ( + "flag" + "log" + "reflect" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python/dataframe" + "github.com/apache/beam/sdks/v2/go/test/integration" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*TestRow)(nil)).Elem()) +} + +type TestRow struct { + A int64 `beam:"a"` + B int32 `beam:"b"` +} + +func TestDataframe(t *testing.T) { + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("python_transform") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + integration.CheckFilters(t) + checkFlags(t) + + row0 := TestRow{A: int64(100), B: int32(1)} + row1 := TestRow{A: int64(100), B: int32(2)} + row2 := TestRow{A: int64(100), B: int32(3)} + row3 := TestRow{A: int64(200), B: int32(4)} + + p, s := beam.NewPipelineWithRoot() + + input := beam.Create(s, row0, row1, row3) + outCol := dataframe.Transform(s, "lambda df: df.groupby('a').sum()", input, reflect.TypeOf((*TestRow)(nil)).Elem(), dataframe.WithExpansionAddr(expansionAddr), dataframe.WithIndexes()) + + passert.Equals(s, outCol, row2, row3) + ptest.RunAndValidate(t, p) +} From d6ad00a8c3d7538dba803b61193775565f24f0af Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 30 Sep 2022 13:00:55 -0400 Subject: [PATCH 2/6] change package name --- sdks/go/pkg/beam/transforms/xlang/external.go | 4 ++-- .../transforms/xlang/python/dataframe/dataframe.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/xlang/external.go b/sdks/go/pkg/beam/transforms/xlang/external.go index 4c7c8111192f..265601eed194 100644 --- a/sdks/go/pkg/beam/transforms/xlang/external.go +++ b/sdks/go/pkg/beam/transforms/xlang/external.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package python contains data structures required for python external transforms in a multilanguage pipeline. -package python +// Package xlang contains data structures required for python external transforms in a multilanguage pipeline. +package xlang import ( "fmt" diff --git a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go index 50c914948c8e..b3159d65e122 100644 --- a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go +++ b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go @@ -23,7 +23,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" - python "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang" ) func init() { @@ -33,8 +33,8 @@ func init() { } type kwargs struct { - Fn python.PythonCallableSource `beam:"func"` - IncludeIndexes bool `beam:"include_indexes"` + Fn xlang.PythonCallableSource `beam:"func"` + IncludeIndexes bool `beam:"include_indexes"` } type argStruct struct { @@ -69,7 +69,7 @@ func WithIndexes() configOption { func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, opts ...configOption) beam.PCollection { s.Scope("xlang.python.DataframeTransform") cfg := config{ - dpl: kwargs{Fn: python.PythonCallableSource(fn)}, + dpl: kwargs{Fn: xlang.PythonCallableSource(fn)}, } for _, opt := range opts { opt(&cfg) @@ -80,7 +80,7 @@ func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, panic("no expansion service address provided for xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.") } - pet := python.NewPythonExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform") + pet := xlang.NewPythonExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform") pet.WithKwargs(cfg.dpl) pl := beam.CrossLanguagePayload(pet) result := beam.CrossLanguage(s, "beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT))) From 65ba16b4b8f3581e147ed5155fdfb7f75d9b0e32 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 4 Oct 2022 23:22:51 -0400 Subject: [PATCH 3/6] update changes.md and resolve static check error --- CHANGES.md | 1 + .../pkg/beam/transforms/xlang/python/dataframe/dataframe.go | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 87d3d685eb31..413b24dba563 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Dataframe wrapper added in Go SDK via Cross-Language (Need to manually start python expansion service). (Go) ([#23384](https://github.com/apache/beam/issues/23384)). ## Breaking Changes diff --git a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go index b3159d65e122..a5aaf1c7d085 100644 --- a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go +++ b/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go @@ -37,9 +37,7 @@ type kwargs struct { IncludeIndexes bool `beam:"include_indexes"` } -type argStruct struct { - args []string -} +type argStruct struct{} type config struct { dpl kwargs From e64b1ae889e68a70a42947a8ba0112d1b31e6c45 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 5 Oct 2022 11:32:06 -0400 Subject: [PATCH 4/6] updated directory structure and filter tests if no address provided --- .../xlang/dataframe/dataframe.go} | 26 +------- .../xlang/dataframe/dataframe_test.go | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+), 23 deletions(-) rename sdks/go/test/integration/{xlang/dataframe_test.go => transforms/xlang/dataframe/dataframe.go} (75%) create mode 100644 sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go diff --git a/sdks/go/test/integration/xlang/dataframe_test.go b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go similarity index 75% rename from sdks/go/test/integration/xlang/dataframe_test.go rename to sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go index 6872f50a1153..240f31a9a678 100644 --- a/sdks/go/test/integration/xlang/dataframe_test.go +++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go @@ -13,19 +13,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package xlang +package dataframe import ( - "flag" - "log" "reflect" - "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" - "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python/dataframe" - "github.com/apache/beam/sdks/v2/go/test/integration" ) func init() { @@ -37,22 +32,7 @@ type TestRow struct { B int32 `beam:"b"` } -func TestDataframe(t *testing.T) { - flag.Parse() - beam.Init() - - services := integration.NewExpansionServices() - defer func() { services.Shutdown() }() - addr, err := services.GetAddr("python_transform") - if err != nil { - log.Printf("skipping missing expansion service: %v", err) - } else { - expansionAddr = addr - } - - integration.CheckFilters(t) - checkFlags(t) - +func DataframeTransform(expansionAddr string) *beam.Pipeline { row0 := TestRow{A: int64(100), B: int32(1)} row1 := TestRow{A: int64(100), B: int32(2)} row2 := TestRow{A: int64(100), B: int32(3)} @@ -64,5 +44,5 @@ func TestDataframe(t *testing.T) { outCol := dataframe.Transform(s, "lambda df: df.groupby('a').sum()", input, reflect.TypeOf((*TestRow)(nil)).Elem(), dataframe.WithExpansionAddr(expansionAddr), dataframe.WithIndexes()) passert.Equals(s, outCol, row2, row3) - ptest.RunAndValidate(t, p) + return p } diff --git a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go new file mode 100644 index 000000000000..4d73e7dc931b --- /dev/null +++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe_test.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataframe + +import ( + "flag" + "log" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/test/integration" +) + +var expansionAddr string // Populate with expansion address labelled "python_transform". + +func checkFlags(t *testing.T) { + if expansionAddr == "" { + t.Skip("No python transform expansion address provided.") + } +} + +func TestDataframe(t *testing.T) { + integration.CheckFilters(t) + checkFlags(t) + p := DataframeTransform(expansionAddr) + ptest.RunAndValidate(t, p) +} + +func TestMain(m *testing.M) { + flag.Parse() + beam.Init() + + services := integration.NewExpansionServices() + defer func() { services.Shutdown() }() + addr, err := services.GetAddr("python_transform") + if err != nil { + log.Printf("skipping missing expansion service: %v", err) + } else { + expansionAddr = addr + } + + ptest.MainRet(m) +} From c349d724a6e58ddc4749228fb699ee6bbfa0dbe9 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 12 Oct 2022 10:06:40 -0400 Subject: [PATCH 5/6] refactor dir structure --- .../xlang/{python => }/dataframe/dataframe.go | 12 ++++---- .../transforms/xlang/{ => python}/external.go | 30 +++++++++---------- .../transforms/xlang/dataframe/dataframe.go | 2 +- 3 files changed, 22 insertions(+), 22 deletions(-) rename sdks/go/pkg/beam/transforms/xlang/{python => }/dataframe/dataframe.go (87%) rename sdks/go/pkg/beam/transforms/xlang/{ => python}/external.go (74%) diff --git a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go similarity index 87% rename from sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go rename to sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go index a5aaf1c7d085..b4f4c37f115e 100644 --- a/sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go +++ b/sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go @@ -15,7 +15,7 @@ // Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK. // An exapnsion service for python external transforms can be started by running -// $ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE +// $ python -m apache_beam.runners.portability.expansion_service_main -p $PORT_FOR_EXPANSION_SERVICE package dataframe import ( @@ -23,7 +23,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python" ) func init() { @@ -33,8 +33,8 @@ func init() { } type kwargs struct { - Fn xlang.PythonCallableSource `beam:"func"` - IncludeIndexes bool `beam:"include_indexes"` + Fn python.CallableSource `beam:"func"` + IncludeIndexes bool `beam:"include_indexes"` } type argStruct struct{} @@ -67,7 +67,7 @@ func WithIndexes() configOption { func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, opts ...configOption) beam.PCollection { s.Scope("xlang.python.DataframeTransform") cfg := config{ - dpl: kwargs{Fn: xlang.PythonCallableSource(fn)}, + dpl: kwargs{Fn: python.CallableSource(fn)}, } for _, opt := range opts { opt(&cfg) @@ -78,7 +78,7 @@ func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, panic("no expansion service address provided for xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.") } - pet := xlang.NewPythonExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform") + pet := python.NewExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform") pet.WithKwargs(cfg.dpl) pl := beam.CrossLanguagePayload(pet) result := beam.CrossLanguage(s, "beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT))) diff --git a/sdks/go/pkg/beam/transforms/xlang/external.go b/sdks/go/pkg/beam/transforms/xlang/python/external.go similarity index 74% rename from sdks/go/pkg/beam/transforms/xlang/external.go rename to sdks/go/pkg/beam/transforms/xlang/python/external.go index 265601eed194..4c4691c498e0 100644 --- a/sdks/go/pkg/beam/transforms/xlang/external.go +++ b/sdks/go/pkg/beam/transforms/xlang/python/external.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package xlang contains data structures required for python external transforms in a multilanguage pipeline. -package xlang +// Package python contains data structures required for python external transforms in a multilanguage pipeline. +package python import ( "fmt" @@ -31,16 +31,16 @@ const ( ) var ( - pcsType = reflect.TypeOf((*PythonCallableSource)(nil)).Elem() + pcsType = reflect.TypeOf((*CallableSource)(nil)).Elem() pcsStorageType = reflectx.String ) func init() { beam.RegisterType(pcsType) - beam.RegisterSchemaProviderWithURN(pcsType, &PythonCallableSourceProvider{}, pythonCallableUrn) + beam.RegisterSchemaProviderWithURN(pcsType, &CallableSourceProvider{}, pythonCallableUrn) } -// PythonCallableSource is a wrapper object storing a Python function definition +// CallableSource is a wrapper object storing a Python function definition // that can be evaluated to Python callables in Python SDK. // // The snippet of Python code can be a valid Python expression such as @@ -64,13 +64,13 @@ func init() { // def func(y): // return helper(y) + y // in which case `func` would get applied to each element. -type PythonCallableSource string +type CallableSource string -// PythonCallableSourceProvider implement the SchemaProvider interface for logical types -type PythonCallableSourceProvider struct{} +// CallableSourceProvider implement the SchemaProvider interface for logical types +type CallableSourceProvider struct{} // FromLogicalType returns the goType of the logical type -func (p *PythonCallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { +func (p *CallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { if rt != pcsType { return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, pcsType) } @@ -78,19 +78,19 @@ func (p *PythonCallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect } // BuildEncoder encodes the PythonCallableSource logical type -func (p *PythonCallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { +func (p *CallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } return func(iface interface{}, w io.Writer) error { - v := iface.(PythonCallableSource) + v := iface.(CallableSource) return coder.EncodeStringUTF8(string(v), w) }, nil } // BuildDecoder decodes the PythonCallableSource logical type -func (p *PythonCallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { +func (p *CallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } @@ -100,14 +100,14 @@ func (p *PythonCallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Re if err != nil { return nil, err } - return PythonCallableSource(s), nil + return CallableSource(s), nil }, nil } -// NewPythonExternalTransform creates a new instance of PythonExternalTransform. It accepts two types: +// NewExternalTransform creates a new instance for python external transform. It accepts two types: // A: used for normal arguments // K: used for keyword arguments -func NewPythonExternalTransform[A, K any](constructor string) *pythonExternalTransform[A, K] { +func NewExternalTransform[A, K any](constructor string) *pythonExternalTransform[A, K] { return &pythonExternalTransform[A, K]{Constructor: constructor} } diff --git a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go index 240f31a9a678..8a5a63398419 100644 --- a/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go +++ b/sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go @@ -20,7 +20,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python/dataframe" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/dataframe" ) func init() { From e68ca5b666e1b77ee8cd9023f275a3603912df15 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 18 Oct 2022 09:51:41 -0400 Subject: [PATCH 6/6] unexport type --- sdks/go/pkg/beam/transforms/xlang/python/external.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/xlang/python/external.go b/sdks/go/pkg/beam/transforms/xlang/python/external.go index 4c4691c498e0..3eb14c3c1346 100644 --- a/sdks/go/pkg/beam/transforms/xlang/python/external.go +++ b/sdks/go/pkg/beam/transforms/xlang/python/external.go @@ -37,7 +37,7 @@ var ( func init() { beam.RegisterType(pcsType) - beam.RegisterSchemaProviderWithURN(pcsType, &CallableSourceProvider{}, pythonCallableUrn) + beam.RegisterSchemaProviderWithURN(pcsType, &callableSourceProvider{}, pythonCallableUrn) } // CallableSource is a wrapper object storing a Python function definition @@ -66,11 +66,11 @@ func init() { // in which case `func` would get applied to each element. type CallableSource string -// CallableSourceProvider implement the SchemaProvider interface for logical types -type CallableSourceProvider struct{} +// callableSourceProvider implement the SchemaProvider interface for logical types +type callableSourceProvider struct{} // FromLogicalType returns the goType of the logical type -func (p *CallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { +func (p *callableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) { if rt != pcsType { return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, pcsType) } @@ -78,7 +78,7 @@ func (p *CallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, } // BuildEncoder encodes the PythonCallableSource logical type -func (p *CallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { +func (p *callableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err } @@ -90,7 +90,7 @@ func (p *CallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{} } // BuildDecoder decodes the PythonCallableSource logical type -func (p *CallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { +func (p *callableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) { if _, err := p.FromLogicalType(rt); err != nil { return nil, err }