Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK] Dataframe API wrapper #23450

Merged
merged 9 commits into from
Oct 18, 2022
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Dataframe wrapper added in Go SDK via Cross-Language (Need to manually start python expansion service). (Go) ([#23384](https://github.com/apache/beam/issues/23384)).

## Breaking Changes

Expand Down
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ func RegisterSchemaProvider(rt reflect.Type, provider interface{}) {
coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
}

// RegisterSchemaProviderWithURN is for internal use only. Users are recommended to use
// beam.RegisterSchemaProvider() instead.
// RegisterSchemaProviderWithURN registers a new schema provider for a new logical type defined
// in pkg/beam/model/pipeline_v1/schema.pb.go
//
// RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally
// is called in a package init() function.
func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn string) {
p := provider.(SchemaProvider)
st, err := p.FromLogicalType(rt)
if err != nil {
panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type provider for %v, doesn't support that type", rt))
}
schema.RegisterLogicalType(schema.ToLogicalType(urn, rt, st))
coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
}

// SchemaProvider specializes schema handling for complex types, including conversion to a
// valid schema base type,
//
Expand Down
129 changes: 129 additions & 0 deletions sdks/go/pkg/beam/transforms/xlang/external.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package xlang contains data structures required for python external transforms in a multilanguage pipeline.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
package xlang

import (
"fmt"
"io"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)

const (
pythonCallableUrn = "beam:logical_type:python_callable:v1"
)

var (
pcsType = reflect.TypeOf((*PythonCallableSource)(nil)).Elem()
pcsStorageType = reflectx.String
)

func init() {
beam.RegisterType(pcsType)
beam.RegisterSchemaProviderWithURN(pcsType, &PythonCallableSourceProvider{}, pythonCallableUrn)
}

// PythonCallableSource is a wrapper object storing a Python function definition
// that can be evaluated to Python callables in Python SDK.
//
// The snippet of Python code can be a valid Python expression such as
// lambda x: x * x
// str.upper
// a fully qualified name such as
// math.sin
// or a complete multi-line function or class definition such as
// def foo(x):
// ...
// class Foo:
// ...
//
// Any lines preceding the function definition are first evaluated to provide context in which to
// define the function which can be useful to declare imports or any other needed values, e.g.
// import math
//
// def helper(x):
// return x * x
//
// def func(y):
// return helper(y) + y
// in which case `func` would get applied to each element.
type PythonCallableSource string

// PythonCallableSourceProvider implement the SchemaProvider interface for logical types
type PythonCallableSourceProvider struct{}
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

// FromLogicalType returns the goType of the logical type
func (p *PythonCallableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
if rt != pcsType {
return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, pcsType)
}
return pcsStorageType, nil
}

// BuildEncoder encodes the PythonCallableSource logical type
func (p *PythonCallableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
if _, err := p.FromLogicalType(rt); err != nil {
return nil, err
}

return func(iface interface{}, w io.Writer) error {
v := iface.(PythonCallableSource)
return coder.EncodeStringUTF8(string(v), w)
}, nil
}

// BuildDecoder decodes the PythonCallableSource logical type
func (p *PythonCallableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
if _, err := p.FromLogicalType(rt); err != nil {
return nil, err
}

return func(r io.Reader) (interface{}, error) {
s, err := coder.DecodeStringUTF8(r)
if err != nil {
return nil, err
}
return PythonCallableSource(s), nil
}, nil
}

// NewPythonExternalTransform creates a new instance of PythonExternalTransform. It accepts two types:
// A: used for normal arguments
// K: used for keyword arguments
func NewPythonExternalTransform[A, K any](constructor string) *pythonExternalTransform[A, K] {
return &pythonExternalTransform[A, K]{Constructor: constructor}
}

// PythonExternalTransform holds the details required for an External Python Transform.
type pythonExternalTransform[A, K any] struct {
Constructor string `beam:"constructor"`
Args A `beam:"args"`
Kwargs K `beam:"kwargs"`
}

// WithArgs adds arguments to the External Python Transform.
func (p *pythonExternalTransform[A, K]) WithArgs(args any) {
p.Args = args.(A)
}

