Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK]: Infer field names from struct tags #24473

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions sdks/go/pkg/beam/io/bigqueryio/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
bq "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
Expand All @@ -43,6 +44,9 @@ const writeSizeLimit = 10485760
// Estimate for overall message overhead.for a write message in bytes.
const writeOverheadBytes = 1024

// bigQueryTag is the struct tag key used to identify BigQuery field names.
const bigQueryTag = "bigquery"

func init() {
beam.RegisterType(reflect.TypeOf((*queryFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*writeFn)(nil)).Elem())
Expand Down Expand Up @@ -88,9 +92,21 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection

s = s.Scope("bigquery.Read")

// TODO(herohde) 7/13/2017: using * is probably too inefficient. We could infer
// a focused query from the type.
return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t)
stmt := constructSelectStatement(t, bigQueryTag, table)

return query(s, project, stmt, t)
}

func constructSelectStatement(t reflect.Type, tagKey string, table string) string {
columns := structx.InferFieldNames(t, tagKey)

if len(columns) == 0 {
panic(fmt.Sprintf("bigqueryio.Read: type %v has no columns to select", t))
}

columnStr := strings.Join(columns, ", ")

return fmt.Sprintf("SELECT %v FROM [%v]", columnStr, table)
}

// QueryOptions represents additional options for executing a query.
Expand Down
42 changes: 41 additions & 1 deletion sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package bigqueryio

import "testing"
import (
"reflect"
"testing"
)

func TestNewQualifiedTableName(t *testing.T) {
tests := []struct {
Expand All @@ -36,3 +39,40 @@ func TestNewQualifiedTableName(t *testing.T) {
}
}
}

func Test_constructSelectStatement(t *testing.T) {
t.Run("Statement with columns inferred from struct fields", func(t *testing.T) {
typ := reflect.TypeOf(struct {
Col1 string `bigquery:"col1"`
Col2 string `bigquery:"col2,nullable"`
Col3 string `bigquery:",nullable"`
Col4 string
Col5 string `other:"col5"`
Col6 string `bigquery:"-"`
col7 string
}{})
tagKey := "bigquery"
table := "test_table"
want := "SELECT col1, col2, Col3, Col4, Col5 FROM [test_table]"

if got := constructSelectStatement(typ, tagKey, table); got != want {
t.Errorf("constructSelectStatement() = %v, want %v", got, want)
}
})
}

func Test_constructSelectStatementPanic(t *testing.T) {
t.Run("Panic for no columns", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("constructSelectStatement() does not panic")
}
}()

typ := reflect.TypeOf(struct{}{})
tagKey := "bigquery"
table := "test_table"

constructSelectStatement(typ, tagKey, table)
})
}
21 changes: 6 additions & 15 deletions sdks/go/pkg/beam/io/spannerio/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,34 @@ package spannerio
import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"reflect"
"strings"

"cloud.google.com/go/spanner"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
"google.golang.org/api/iterator"
)

// spannerTag is the struct tag key used to identify Spanner field names.
const spannerTag = "spanner"

func init() {
register.DoFn3x1[context.Context, []byte, func(beam.X), error]((*queryFn)(nil))
register.Emitter1[beam.X]()
register.DoFn3x1[context.Context, int, func(*beam.X) bool, error]((*writeFn)(nil))
register.Iter1[beam.X]()
}

func columnsFromStruct(t reflect.Type) []string {
var columns []string

for i := 0; i < t.NumField(); i++ {
columns = append(columns, t.Field(i).Tag.Get("spanner"))
}

return columns
}

// Read reads all rows from the given table. The table must have a schema
// compatible with the given type, t, and Read returns a PCollection<t>. If the
// table has more rows than t, then Read is implicitly a projection.
func Read(s beam.Scope, database string, table string, t reflect.Type) beam.PCollection {
s = s.Scope("spanner.Read")

// TODO(herohde) 7/13/2017: using * is probably too inefficient. We could infer
// a focused query from the type.

cols := strings.Join(columnsFromStruct(t), ",")
cols := strings.Join(structx.InferFieldNames(t, spannerTag), ",")

return query(s, database, nil, fmt.Sprintf("SELECT %v from %v", cols, table), t)
}
Expand Down
11 changes: 0 additions & 11 deletions sdks/go/pkg/beam/io/spannerio/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ type TestDto struct {
Two int64 `spanner:"Two"`
}