// WithKwargs adds keyword arguments to the External Python Transform.
func (p *pythonExternalTransform[A, K]) WithKwargs(kwargs any) {
p.Kwargs = kwargs.(K)
}
87 changes: 87 additions & 0 deletions sdks/go/pkg/beam/transforms/xlang/python/dataframe/dataframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK.
// An exapnsion service for python external transforms can be started by running
// $ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE
package dataframe

import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang"
)

func init() {
beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*kwargs)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*argStruct)(nil)).Elem())
}

type kwargs struct {
Fn xlang.PythonCallableSource `beam:"func"`
IncludeIndexes bool `beam:"include_indexes"`
}

type argStruct struct{}

type config struct {
dpl kwargs
expansionAddr string
}

type configOption func(*config)

// WithExpansionAddr sets an URL for a Python expansion service.
func WithExpansionAddr(expansionAddr string) configOption {
return func(c *config) {
c.expansionAddr = expansionAddr
}
}

// WithIndexes sets include_indexes option for DataframeTransform.
func WithIndexes() configOption {
return func(c *config) {
c.dpl.IncludeIndexes = true
}
}

// Transform is a multi-language wrapper for a Python DataframeTransform with a given lambda function.
// lambda function is a required parameter.
// Additional option for including indexes in dataframe can be provided by using
// dataframe.WithIndexes().
func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, opts ...configOption) beam.PCollection {
s.Scope("xlang.python.DataframeTransform")
cfg := config{
dpl: kwargs{Fn: xlang.PythonCallableSource(fn)},
}
for _, opt := range opts {
opt(&cfg)
}

// TODO: load automatic expansion service here
if cfg.expansionAddr == "" {
panic("no expansion service address provided for xlang.DataframeTransform(), pass xlang.WithExpansionAddr(address) as a param.")
}

pet := xlang.NewPythonExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform")
pet.WithKwargs(cfg.dpl)
pl := beam.CrossLanguagePayload(pet)
result := beam.CrossLanguage(s, "beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT)))
return result[beam.UnnamedOutputTag()]

}
1 change: 1 addition & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var directFilters = []string{
"TestDebeziumIO_BasicRead",
"TestJDBCIO_BasicReadWrite",
"TestJDBCIO_PostgresReadWrite",
"TestDataframe",
// Triggers, Panes are not yet supported
"TestTrigger.*",
"TestPanes",
Expand Down
48 changes: 48 additions & 0 deletions sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataframe

import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python/dataframe"
)

func init() {
beam.RegisterType(reflect.TypeOf((*TestRow)(nil)).Elem())
}

type TestRow struct {
A int64 `beam:"a"`
B int32 `beam:"b"`
}

func DataframeTransform(expansionAddr string) *beam.Pipeline {
row0 := TestRow{A: int64(100), B: int32(1)}
row1 := TestRow{A: int64(100), B: int32(2)}
row2 := TestRow{A: int64(100), B: int32(3)}
row3 := TestRow{A: int64(200), B: int32(4)}

p, s := beam.NewPipelineWithRoot()

input := beam.Create(s, row0, row1, row3)
outCol := dataframe.Transform(s, "lambda df: df.groupby('a').sum()", input, reflect.TypeOf((*TestRow)(nil)).Elem(), dataframe.WithExpansionAddr(expansionAddr), dataframe.WithIndexes())

passert.Equals(s, outCol, row2, row3)
return p
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataframe

import (
"flag"
"log"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)

var expansionAddr string // Populate with expansion address labelled "python_transform".

func checkFlags(t *testing.T) {
if expansionAddr == "" {
t.Skip("No python transform expansion address provided.")
}
}

func TestDataframe(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)
p := DataframeTransform(expansionAddr)
ptest.RunAndValidate(t, p)
}

func TestMain(m *testing.M) {
flag.Parse()
beam.Init()

services := integration.NewExpansionServices()
defer func() { services.Shutdown() }()
addr, err := services.GetAddr("python_transform")
lostluck marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Printf("skipping missing expansion service: %v", err)
} else {
expansionAddr = addr
}

ptest.MainRet(m)
}