func TestColumnsFromStructReturnsColumns(t *testing.T) {
// arrange
// act
cols := columnsFromStruct(reflect.TypeOf(TestDto{}))

// assert
if len(cols) != 2 {
t.Fatalf("got %v columns, expected 2", len(cols))
}
}

func TestRead(t *testing.T) {
testCases := []struct {
name string
Expand Down
58 changes: 58 additions & 0 deletions sdks/go/pkg/beam/util/structx/struct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package structx provides utilities for working with structs.
package structx

import (
"fmt"
"reflect"
"strings"
)

// InferFieldNames returns the field names of the given struct type and tag key. Includes only
// exported fields. If a field's tag key is empty or not set, uses the field's name. If a field's
// tag key is set to '-', omits the field. Panics if the type's kind is not a struct.
func InferFieldNames(t reflect.Type, key string) []string {
if t.Kind() != reflect.Struct {
panic(fmt.Sprintf("structx: InferFieldNames of non-struct type %s", t))
}

var names []string

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)

if field.Anonymous {
names = append(names, InferFieldNames(field.Type, key)...)
continue
}

if !field.IsExported() {
continue
}

value := field.Tag.Get(key)
name := strings.Split(value, ",")[0]

if name == "" {
names = append(names, field.Name)
} else if name != "-" {
names = append(names, name)
}
}

return names
}
144 changes: 144 additions & 0 deletions sdks/go/pkg/beam/util/structx/struct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package structx

import (
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
)

func TestInferFieldNames(t *testing.T) {
type embedded struct {
Embedded1 string `key:"embedded1"`
Embedded2 string
}

tests := []struct {
name string
t reflect.Type
key string
want []string
}{
{
name: "Return slice of field names from tagged struct fields",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
Field2 string `key:"field2"`
}{}),
key: "key",
want: []string{"field1", "field2"},
},
{
name: "Return nil slice from struct with no fields",
t: reflect.TypeOf(struct{}{}),
key: "key",
want: nil,
},
{
name: "Use struct field name for field without tag",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
Field2 string
}{}),
key: "key",
want: []string{"field1", "Field2"},
},
{
name: "Use struct field name for field with empty tag",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
Field2 string `key:""`
}{}),
key: "key",
want: []string{"field1", "Field2"},
},
{
name: "Use struct field name for field with other tag key",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
Field2 string `other:"field2"`
}{}),
key: "key",
want: []string{"field1", "Field2"},
},
{
name: "Omit field with '-' tag",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
Field2 string `key:"-"`
}{}),
key: "key",
want: []string{"field1"},
},
{
name: "Omit unexported field",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
field2 string
}{}),
key: "key",
want: []string{"field1"},
},
{
name: "Omit unexported field with tag",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
field2 string `key:"field2"`
}{}),
key: "key",
want: []string{"field1"},
},
{
name: "Extract the first comma separated value",
t: reflect.TypeOf(struct {
Field1 string `key:"field1,omitempty"`
Field2 string `key:",omitempty"`
}{}),
key: "key",
want: []string{"field1", "Field2"},
},
{
name: "Include fields from embedded struct",
t: reflect.TypeOf(struct {
Field1 string `key:"field1"`
embedded
}{}),
key: "key",
want: []string{"field1", "embedded1", "Embedded2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := InferFieldNames(tt.t, tt.key); !cmp.Equal(got, tt.want) {
t.Errorf("InferFieldNames() = %v, want %v", got, tt.want)
}
})
}
}

func TestInferFieldNamesPanic(t *testing.T) {
t.Run("Panic for non-struct type", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("InferFieldNames() does not panic")
}
}()

InferFieldNames(reflect.TypeOf(""), "key")
})
}