From 4dbb2904a5a529b84fb99ff0a4b7f352749672a2 Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Sun, 12 Mar 2023 10:12:21 +0100 Subject: [PATCH 1/9] Create utility file structs --- sdks/go/pkg/beam/io/fileio/file.go | 124 +++++++++++ sdks/go/pkg/beam/io/fileio/file_test.go | 256 ++++++++++++++++++++++ sdks/go/pkg/beam/io/fileio/gzip.go | 60 +++++ sdks/go/pkg/beam/io/fileio/helper_test.go | 88 ++++++++ sdks/go/pkg/beam/io/filesystem/file.go | 44 ++++ 5 files changed, 572 insertions(+) create mode 100644 sdks/go/pkg/beam/io/fileio/file.go create mode 100644 sdks/go/pkg/beam/io/fileio/file_test.go create mode 100644 sdks/go/pkg/beam/io/fileio/gzip.go create mode 100644 sdks/go/pkg/beam/io/fileio/helper_test.go create mode 100644 sdks/go/pkg/beam/io/filesystem/file.go diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go new file mode 100644 index 000000000000..fcc1eaf10845 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/file.go @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "context" + "errors" + "io" + "path/filepath" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem()) +} + +// ReadableFile is a wrapper around a filesystem.FileMetadata and filesystem.Compression that can be +// used to obtain a file descriptor or read the file's contents. +type ReadableFile struct { + Metadata filesystem.FileMetadata + Compression filesystem.Compression +} + +// Open opens the file for reading. The compression type is determined by the Compression field of +// the ReadableFile. If Compression is filesystem.CompressionAuto, the compression type is +// auto-detected from the file extension. It is the caller's responsibility to close the returned +// reader. +func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { + fs, err := filesystem.New(ctx, f.Metadata.Path) + if err != nil { + return nil, err + } + defer fs.Close() + + rc, err := fs.OpenRead(ctx, f.Metadata.Path) + if err != nil { + return nil, err + } + + comp := f.Compression + if comp == filesystem.CompressionAuto { + comp = compressionFromExt(f.Metadata.Path) + } + + return newDecompressionReader(rc, comp) +} + +// compressionFromExt detects the compression of a file based on its extension. If the extension is +// not recognized, filesystem.CompressionUncompressed is returned. 
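//
// A caller-side sketch, assuming the fileio package as introduced in this patch: Open is used
// together with the extension-based auto-detection described above to stream a matched .gz file
// through a bufio.Scanner instead of loading it into memory with Read. The helper name and the
// emitter signature are illustrative, not part of this package.
//
//	func countLines(ctx context.Context, f fileio.ReadableFile, emit func(int)) error {
//		rc, err := f.Open(ctx) // with CompressionAuto, gzip is detected from the .gz extension
//		if err != nil {
//			return err
//		}
//		defer rc.Close()
//
//		count := 0
//		scanner := bufio.NewScanner(rc)
//		for scanner.Scan() {
//			count++
//		}
//		if err := scanner.Err(); err != nil {
//			return err
//		}
//		emit(count)
//		return nil
//	}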
+func compressionFromExt(path string) filesystem.Compression { + switch filepath.Ext(path) { + case ".gz": + return filesystem.CompressionGzip + default: + return filesystem.CompressionUncompressed + } +} + +// newDecompressionReader returns an io.ReadCloser that can be used to read uncompressed data from +// reader, based on the specified compression. If the compression is filesystem.CompressionAuto, +// a non-nil error is returned. It is the caller's responsibility to close the returned reader. +func newDecompressionReader( + reader io.ReadCloser, + compression filesystem.Compression, +) (io.ReadCloser, error) { + switch compression { + case filesystem.CompressionAuto: + return nil, errors.New( + "compression must be resolved into a concrete type before obtaining a reader", + ) + case filesystem.CompressionGzip: + return newGzipReader(reader) + default: + return reader, nil + } +} + +// Read reads the entire file into memory and returns the contents. +func (f ReadableFile) Read(ctx context.Context) (data []byte, err error) { + rc, err := f.Open(ctx) + if err != nil { + return nil, err + } + + defer func() { + closeErr := rc.Close() + if err != nil { + if closeErr != nil { + log.Errorf(ctx, "error closing reader: %v", closeErr) + } + return + } + err = closeErr + }() + + return io.ReadAll(rc) +} + +// ReadString reads the entire file into memory and returns the contents as a string. +func (f ReadableFile) ReadString(ctx context.Context) (string, error) { + data, err := f.Read(ctx) + if err != nil { + return "", err + } + + return string(data), nil +} diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go b/sdks/go/pkg/beam/io/fileio/file_test.go new file mode 100644 index 000000000000..bc20dad7873e --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/file_test.go @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fileio + +import ( + "bytes" + "context" + "path/filepath" + "testing" + "testing/iotest" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" +) + +func TestReadableFile_Open(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want []byte + }{ + { + name: "Open uncompressed file", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: filesystem.CompressionUncompressed, + }, + want: []byte("test1"), + }, + { + name: "Open file with auto-detection of compression", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: filesystem.CompressionAuto, + }, + want: []byte("test2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + rc, err := tt.file.Open(ctx) + if err != nil { + t.Fatalf("Open() error = %v, want nil", err) + } + + t.Cleanup(func() { + rc.Close() + }) + + if err := iotest.TestReader(rc, tt.want); err != nil { + t.Errorf("TestReader() error = %v, want nil", err) + } + }) + } +} + +func Test_compressionFromExt(t *testing.T) { + tests := []struct { + name string + path string + want filesystem.Compression + }{ + { + name: "CompressionGzip for gz extension", + path: "file.gz", + want: filesystem.CompressionGzip, + }, + { + name: "CompressionUncompressed for no extension", + path: "file", + want: filesystem.CompressionUncompressed, + }, + { + name: "CompressionUncompressed for unrecognized extension", + path: "file.unknown", + want: filesystem.CompressionUncompressed, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := compressionFromExt(tt.path); got != tt.want { + t.Errorf("compressionFromExt() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_newDecompressionReader(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + path string + comp filesystem.Compression + want []byte + wantErr bool + }{ + { + name: "Reader for uncompressed file", + path: filepath.Join(dir, "file1.txt"), + comp: filesystem.CompressionUncompressed, + want: []byte("test1"), + }, + { + name: "Reader for gzip compressed file", + path: filepath.Join(dir, "file2.gz"), + comp: filesystem.CompressionGzip, + want: []byte("test2"), + }, + { + name: "Error - reader for auto compression not supported", + path: filepath.Join(dir, "file2.gz"), + comp: filesystem.CompressionAuto, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rc := openFile(t, tt.path) + + dr, err := newDecompressionReader(rc, tt.comp) + if (err != nil) != tt.wantErr { + t.Fatalf("newDecompressionReader() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr { + return + } + + t.Cleanup(func() { + dr.Close() + }) + + if err := iotest.TestReader(dr, tt.want); err != nil { + t.Errorf("TestReader() error = %v, want nil", err) + } + }) + } +} + +func TestReadableFile_Read(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want []byte + }{ + { + name: "Read contents from 
uncompressed file", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: filesystem.CompressionUncompressed, + }, + want: []byte("test1"), + }, + { + name: "Read contents from gzip compressed file", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: filesystem.CompressionGzip, + }, + want: []byte("test2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + got, err := tt.file.Read(ctx) + if err != nil { + t.Fatalf("Read() error = %v, want nil", err) + } + + if !bytes.Equal(got, tt.want) { + t.Errorf("Read() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestReadableFile_ReadString(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want string + }{ + { + name: "Read contents from uncompressed file as string", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: filesystem.CompressionUncompressed, + }, + want: "test1", + }, + { + name: "Read contents from gzip compressed file as string", + file: ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: filesystem.CompressionGzip, + }, + want: "test2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + got, err := tt.file.ReadString(ctx) + if err != nil { + t.Fatalf("ReadString() error = %v, want nil", err) + } + + if got != tt.want { + t.Errorf("ReadString() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/fileio/gzip.go b/sdks/go/pkg/beam/io/fileio/gzip.go new file mode 100644 index 000000000000..5a63d9be7d3c --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/gzip.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "compress/gzip" + "context" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" +) + +// gzipReader is a wrapper around a gzip.Reader that also closes the underlying io.ReadCloser. +type gzipReader struct { + rc io.ReadCloser + zr *gzip.Reader +} + +// newGzipReader creates a new gzipReader from an io.ReadCloser. +func newGzipReader(rc io.ReadCloser) (*gzipReader, error) { + zr, err := gzip.NewReader(rc) + if err != nil { + return nil, err + } + return &gzipReader{rc: rc, zr: zr}, nil +} + +// Read reads from the gzip reader. 
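//
// For context, a sketch of the manual pattern this wrapper removes from callers, assuming a
// plain *os.File; gzip.Reader.Close closes only the gzip stream, so without the wrapper both
// readers must be closed separately. The path is a placeholder.
//
//	f, err := os.Open("/tmp/data.gz")
//	if err != nil {
//		return err
//	}
//	zr, err := gzip.NewReader(f)
//	if err != nil {
//		f.Close()
//		return err
//	}
//	defer f.Close()  // the underlying file still needs closing;
//	defer zr.Close() // gzip.Reader.Close does not do it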
+func (r *gzipReader) Read(p []byte) (int, error) { + return r.zr.Read(p) +} + +// Close closes the gzip reader and the underlying io.ReadCloser. +func (r *gzipReader) Close() (err error) { + defer func() { + rcErr := r.rc.Close() + if err != nil { + if rcErr != nil { + log.Errorf(context.Background(), "error closing reader: %v", rcErr) + } + return + } + err = rcErr + }() + + return r.zr.Close() +} diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go b/sdks/go/pkg/beam/io/fileio/helper_test.go new file mode 100644 index 000000000000..e8b61ef067fd --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/helper_test.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "bufio" + "compress/gzip" + "io" + "os" + "path/filepath" + "testing" +) + +// openFile opens a file for reading. +func openFile(t *testing.T, path string) io.ReadCloser { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + + return f +} + +// createFile creates a file and parent directories if needed. +func createFile(t *testing.T, path string) *os.File { + t.Helper() + + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0750); err != nil && !os.IsExist(err) { + t.Fatal(err) + } + + file, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + + return file +} + +// write writes data to a file. +func write(t *testing.T, path string, data []byte) { + t.Helper() + + f := createFile(t, path) + defer f.Close() + + bw := bufio.NewWriter(f) + if _, err := bw.Write(data); err != nil { + t.Fatal(err) + } + + if err := bw.Flush(); err != nil { + t.Fatal(err) + } +} + +// writeGzip compresses and writes data to a file using gzip. +func writeGzip(t *testing.T, path string, data []byte) { + t.Helper() + + f := createFile(t, path) + defer f.Close() + + zw := gzip.NewWriter(f) + if _, err := zw.Write(data); err != nil { + t.Fatal(err) + } + + if err := zw.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/sdks/go/pkg/beam/io/filesystem/file.go b/sdks/go/pkg/beam/io/filesystem/file.go new file mode 100644 index 000000000000..58f0e7ea458a --- /dev/null +++ b/sdks/go/pkg/beam/io/filesystem/file.go @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filesystem + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*FileMetadata)(nil)).Elem()) +} + +// FileMetadata contains metadata about a file, namely its path and size in bytes. +type FileMetadata struct { + Path string + Size int64 +} + +// Compression is the type of compression used to compress a file. +type Compression int + +const ( + // CompressionAuto indicates that the compression type should be auto-detected. + CompressionAuto Compression = iota + // CompressionGzip indicates that the file is compressed using gzip. + CompressionGzip + // CompressionUncompressed indicates that the file is not compressed. + CompressionUncompressed +) From b788b9fb312d679c21ca2cdb10785286050b6268 Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Sun, 12 Mar 2023 10:12:35 +0100 Subject: [PATCH 2/9] Create MatchFiles and MatchAll transforms --- sdks/go/pkg/beam/io/fileio/match.go | 174 ++++++++++++++ sdks/go/pkg/beam/io/fileio/match_test.go | 286 +++++++++++++++++++++++ 2 files changed, 460 insertions(+) create mode 100644 sdks/go/pkg/beam/io/fileio/match.go create mode 100644 sdks/go/pkg/beam/io/fileio/match_test.go diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go new file mode 100644 index 000000000000..333b5b234d41 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package fileio provides transforms for matching and reading files. +package fileio + +import ( + "context" + "fmt" + "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn3x1[context.Context, string, func(filesystem.FileMetadata), error](&matchFn{}) + register.Emitter1[filesystem.FileMetadata]() +} + +// EmptyMatchTreatment controls how empty matches of a pattern are treated. +type EmptyMatchTreatment int + +const ( + // EmptyMatchTreatmentAllow allows empty matches. + EmptyMatchTreatmentAllow EmptyMatchTreatment = iota + // EmptyMatchTreatmentDisallow disallows empty matches. + EmptyMatchTreatmentDisallow + // EmptyMatchTreatmentAllowIfWildcard allows empty matches if the pattern contains a wildcard. 
+ EmptyMatchTreatmentAllowIfWildcard +) + +type matchOption struct { + EmptyMatchTreatment EmptyMatchTreatment +} + +// MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for +// matching files. +type MatchOptionFn func(*matchOption) error + +// WithEmptyMatchTreatment specifies how empty matches of a pattern should be treated. By default, +// empty matches are allowed if the pattern contains a wildcard. +func WithEmptyMatchTreatment(treatment EmptyMatchTreatment) MatchOptionFn { + return func(o *matchOption) error { + o.EmptyMatchTreatment = treatment + return nil + } +} + +// MatchFiles finds all files matching the glob pattern and returns a +// PCollection of the matching files. +func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection { + s = s.Scope("fileio.MatchFiles") + + filesystem.ValidateScheme(glob) + return MatchAll(s, beam.Create(s, glob), opts...) +} + +// MatchAll finds all files matching the glob patterns given by the incoming PCollection and +// returns a PCollection of the matching files. +func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection { + s = s.Scope("fileio.MatchAll") + + option := &matchOption{ + EmptyMatchTreatment: EmptyMatchTreatmentAllowIfWildcard, + } + + for _, opt := range opts { + if err := opt(option); err != nil { + panic(fmt.Sprintf("fileio.MatchAll: invalid option: %v", err)) + } + } + + return beam.ParDo(s, newMatchFn(option), col) +} + +type matchFn struct { + EmptyMatchTreatment EmptyMatchTreatment +} + +func newMatchFn(option *matchOption) *matchFn { + return &matchFn{ + EmptyMatchTreatment: option.EmptyMatchTreatment, + } +} + +func (fn *matchFn) ProcessElement( + ctx context.Context, + glob string, + emit func(filesystem.FileMetadata), +) error { + if strings.TrimSpace(glob) == "" { + return nil + } + + fs, err := filesystem.New(ctx, glob) + if err != nil { + return err + } + defer fs.Close() + + files, err := fs.List(ctx, glob) + if err != nil { + return err + } + + if len(files) == 0 { + if !allowEmptyMatch(glob, fn.EmptyMatchTreatment) { + return fmt.Errorf("no files matching pattern %q", glob) + } + return nil + } + + metadata, err := metadataFromFiles(ctx, fs, files) + if err != nil { + return err + } + + for _, md := range metadata { + emit(md) + } + + return nil +} + +func allowEmptyMatch(glob string, treatment EmptyMatchTreatment) bool { + switch treatment { + case EmptyMatchTreatmentDisallow: + return false + case EmptyMatchTreatmentAllowIfWildcard: + return strings.Contains(glob, "*") + default: + return true + } +} + +func metadataFromFiles( + ctx context.Context, + fs filesystem.Interface, + files []string, +) ([]filesystem.FileMetadata, error) { + if len(files) == 0 { + return nil, nil + } + + metadata := make([]filesystem.FileMetadata, len(files)) + + for i, path := range files { + size, err := fs.Size(ctx, path) + if err != nil { + return nil, err + } + + metadata[i] = filesystem.FileMetadata{ + Path: path, + Size: size, + } + } + + return metadata, nil +} diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go new file mode 100644 index 000000000000..0385cd6b2a74 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/match_test.go @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "context" + "path/filepath" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/google/go-cmp/cmp" +) + +type testFile struct { + filename string + data []byte +} + +var testFiles = []testFile{ + { + filename: "file1.txt", + data: []byte("test1"), + }, + { + filename: "file2.txt", + data: []byte(""), + }, + { + filename: "file3.csv", + data: []byte("test3"), + }, +} + +func TestMatchFiles(t *testing.T) { + dir := t.TempDir() + testDir := filepath.Join(dir, "testdata") + + for _, tf := range testFiles { + write(t, filepath.Join(testDir, tf.filename), tf.data) + } + + tests := []struct { + name string + glob string + opts []MatchOptionFn + want []any + }{ + { + name: "Match files", + glob: filepath.Join(dir, "*", "file*.txt"), + want: []any{ + filesystem.FileMetadata{ + Path: filepath.Join(testDir, "file1.txt"), + Size: 5, + }, + filesystem.FileMetadata{ + Path: filepath.Join(testDir, "file2.txt"), + Size: 0, + }, + }, + }, + { + name: "Read matches with specified empty match treatment", + opts: []MatchOptionFn{ + WithEmptyMatchTreatment(EmptyMatchTreatmentAllow), + }, + glob: filepath.Join(dir, "non-existent.txt"), + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + got := MatchFiles(s, tt.glob, tt.opts...) + + passert.Equals(s, got, tt.want...) 
+ ptest.RunAndValidate(t, p) + }) + } +} + +func TestMatchAll(t *testing.T) { + dir := t.TempDir() + testDir := filepath.Join(dir, "testdata") + + for _, tf := range testFiles { + write(t, filepath.Join(testDir, tf.filename), tf.data) + } + + tests := []struct { + name string + opts []MatchOptionFn + input []any + want []any + wantErr bool + }{ + { + name: "Match all", + input: []any{ + filepath.Join(dir, "*", "file*.txt"), + filepath.Join(dir, "*", "file*.csv"), + }, + want: []any{ + filesystem.FileMetadata{ + Path: filepath.Join(testDir, "file1.txt"), + Size: 5, + }, + filesystem.FileMetadata{ + Path: filepath.Join(testDir, "file2.txt"), + Size: 0, + }, + filesystem.FileMetadata{ + Path: filepath.Join(testDir, "file3.csv"), + Size: 5, + }, + }, + }, + { + name: "No matches", + input: []any{ + filepath.Join(dir, "*", "non-existent.txt"), + }, + want: nil, + }, + { + name: "No matches for empty glob", + input: []any{""}, + want: nil, + }, + { + name: "No matches for glob without wildcard and empty matches allowed", + opts: []MatchOptionFn{ + WithEmptyMatchTreatment(EmptyMatchTreatmentAllow), + }, + input: []any{ + filepath.Join(dir, "non-existent.txt"), + }, + want: nil, + }, + { + name: "Error - no matches for glob without wildcard", + input: []any{ + filepath.Join(dir, "non-existent.txt"), + }, + wantErr: true, + }, + { + name: "Error - no matches and empty matches disallowed", + opts: []MatchOptionFn{ + WithEmptyMatchTreatment(EmptyMatchTreatmentDisallow), + }, + input: []any{ + filepath.Join(dir, "*", "non-existent.txt"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, tt.input...) + got := MatchAll(s, col, tt.opts...) + + passert.Equals(s, got, tt.want...) 
+ if err := ptest.Run(p); (err != nil) != tt.wantErr { + t.Errorf("MatchAll() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_allowEmptyMatch(t *testing.T) { + tests := []struct { + name string + glob string + treatment EmptyMatchTreatment + want bool + }{ + { + name: "Allow for EmptyMatchTreatmentAllow", + glob: "path/to/file.txt", + treatment: EmptyMatchTreatmentAllow, + want: true, + }, + { + name: "Disallow for EmptyMatchTreatmentDisallow", + glob: "path/to/file.txt", + treatment: EmptyMatchTreatmentDisallow, + want: false, + }, + { + name: "Allow for glob with wildcard and EmptyMatchTreatmentAllowIfWildcard", + glob: "path/to/*.txt", + treatment: EmptyMatchTreatmentAllowIfWildcard, + want: true, + }, + { + name: "Disallow for glob without wildcard and EmptyMatchTreatmentAllowIfWildcard", + glob: "path/to/file.txt", + treatment: EmptyMatchTreatmentAllowIfWildcard, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := allowEmptyMatch(tt.glob, tt.treatment); got != tt.want { + t.Errorf("allowEmptyMatch() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_metadataFromFiles(t *testing.T) { + dir := t.TempDir() + files := make([]string, len(testFiles)) + + for i, tf := range testFiles { + file := filepath.Join(dir, tf.filename) + write(t, file, tf.data) + files[i] = file + } + + tests := []struct { + name string + files []string + want []filesystem.FileMetadata + }{ + { + name: "Slice of FileMetadata from file paths", + files: files, + want: []filesystem.FileMetadata{ + { + Path: filepath.Join(dir, "file1.txt"), + Size: 5, + }, + { + Path: filepath.Join(dir, "file2.txt"), + Size: 0, + }, + { + Path: filepath.Join(dir, "file3.csv"), + Size: 5, + }, + }, + }, + { + name: "Nil when files is empty", + files: nil, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + fs := local.New(ctx) + + got, err := metadataFromFiles(ctx, fs, tt.files) + if err != nil { + t.Fatalf("metadataFromFiles() error = %v, want nil", err) + } + + if !cmp.Equal(got, tt.want) { + t.Errorf("metadataFromFiles() got = %v, want %v", got, tt.want) + } + }) + } +} From 6e6635f620c9425c038ec186bb4da23c3b042084 Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Sun, 12 Mar 2023 10:12:42 +0100 Subject: [PATCH 3/9] Create ReadMatches transform --- sdks/go/pkg/beam/io/fileio/read.go | 126 ++++++++++++++++ sdks/go/pkg/beam/io/fileio/read_test.go | 182 ++++++++++++++++++++++++ 2 files changed, 308 insertions(+) create mode 100644 sdks/go/pkg/beam/io/fileio/read.go create mode 100644 sdks/go/pkg/beam/io/fileio/read_test.go diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go new file mode 100644 index 000000000000..4e01cf2928cd --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "fmt" + "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn2x1[filesystem.FileMetadata, func(ReadableFile), error](&readFn{}) + register.Emitter1[ReadableFile]() +} + +// DirectoryTreatment controls how directories are treated when reading matches. +type DirectoryTreatment int + +const ( + // DirectoryTreatmentSkip skips directories. + DirectoryTreatmentSkip DirectoryTreatment = iota + // DirectoryTreatmentDisallow disallows directories. + DirectoryTreatmentDisallow +) + +type readOption struct { + Compression filesystem.Compression + DirectoryTreatment DirectoryTreatment +} + +// ReadOptionFn is a function that can be passed to ReadMatches to configure options for +// reading files. +type ReadOptionFn func(*readOption) error + +// WithReadCompression specifies the compression type of the files that are read. By default, +// the compression type is determined by the file extension. +func WithReadCompression(compression filesystem.Compression) ReadOptionFn { + return func(o *readOption) error { + o.Compression = compression + return nil + } +} + +// WithDirectoryTreatment specifies how directories should be treated when reading files. By +// default, directories are skipped. +func WithDirectoryTreatment(treatment DirectoryTreatment) ReadOptionFn { + return func(o *readOption) error { + o.DirectoryTreatment = treatment + return nil + } +} + +// ReadMatches accepts the result of MatchFiles or MatchAll as a +// PCollection and converts it to a PCollection. The +// ReadableFile can be used to retrieve file metadata, open the file for reading or read the entire +// file into memory. The compression type of the readable files can be specified by passing the +// WithReadCompression option. If no compression type is provided, it will be determined by the file +// extension. 
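//
// A minimal caller-side sketch, assuming a pipeline scope s and the option names as defined at
// this stage of the patch series; the bucket path is a placeholder. Matched files are converted
// to ReadableFiles, and paths that denote directories fail the pipeline instead of being
// silently skipped.
//
//	matches := fileio.MatchFiles(s, "gs://my-bucket/logs/*.json")
//	files := fileio.ReadMatches(s, matches,
//		fileio.WithDirectoryTreatment(fileio.DirectoryTreatmentDisallow),
//	)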
+func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection { + s = s.Scope("fileio.ReadMatches") + + option := &readOption{ + Compression: filesystem.CompressionAuto, + DirectoryTreatment: DirectoryTreatmentSkip, + } + + for _, opt := range opts { + if err := opt(option); err != nil { + panic(fmt.Sprintf("fileio.ReadMatches: invalid option: %v", err)) + } + } + + return beam.ParDo(s, newReadFn(option), col) +} + +type readFn struct { + Compression filesystem.Compression + DirectoryTreatment DirectoryTreatment +} + +func newReadFn(option *readOption) *readFn { + return &readFn{ + Compression: option.Compression, + DirectoryTreatment: option.DirectoryTreatment, + } +} + +func (fn *readFn) ProcessElement(metadata filesystem.FileMetadata, emit func(ReadableFile)) error { + if isDirectory(metadata.Path) { + if fn.DirectoryTreatment == DirectoryTreatmentDisallow { + return fmt.Errorf("path to directory not allowed: %q", metadata.Path) + } + return nil + } + + file := ReadableFile{ + Metadata: metadata, + Compression: fn.Compression, + } + + emit(file) + return nil +} + +func isDirectory(path string) bool { + if strings.HasSuffix(path, "/") || strings.HasSuffix(path, "\\") { + return true + } + return false +} diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go new file mode 100644 index 000000000000..ebbef03dc94d --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/read_test.go @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fileio + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" +) + +func TestReadMatches(t *testing.T) { + tests := []struct { + name string + opts []ReadOptionFn + input []any + want []any + wantErr bool + }{ + { + name: "Read matches", + input: []any{ + filesystem.FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + filesystem.FileMetadata{ + Path: "file2.txt", + Size: 0, + }, + }, + want: []any{ + ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + Compression: filesystem.CompressionAuto, + }, + ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: "file2.txt", + Size: 0, + }, + Compression: filesystem.CompressionAuto, + }, + }, + }, + { + name: "Read matches with specified compression", + opts: []ReadOptionFn{ + WithReadCompression(filesystem.CompressionGzip), + }, + input: []any{ + filesystem.FileMetadata{ + Path: "file1.txt.gz", + Size: 5, + }, + filesystem.FileMetadata{ + Path: "file2.txt.gz", + Size: 0, + }, + }, + want: []any{ + ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: "file1.txt.gz", + Size: 5, + }, + Compression: filesystem.CompressionGzip, + }, + ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: "file2.txt.gz", + Size: 0, + }, + Compression: filesystem.CompressionGzip, + }, + }, + }, + { + name: "Read matches and skip directories", + input: []any{ + filesystem.FileMetadata{ + Path: "dir/", + Size: 0, + }, + filesystem.FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + }, + want: []any{ + ReadableFile{ + Metadata: filesystem.FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + Compression: filesystem.CompressionAuto, + }, + }, + }, + { + name: "Error - directories disallowed", + opts: []ReadOptionFn{ + WithDirectoryTreatment(DirectoryTreatmentDisallow), + }, + input: []any{ + filesystem.FileMetadata{ + Path: "dir/", + Size: 0, + }, + filesystem.FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, tt.input...) + got := ReadMatches(s, col, tt.opts...) + + passert.Equals(s, got, tt.want...) 
+ if err := ptest.Run(p); (err != nil) != tt.wantErr { + t.Errorf("ReadMatches() error = %v, wantErr %v", err, tt.wantErr) + } + }) + }) + } +} + +func Test_isDirectory(t *testing.T) { + tests := []struct { + name string + path string + want bool + }{ + { + name: "Path to directory with forward slash directory separator", + path: "path/to/", + want: true, + }, + { + name: "Path to directory with backslash directory separator", + path: "path\\to\\", + want: true, + }, + { + name: "Path to file", + path: "path/to/file", + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isDirectory(tt.path); got != tt.want { + t.Errorf("isDirectory() = %v, want %v", got, tt.want) + } + }) + } +} From 40b5154c9ea088cb60f4e30d97d61b2c79ab3d8f Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Sun, 12 Mar 2023 10:12:50 +0100 Subject: [PATCH 4/9] Add example doc for transforms --- sdks/go/pkg/beam/io/fileio/example_test.go | 74 ++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 sdks/go/pkg/beam/io/fileio/example_test.go diff --git a/sdks/go/pkg/beam/io/fileio/example_test.go b/sdks/go/pkg/beam/io/fileio/example_test.go new file mode 100644 index 000000000000..ed27546f9dd5 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/example_test.go @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fileio_test + +import ( + "context" + "log" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" +) + +func ExampleMatchFiles() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + matches := fileio.MatchFiles(s, "gs://path/to/*.gz") + debug.Print(s, matches) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleMatchAll() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz") + matches := fileio.MatchAll(s, globs) + debug.Print(s, matches) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleReadMatches() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + pairFn := func(ctx context.Context, file fileio.ReadableFile, emit func(string, string)) error { + contents, err := file.ReadString(ctx) + if err != nil { + return err + } + emit(file.Metadata.Path, contents) + return nil + } + + matches := fileio.MatchFiles(s, "gs://path/to/*.gz") + files := fileio.ReadMatches(s, matches) + pairs := beam.ParDo(s, pairFn, files) + debug.Print(s, pairs) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} From 76aba9c4cd2a1b9a21e5bc6f37e8d5cd6fe1d67f Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Fri, 24 Mar 2023 17:18:56 +0100 Subject: [PATCH 5/9] Move FileMetadata and Compression to fileio --- sdks/go/pkg/beam/io/fileio/file.go | 52 ++++++++++++++++-------- sdks/go/pkg/beam/io/fileio/file_test.go | 42 +++++++++---------- sdks/go/pkg/beam/io/fileio/match.go | 18 ++++---- sdks/go/pkg/beam/io/fileio/match_test.go | 15 ++++--- sdks/go/pkg/beam/io/fileio/read.go | 24 +++++------ sdks/go/pkg/beam/io/fileio/read_test.go | 39 +++++++++--------- sdks/go/pkg/beam/io/filesystem/file.go | 44 -------------------- 7 files changed, 101 insertions(+), 133 deletions(-) delete mode 100644 sdks/go/pkg/beam/io/filesystem/file.go diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go index fcc1eaf10845..e28c2e72fcde 100644 --- a/sdks/go/pkg/beam/io/fileio/file.go +++ b/sdks/go/pkg/beam/io/fileio/file.go @@ -28,20 +28,38 @@ import ( ) func init() { + beam.RegisterType(reflect.TypeOf((*FileMetadata)(nil)).Elem()) beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem()) } -// ReadableFile is a wrapper around a filesystem.FileMetadata and filesystem.Compression that can be -// used to obtain a file descriptor or read the file's contents. +// FileMetadata contains metadata about a file, namely its path and size in bytes. +type FileMetadata struct { + Path string + Size int64 +} + +// Compression is the type of compression used to compress a file. +type Compression int + +const ( + // CompressionAuto indicates that the compression type should be auto-detected. + CompressionAuto Compression = iota + // CompressionGzip indicates that the file is compressed using gzip. + CompressionGzip + // CompressionUncompressed indicates that the file is not compressed. + CompressionUncompressed +) + +// ReadableFile is a wrapper around a FileMetadata and Compression that can be used to obtain a file +// descriptor or read the file's contents. 
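//
// A small caller-side sketch, assuming the exported FileMetadata and Compression as defined at
// this stage of the series and a filesystem registered for the path's scheme (for local paths,
// a blank import of github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local); the path
// is a placeholder. A ReadableFile can also be built directly, outside of ReadMatches, to read
// one known file.
//
//	f := fileio.ReadableFile{
//		Metadata:    fileio.FileMetadata{Path: "/tmp/input.txt"},
//		Compression: fileio.CompressionAuto,
//	}
//	contents, err := f.ReadString(ctx)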
type ReadableFile struct { - Metadata filesystem.FileMetadata - Compression filesystem.Compression + Metadata FileMetadata + Compression Compression } // Open opens the file for reading. The compression type is determined by the Compression field of -// the ReadableFile. If Compression is filesystem.CompressionAuto, the compression type is -// auto-detected from the file extension. It is the caller's responsibility to close the returned -// reader. +// the ReadableFile. If Compression is CompressionAuto, the compression type is auto-detected from +// the file extension. It is the caller's responsibility to close the returned reader. func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { fs, err := filesystem.New(ctx, f.Metadata.Path) if err != nil { @@ -55,7 +73,7 @@ func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { } comp := f.Compression - if comp == filesystem.CompressionAuto { + if comp == CompressionAuto { comp = compressionFromExt(f.Metadata.Path) } @@ -63,29 +81,29 @@ func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { } // compressionFromExt detects the compression of a file based on its extension. If the extension is -// not recognized, filesystem.CompressionUncompressed is returned. -func compressionFromExt(path string) filesystem.Compression { +// not recognized, CompressionUncompressed is returned. +func compressionFromExt(path string) Compression { switch filepath.Ext(path) { case ".gz": - return filesystem.CompressionGzip + return CompressionGzip default: - return filesystem.CompressionUncompressed + return CompressionUncompressed } } // newDecompressionReader returns an io.ReadCloser that can be used to read uncompressed data from -// reader, based on the specified compression. If the compression is filesystem.CompressionAuto, -// a non-nil error is returned. It is the caller's responsibility to close the returned reader. +// reader, based on the specified compression. If the compression is CompressionAuto, a non-nil +// error is returned. It is the caller's responsibility to close the returned reader. 
func newDecompressionReader( reader io.ReadCloser, - compression filesystem.Compression, + compression Compression, ) (io.ReadCloser, error) { switch compression { - case filesystem.CompressionAuto: + case CompressionAuto: return nil, errors.New( "compression must be resolved into a concrete type before obtaining a reader", ) - case filesystem.CompressionGzip: + case CompressionGzip: return newGzipReader(reader) default: return reader, nil diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go b/sdks/go/pkg/beam/io/fileio/file_test.go index bc20dad7873e..2776722a0c19 100644 --- a/sdks/go/pkg/beam/io/fileio/file_test.go +++ b/sdks/go/pkg/beam/io/fileio/file_test.go @@ -21,8 +21,6 @@ import ( "path/filepath" "testing" "testing/iotest" - - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" ) func TestReadableFile_Open(t *testing.T) { @@ -38,20 +36,20 @@ func TestReadableFile_Open(t *testing.T) { { name: "Open uncompressed file", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: filesystem.CompressionUncompressed, + Compression: CompressionUncompressed, }, want: []byte("test1"), }, { name: "Open file with auto-detection of compression", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: filesystem.CompressionAuto, + Compression: CompressionAuto, }, want: []byte("test2"), }, @@ -80,22 +78,22 @@ func Test_compressionFromExt(t *testing.T) { tests := []struct { name string path string - want filesystem.Compression + want Compression }{ { name: "CompressionGzip for gz extension", path: "file.gz", - want: filesystem.CompressionGzip, + want: CompressionGzip, }, { name: "CompressionUncompressed for no extension", path: "file", - want: filesystem.CompressionUncompressed, + want: CompressionUncompressed, }, { name: "CompressionUncompressed for unrecognized extension", path: "file.unknown", - want: filesystem.CompressionUncompressed, + want: CompressionUncompressed, }, } for _, tt := range tests { @@ -115,26 +113,26 @@ func Test_newDecompressionReader(t *testing.T) { tests := []struct { name string path string - comp filesystem.Compression + comp Compression want []byte wantErr bool }{ { name: "Reader for uncompressed file", path: filepath.Join(dir, "file1.txt"), - comp: filesystem.CompressionUncompressed, + comp: CompressionUncompressed, want: []byte("test1"), }, { name: "Reader for gzip compressed file", path: filepath.Join(dir, "file2.gz"), - comp: filesystem.CompressionGzip, + comp: CompressionGzip, want: []byte("test2"), }, { name: "Error - reader for auto compression not supported", path: filepath.Join(dir, "file2.gz"), - comp: filesystem.CompressionAuto, + comp: CompressionAuto, wantErr: true, }, } @@ -174,20 +172,20 @@ func TestReadableFile_Read(t *testing.T) { { name: "Read contents from uncompressed file", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: filesystem.CompressionUncompressed, + Compression: CompressionUncompressed, }, want: []byte("test1"), }, { name: "Read contents from gzip compressed file", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: filesystem.CompressionGzip, + Compression: CompressionGzip, }, want: []byte("test2"), }, @@ -221,20 +219,20 @@ func TestReadableFile_ReadString(t *testing.T) { { name: "Read contents 
from uncompressed file as string", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: filesystem.CompressionUncompressed, + Compression: CompressionUncompressed, }, want: "test1", }, { name: "Read contents from gzip compressed file as string", file: ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: filesystem.CompressionGzip, + Compression: CompressionGzip, }, want: "test2", }, diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go index 333b5b234d41..370e67c371ba 100644 --- a/sdks/go/pkg/beam/io/fileio/match.go +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -27,8 +27,8 @@ import ( ) func init() { - register.DoFn3x1[context.Context, string, func(filesystem.FileMetadata), error](&matchFn{}) - register.Emitter1[filesystem.FileMetadata]() + register.DoFn3x1[context.Context, string, func(FileMetadata), error](&matchFn{}) + register.Emitter1[FileMetadata]() } // EmptyMatchTreatment controls how empty matches of a pattern are treated. @@ -60,8 +60,8 @@ func WithEmptyMatchTreatment(treatment EmptyMatchTreatment) MatchOptionFn { } } -// MatchFiles finds all files matching the glob pattern and returns a -// PCollection of the matching files. +// MatchFiles finds all files matching the glob pattern and returns a PCollection of +// the matching files. func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection { s = s.Scope("fileio.MatchFiles") @@ -70,7 +70,7 @@ func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollecti } // MatchAll finds all files matching the glob patterns given by the incoming PCollection and -// returns a PCollection of the matching files. +// returns a PCollection of the matching files. 
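//
// A usage sketch, assuming a pipeline scope s and the option names as defined at this stage of
// the series; the patterns are placeholders. Globs supplied by an upstream PCollection are
// matched, and a pattern that matches nothing fails the pipeline rather than producing an empty
// result.
//
//	globs := beam.Create(s, "gs://my-bucket/a/*.csv", "gs://my-bucket/b/*.csv")
//	matches := fileio.MatchAll(s, globs,
//		fileio.WithEmptyMatchTreatment(fileio.EmptyMatchTreatmentDisallow),
//	)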
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection { s = s.Scope("fileio.MatchAll") @@ -100,7 +100,7 @@ func newMatchFn(option *matchOption) *matchFn { func (fn *matchFn) ProcessElement( ctx context.Context, glob string, - emit func(filesystem.FileMetadata), + emit func(FileMetadata), ) error { if strings.TrimSpace(glob) == "" { return nil @@ -151,12 +151,12 @@ func metadataFromFiles( ctx context.Context, fs filesystem.Interface, files []string, -) ([]filesystem.FileMetadata, error) { +) ([]FileMetadata, error) { if len(files) == 0 { return nil, nil } - metadata := make([]filesystem.FileMetadata, len(files)) + metadata := make([]FileMetadata, len(files)) for i, path := range files { size, err := fs.Size(ctx, path) @@ -164,7 +164,7 @@ func metadataFromFiles( return nil, err } - metadata[i] = filesystem.FileMetadata{ + metadata[i] = FileMetadata{ Path: path, Size: size, } diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go index 0385cd6b2a74..df01869cfe25 100644 --- a/sdks/go/pkg/beam/io/fileio/match_test.go +++ b/sdks/go/pkg/beam/io/fileio/match_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" @@ -66,11 +65,11 @@ func TestMatchFiles(t *testing.T) { name: "Match files", glob: filepath.Join(dir, "*", "file*.txt"), want: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: filepath.Join(testDir, "file1.txt"), Size: 5, }, - filesystem.FileMetadata{ + FileMetadata{ Path: filepath.Join(testDir, "file2.txt"), Size: 0, }, @@ -119,15 +118,15 @@ func TestMatchAll(t *testing.T) { filepath.Join(dir, "*", "file*.csv"), }, want: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: filepath.Join(testDir, "file1.txt"), Size: 5, }, - filesystem.FileMetadata{ + FileMetadata{ Path: filepath.Join(testDir, "file2.txt"), Size: 0, }, - filesystem.FileMetadata{ + FileMetadata{ Path: filepath.Join(testDir, "file3.csv"), Size: 5, }, @@ -242,12 +241,12 @@ func Test_metadataFromFiles(t *testing.T) { tests := []struct { name string files []string - want []filesystem.FileMetadata + want []FileMetadata }{ { name: "Slice of FileMetadata from file paths", files: files, - want: []filesystem.FileMetadata{ + want: []FileMetadata{ { Path: filepath.Join(dir, "file1.txt"), Size: 5, diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go index 4e01cf2928cd..1d1b60359df0 100644 --- a/sdks/go/pkg/beam/io/fileio/read.go +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -20,12 +20,11 @@ import ( "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) func init() { - register.DoFn2x1[filesystem.FileMetadata, func(ReadableFile), error](&readFn{}) + register.DoFn2x1[FileMetadata, func(ReadableFile), error](&readFn{}) register.Emitter1[ReadableFile]() } @@ -40,7 +39,7 @@ const ( ) type readOption struct { - Compression filesystem.Compression + Compression Compression DirectoryTreatment DirectoryTreatment } @@ -50,7 +49,7 @@ type ReadOptionFn func(*readOption) error // WithReadCompression specifies the compression type of the files that are read. By default, // the compression type is determined by the file extension. 
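//
// A short sketch, assuming a PCollection of matches named matches and the exported
// CompressionGzip constant as defined at this stage of the series. It covers files that are
// gzip compressed but stored without a .gz extension, where extension-based detection would
// treat them as uncompressed; the option forces gzip decompression instead.
//
//	files := fileio.ReadMatches(s, matches, fileio.WithReadCompression(fileio.CompressionGzip))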
-func WithReadCompression(compression filesystem.Compression) ReadOptionFn { +func WithReadCompression(compression Compression) ReadOptionFn { return func(o *readOption) error { o.Compression = compression return nil @@ -66,17 +65,16 @@ func WithDirectoryTreatment(treatment DirectoryTreatment) ReadOptionFn { } } -// ReadMatches accepts the result of MatchFiles or MatchAll as a -// PCollection and converts it to a PCollection. The -// ReadableFile can be used to retrieve file metadata, open the file for reading or read the entire -// file into memory. The compression type of the readable files can be specified by passing the -// WithReadCompression option. If no compression type is provided, it will be determined by the file -// extension. +// ReadMatches accepts the result of MatchFiles or MatchAll as a PCollection and +// converts it to a PCollection. The ReadableFile can be used to retrieve file +// metadata, open the file for reading or read the entire file into memory. The compression type of +// the readable files can be specified by passing the WithReadCompression option. If no compression +// type is provided, it will be determined by the file extension. func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection { s = s.Scope("fileio.ReadMatches") option := &readOption{ - Compression: filesystem.CompressionAuto, + Compression: CompressionAuto, DirectoryTreatment: DirectoryTreatmentSkip, } @@ -90,7 +88,7 @@ func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam. } type readFn struct { - Compression filesystem.Compression + Compression Compression DirectoryTreatment DirectoryTreatment } @@ -101,7 +99,7 @@ func newReadFn(option *readOption) *readFn { } } -func (fn *readFn) ProcessElement(metadata filesystem.FileMetadata, emit func(ReadableFile)) error { +func (fn *readFn) ProcessElement(metadata FileMetadata, emit func(ReadableFile)) error { if isDirectory(metadata.Path) { if fn.DirectoryTreatment == DirectoryTreatmentDisallow { return fmt.Errorf("path to directory not allowed: %q", metadata.Path) diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go index ebbef03dc94d..ba0b79c9fb48 100644 --- a/sdks/go/pkg/beam/io/fileio/read_test.go +++ b/sdks/go/pkg/beam/io/fileio/read_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) @@ -35,83 +34,83 @@ func TestReadMatches(t *testing.T) { { name: "Read matches", input: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: "file1.txt", Size: 5, }, - filesystem.FileMetadata{ + FileMetadata{ Path: "file2.txt", Size: 0, }, }, want: []any{ ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: "file1.txt", Size: 5, }, - Compression: filesystem.CompressionAuto, + Compression: CompressionAuto, }, ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: "file2.txt", Size: 0, }, - Compression: filesystem.CompressionAuto, + Compression: CompressionAuto, }, }, }, { name: "Read matches with specified compression", opts: []ReadOptionFn{ - WithReadCompression(filesystem.CompressionGzip), + WithReadCompression(CompressionGzip), }, input: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: "file1.txt.gz", Size: 5, }, - filesystem.FileMetadata{ + FileMetadata{ Path: "file2.txt.gz", Size: 
0, }, }, want: []any{ ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: "file1.txt.gz", Size: 5, }, - Compression: filesystem.CompressionGzip, + Compression: CompressionGzip, }, ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: "file2.txt.gz", Size: 0, }, - Compression: filesystem.CompressionGzip, + Compression: CompressionGzip, }, }, }, { name: "Read matches and skip directories", input: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: "dir/", Size: 0, }, - filesystem.FileMetadata{ + FileMetadata{ Path: "file1.txt", Size: 5, }, }, want: []any{ ReadableFile{ - Metadata: filesystem.FileMetadata{ + Metadata: FileMetadata{ Path: "file1.txt", Size: 5, }, - Compression: filesystem.CompressionAuto, + Compression: CompressionAuto, }, }, }, @@ -121,11 +120,11 @@ func TestReadMatches(t *testing.T) { WithDirectoryTreatment(DirectoryTreatmentDisallow), }, input: []any{ - filesystem.FileMetadata{ + FileMetadata{ Path: "dir/", Size: 0, }, - filesystem.FileMetadata{ + FileMetadata{ Path: "file1.txt", Size: 5, }, diff --git a/sdks/go/pkg/beam/io/filesystem/file.go b/sdks/go/pkg/beam/io/filesystem/file.go deleted file mode 100644 index 58f0e7ea458a..000000000000 --- a/sdks/go/pkg/beam/io/filesystem/file.go +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package filesystem - -import ( - "reflect" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" -) - -func init() { - beam.RegisterType(reflect.TypeOf((*FileMetadata)(nil)).Elem()) -} - -// FileMetadata contains metadata about a file, namely its path and size in bytes. -type FileMetadata struct { - Path string - Size int64 -} - -// Compression is the type of compression used to compress a file. -type Compression int - -const ( - // CompressionAuto indicates that the compression type should be auto-detected. - CompressionAuto Compression = iota - // CompressionGzip indicates that the file is compressed using gzip. - CompressionGzip - // CompressionUncompressed indicates that the file is not compressed. 
- CompressionUncompressed -) From 2f3495a0d2002881656e45c186a866adb2ccd1de Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Fri, 24 Mar 2023 17:41:13 +0100 Subject: [PATCH 6/9] Provide functional options without exporting enums --- sdks/go/pkg/beam/io/fileio/file.go | 42 +++++++-------- sdks/go/pkg/beam/io/fileio/file_test.go | 34 ++++++------ sdks/go/pkg/beam/io/fileio/match.go | 63 ++++++++++++---------- sdks/go/pkg/beam/io/fileio/match_test.go | 24 ++++----- sdks/go/pkg/beam/io/fileio/read.go | 66 ++++++++++++------------ sdks/go/pkg/beam/io/fileio/read_test.go | 22 ++++---- 6 files changed, 130 insertions(+), 121 deletions(-) diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go index e28c2e72fcde..4ae7b3d3d074 100644 --- a/sdks/go/pkg/beam/io/fileio/file.go +++ b/sdks/go/pkg/beam/io/fileio/file.go @@ -38,27 +38,27 @@ type FileMetadata struct { Size int64 } -// Compression is the type of compression used to compress a file. -type Compression int +// compressionType is the type of compression used to compress a file. +type compressionType int const ( - // CompressionAuto indicates that the compression type should be auto-detected. - CompressionAuto Compression = iota - // CompressionGzip indicates that the file is compressed using gzip. - CompressionGzip - // CompressionUncompressed indicates that the file is not compressed. - CompressionUncompressed + // compressionAuto indicates that the compression type should be auto-detected. + compressionAuto compressionType = iota + // compressionGzip indicates that the file is compressed using gzip. + compressionGzip + // compressionUncompressed indicates that the file is not compressed. + compressionUncompressed ) -// ReadableFile is a wrapper around a FileMetadata and Compression that can be used to obtain a file -// descriptor or read the file's contents. +// ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a +// file descriptor or read the file's contents. type ReadableFile struct { Metadata FileMetadata - Compression Compression + Compression compressionType } // Open opens the file for reading. The compression type is determined by the Compression field of -// the ReadableFile. If Compression is CompressionAuto, the compression type is auto-detected from +// the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from // the file extension. It is the caller's responsibility to close the returned reader. func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { fs, err := filesystem.New(ctx, f.Metadata.Path) @@ -73,7 +73,7 @@ func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { } comp := f.Compression - if comp == CompressionAuto { + if comp == compressionAuto { comp = compressionFromExt(f.Metadata.Path) } @@ -81,29 +81,29 @@ func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { } // compressionFromExt detects the compression of a file based on its extension. If the extension is -// not recognized, CompressionUncompressed is returned. -func compressionFromExt(path string) Compression { +// not recognized, compressionUncompressed is returned. 
+func compressionFromExt(path string) compressionType { switch filepath.Ext(path) { case ".gz": - return CompressionGzip + return compressionGzip default: - return CompressionUncompressed + return compressionUncompressed } } // newDecompressionReader returns an io.ReadCloser that can be used to read uncompressed data from -// reader, based on the specified compression. If the compression is CompressionAuto, a non-nil +// reader, based on the specified compression. If the compression is compressionAuto, a non-nil // error is returned. It is the caller's responsibility to close the returned reader. func newDecompressionReader( reader io.ReadCloser, - compression Compression, + compression compressionType, ) (io.ReadCloser, error) { switch compression { - case CompressionAuto: + case compressionAuto: return nil, errors.New( "compression must be resolved into a concrete type before obtaining a reader", ) - case CompressionGzip: + case compressionGzip: return newGzipReader(reader) default: return reader, nil diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go b/sdks/go/pkg/beam/io/fileio/file_test.go index 2776722a0c19..5f7d292387c3 100644 --- a/sdks/go/pkg/beam/io/fileio/file_test.go +++ b/sdks/go/pkg/beam/io/fileio/file_test.go @@ -39,7 +39,7 @@ func TestReadableFile_Open(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: CompressionUncompressed, + Compression: compressionUncompressed, }, want: []byte("test1"), }, @@ -49,7 +49,7 @@ func TestReadableFile_Open(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: CompressionAuto, + Compression: compressionAuto, }, want: []byte("test2"), }, @@ -78,22 +78,22 @@ func Test_compressionFromExt(t *testing.T) { tests := []struct { name string path string - want Compression + want compressionType }{ { - name: "CompressionGzip for gz extension", + name: "compressionGzip for gz extension", path: "file.gz", - want: CompressionGzip, + want: compressionGzip, }, { - name: "CompressionUncompressed for no extension", + name: "compressionUncompressed for no extension", path: "file", - want: CompressionUncompressed, + want: compressionUncompressed, }, { - name: "CompressionUncompressed for unrecognized extension", + name: "compressionUncompressed for unrecognized extension", path: "file.unknown", - want: CompressionUncompressed, + want: compressionUncompressed, }, } for _, tt := range tests { @@ -113,26 +113,26 @@ func Test_newDecompressionReader(t *testing.T) { tests := []struct { name string path string - comp Compression + comp compressionType want []byte wantErr bool }{ { name: "Reader for uncompressed file", path: filepath.Join(dir, "file1.txt"), - comp: CompressionUncompressed, + comp: compressionUncompressed, want: []byte("test1"), }, { name: "Reader for gzip compressed file", path: filepath.Join(dir, "file2.gz"), - comp: CompressionGzip, + comp: compressionGzip, want: []byte("test2"), }, { name: "Error - reader for auto compression not supported", path: filepath.Join(dir, "file2.gz"), - comp: CompressionAuto, + comp: compressionAuto, wantErr: true, }, } @@ -175,7 +175,7 @@ func TestReadableFile_Read(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: CompressionUncompressed, + Compression: compressionUncompressed, }, want: []byte("test1"), }, @@ -185,7 +185,7 @@ func TestReadableFile_Read(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: CompressionGzip, + Compression: 
compressionGzip, }, want: []byte("test2"), }, @@ -222,7 +222,7 @@ func TestReadableFile_ReadString(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file1.txt"), }, - Compression: CompressionUncompressed, + Compression: compressionUncompressed, }, want: "test1", }, @@ -232,7 +232,7 @@ func TestReadableFile_ReadString(t *testing.T) { Metadata: FileMetadata{ Path: filepath.Join(dir, "file2.gz"), }, - Compression: CompressionGzip, + Compression: compressionGzip, }, want: "test2", }, diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go index 370e67c371ba..402312cdc640 100644 --- a/sdks/go/pkg/beam/io/fileio/match.go +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -31,37 +31,44 @@ func init() { register.Emitter1[FileMetadata]() } -// EmptyMatchTreatment controls how empty matches of a pattern are treated. -type EmptyMatchTreatment int +// emptyTreatment controls how empty matches of a pattern are treated. +type emptyTreatment int const ( - // EmptyMatchTreatmentAllow allows empty matches. - EmptyMatchTreatmentAllow EmptyMatchTreatment = iota - // EmptyMatchTreatmentDisallow disallows empty matches. - EmptyMatchTreatmentDisallow - // EmptyMatchTreatmentAllowIfWildcard allows empty matches if the pattern contains a wildcard. - EmptyMatchTreatmentAllowIfWildcard + // emptyAllow allows empty matches. + emptyAllow emptyTreatment = iota + // emptyDisallow disallows empty matches. + emptyDisallow + // emptyAllowIfWildcard allows empty matches if the pattern contains a wildcard. + emptyAllowIfWildcard ) type matchOption struct { - EmptyMatchTreatment EmptyMatchTreatment + EmptyTreatment emptyTreatment } // MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for // matching files. -type MatchOptionFn func(*matchOption) error +type MatchOptionFn func(*matchOption) -// WithEmptyMatchTreatment specifies how empty matches of a pattern should be treated. By default, -// empty matches are allowed if the pattern contains a wildcard. -func WithEmptyMatchTreatment(treatment EmptyMatchTreatment) MatchOptionFn { - return func(o *matchOption) error { - o.EmptyMatchTreatment = treatment - return nil +// MatchEmptyAllow specifies that empty matches are allowed. +func MatchEmptyAllow() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyAllow + } +} + +// MatchEmptyDisallow specifies that empty matches are not allowed. +func MatchEmptyDisallow() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyDisallow } } // MatchFiles finds all files matching the glob pattern and returns a PCollection of -// the matching files. +// the matching files. MatchFiles accepts a variadic number of MatchOptionFn that can be used to +// configure the treatment of empty matches. By default, empty matches are allowed if the pattern +// contains a wildcard. func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection { s = s.Scope("fileio.MatchFiles") @@ -70,30 +77,30 @@ func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollecti } // MatchAll finds all files matching the glob patterns given by the incoming PCollection and -// returns a PCollection of the matching files. +// returns a PCollection of the matching files. MatchAll accepts a variadic number of +// MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty +// matches are allowed if the pattern contains a wildcard. 
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection { s = s.Scope("fileio.MatchAll") option := &matchOption{ - EmptyMatchTreatment: EmptyMatchTreatmentAllowIfWildcard, + EmptyTreatment: emptyAllowIfWildcard, } for _, opt := range opts { - if err := opt(option); err != nil { - panic(fmt.Sprintf("fileio.MatchAll: invalid option: %v", err)) - } + opt(option) } return beam.ParDo(s, newMatchFn(option), col) } type matchFn struct { - EmptyMatchTreatment EmptyMatchTreatment + EmptyTreatment emptyTreatment } func newMatchFn(option *matchOption) *matchFn { return &matchFn{ - EmptyMatchTreatment: option.EmptyMatchTreatment, + EmptyTreatment: option.EmptyTreatment, } } @@ -118,7 +125,7 @@ func (fn *matchFn) ProcessElement( } if len(files) == 0 { - if !allowEmptyMatch(glob, fn.EmptyMatchTreatment) { + if !allowEmptyMatch(glob, fn.EmptyTreatment) { return fmt.Errorf("no files matching pattern %q", glob) } return nil @@ -136,11 +143,11 @@ func (fn *matchFn) ProcessElement( return nil } -func allowEmptyMatch(glob string, treatment EmptyMatchTreatment) bool { +func allowEmptyMatch(glob string, treatment emptyTreatment) bool { switch treatment { - case EmptyMatchTreatmentDisallow: + case emptyDisallow: return false - case EmptyMatchTreatmentAllowIfWildcard: + case emptyAllowIfWildcard: return strings.Contains(glob, "*") default: return true diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go index df01869cfe25..57b2d8cfe1c5 100644 --- a/sdks/go/pkg/beam/io/fileio/match_test.go +++ b/sdks/go/pkg/beam/io/fileio/match_test.go @@ -78,7 +78,7 @@ func TestMatchFiles(t *testing.T) { { name: "Read matches with specified empty match treatment", opts: []MatchOptionFn{ - WithEmptyMatchTreatment(EmptyMatchTreatmentAllow), + MatchEmptyAllow(), }, glob: filepath.Join(dir, "non-existent.txt"), want: nil, @@ -147,7 +147,7 @@ func TestMatchAll(t *testing.T) { { name: "No matches for glob without wildcard and empty matches allowed", opts: []MatchOptionFn{ - WithEmptyMatchTreatment(EmptyMatchTreatmentAllow), + MatchEmptyAllow(), }, input: []any{ filepath.Join(dir, "non-existent.txt"), @@ -164,7 +164,7 @@ func TestMatchAll(t *testing.T) { { name: "Error - no matches and empty matches disallowed", opts: []MatchOptionFn{ - WithEmptyMatchTreatment(EmptyMatchTreatmentDisallow), + MatchEmptyDisallow(), }, input: []any{ filepath.Join(dir, "*", "non-existent.txt"), @@ -191,31 +191,31 @@ func Test_allowEmptyMatch(t *testing.T) { tests := []struct { name string glob string - treatment EmptyMatchTreatment + treatment emptyTreatment want bool }{ { - name: "Allow for EmptyMatchTreatmentAllow", + name: "Allow for emptyAllow", glob: "path/to/file.txt", - treatment: EmptyMatchTreatmentAllow, + treatment: emptyAllow, want: true, }, { - name: "Disallow for EmptyMatchTreatmentDisallow", + name: "Disallow for emptyDisallow", glob: "path/to/file.txt", - treatment: EmptyMatchTreatmentDisallow, + treatment: emptyDisallow, want: false, }, { - name: "Allow for glob with wildcard and EmptyMatchTreatmentAllowIfWildcard", + name: "Allow for glob with wildcard and emptyAllowIfWildcard", glob: "path/to/*.txt", - treatment: EmptyMatchTreatmentAllowIfWildcard, + treatment: emptyAllowIfWildcard, want: true, }, { - name: "Disallow for glob without wildcard and EmptyMatchTreatmentAllowIfWildcard", + name: "Disallow for glob without wildcard and emptyAllowIfWildcard", glob: "path/to/file.txt", - treatment: EmptyMatchTreatmentAllowIfWildcard, + treatment: emptyAllowIfWildcard, 
want: false, }, } diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go index 1d1b60359df0..0a57409360a9 100644 --- a/sdks/go/pkg/beam/io/fileio/read.go +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -28,68 +28,70 @@ func init() { register.Emitter1[ReadableFile]() } -// DirectoryTreatment controls how directories are treated when reading matches. -type DirectoryTreatment int +// directoryTreatment controls how paths to directories are treated when reading matches. +type directoryTreatment int const ( - // DirectoryTreatmentSkip skips directories. - DirectoryTreatmentSkip DirectoryTreatment = iota - // DirectoryTreatmentDisallow disallows directories. - DirectoryTreatmentDisallow + // directorySkip skips directories. + directorySkip directoryTreatment = iota + // directoryDisallow disallows directories. + directoryDisallow ) type readOption struct { - Compression Compression - DirectoryTreatment DirectoryTreatment + Compression compressionType + DirectoryTreatment directoryTreatment } // ReadOptionFn is a function that can be passed to ReadMatches to configure options for // reading files. -type ReadOptionFn func(*readOption) error +type ReadOptionFn func(*readOption) -// WithReadCompression specifies the compression type of the files that are read. By default, -// the compression type is determined by the file extension. -func WithReadCompression(compression Compression) ReadOptionFn { - return func(o *readOption) error { - o.Compression = compression - return nil +// ReadCompressionGzip specifies that files have been compressed using gzip. +func ReadCompressionGzip() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionGzip } } -// WithDirectoryTreatment specifies how directories should be treated when reading files. By -// default, directories are skipped. -func WithDirectoryTreatment(treatment DirectoryTreatment) ReadOptionFn { - return func(o *readOption) error { - o.DirectoryTreatment = treatment - return nil +// ReadCompressionUncompressed specifies that files have not been compressed. +func ReadCompressionUncompressed() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionUncompressed + } +} + +// ReadDirectoryDisallow specifies that directories are not allowed. +func ReadDirectoryDisallow() ReadOptionFn { + return func(o *readOption) { + o.DirectoryTreatment = directoryDisallow } } // ReadMatches accepts the result of MatchFiles or MatchAll as a PCollection and // converts it to a PCollection. The ReadableFile can be used to retrieve file -// metadata, open the file for reading or read the entire file into memory. The compression type of -// the readable files can be specified by passing the WithReadCompression option. If no compression -// type is provided, it will be determined by the file extension. +// metadata, open the file for reading or read the entire file into memory. ReadMatches accepts a +// variadic number of ReadOptionFn that can be used to configure the compression type of the files +// and treatment of directories. By default, the compression type is determined by the file +// extension and directories are skipped. 
func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection { s = s.Scope("fileio.ReadMatches") option := &readOption{ - Compression: CompressionAuto, - DirectoryTreatment: DirectoryTreatmentSkip, + Compression: compressionAuto, + DirectoryTreatment: directorySkip, } for _, opt := range opts { - if err := opt(option); err != nil { - panic(fmt.Sprintf("fileio.ReadMatches: invalid option: %v", err)) - } + opt(option) } return beam.ParDo(s, newReadFn(option), col) } type readFn struct { - Compression Compression - DirectoryTreatment DirectoryTreatment + Compression compressionType + DirectoryTreatment directoryTreatment } func newReadFn(option *readOption) *readFn { @@ -101,7 +103,7 @@ func newReadFn(option *readOption) *readFn { func (fn *readFn) ProcessElement(metadata FileMetadata, emit func(ReadableFile)) error { if isDirectory(metadata.Path) { - if fn.DirectoryTreatment == DirectoryTreatmentDisallow { + if fn.DirectoryTreatment == directoryDisallow { return fmt.Errorf("path to directory not allowed: %q", metadata.Path) } return nil diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go index ba0b79c9fb48..778bb03302fe 100644 --- a/sdks/go/pkg/beam/io/fileio/read_test.go +++ b/sdks/go/pkg/beam/io/fileio/read_test.go @@ -49,46 +49,46 @@ func TestReadMatches(t *testing.T) { Path: "file1.txt", Size: 5, }, - Compression: CompressionAuto, + Compression: compressionAuto, }, ReadableFile{ Metadata: FileMetadata{ Path: "file2.txt", Size: 0, }, - Compression: CompressionAuto, + Compression: compressionAuto, }, }, }, { name: "Read matches with specified compression", opts: []ReadOptionFn{ - WithReadCompression(CompressionGzip), + ReadCompressionGzip(), }, input: []any{ FileMetadata{ - Path: "file1.txt.gz", + Path: "file1", Size: 5, }, FileMetadata{ - Path: "file2.txt.gz", + Path: "file2", Size: 0, }, }, want: []any{ ReadableFile{ Metadata: FileMetadata{ - Path: "file1.txt.gz", + Path: "file1", Size: 5, }, - Compression: CompressionGzip, + Compression: compressionGzip, }, ReadableFile{ Metadata: FileMetadata{ - Path: "file2.txt.gz", + Path: "file2", Size: 0, }, - Compression: CompressionGzip, + Compression: compressionGzip, }, }, }, @@ -110,14 +110,14 @@ func TestReadMatches(t *testing.T) { Path: "file1.txt", Size: 5, }, - Compression: CompressionAuto, + Compression: compressionAuto, }, }, }, { name: "Error - directories disallowed", opts: []ReadOptionFn{ - WithDirectoryTreatment(DirectoryTreatmentDisallow), + ReadDirectoryDisallow(), }, input: []any{ FileMetadata{ From d2217cb4df97859bacff2ac99d0371930c7dd9a1 Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Fri, 24 Mar 2023 17:42:07 +0100 Subject: [PATCH 7/9] Update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 9943086167f2..fb65282a5401 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,6 +61,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)). +* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)). 
## New Features / Improvements From 1c535d3714b2e7b5a1764a3101e94467b00866af Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Tue, 28 Mar 2023 21:26:12 +0200 Subject: [PATCH 8/9] Rename read compression options --- sdks/go/pkg/beam/io/fileio/read.go | 8 ++++---- sdks/go/pkg/beam/io/fileio/read_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go index 0a57409360a9..7bcd6cf704d2 100644 --- a/sdks/go/pkg/beam/io/fileio/read.go +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -47,15 +47,15 @@ type readOption struct { // reading files. type ReadOptionFn func(*readOption) -// ReadCompressionGzip specifies that files have been compressed using gzip. -func ReadCompressionGzip() ReadOptionFn { +// ReadGzip specifies that files have been compressed using gzip. +func ReadGzip() ReadOptionFn { return func(o *readOption) { o.Compression = compressionGzip } } -// ReadCompressionUncompressed specifies that files have not been compressed. -func ReadCompressionUncompressed() ReadOptionFn { +// ReadUncompressed specifies that files have not been compressed. +func ReadUncompressed() ReadOptionFn { return func(o *readOption) { o.Compression = compressionUncompressed } diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go index 778bb03302fe..b961d324f9ac 100644 --- a/sdks/go/pkg/beam/io/fileio/read_test.go +++ b/sdks/go/pkg/beam/io/fileio/read_test.go @@ -63,7 +63,7 @@ func TestReadMatches(t *testing.T) { { name: "Read matches with specified compression", opts: []ReadOptionFn{ - ReadCompressionGzip(), + ReadGzip(), }, input: []any{ FileMetadata{ From 256ae9cefbd74bac73b7890396f6bd6ed4afbb1e Mon Sep 17 00:00:00 2001 From: Johanna Ojeling Date: Tue, 28 Mar 2023 21:29:58 +0200 Subject: [PATCH 9/9] Provide default options --- sdks/go/pkg/beam/io/fileio/match.go | 8 ++++++++ sdks/go/pkg/beam/io/fileio/read.go | 14 ++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go index 402312cdc640..0c8470c61b64 100644 --- a/sdks/go/pkg/beam/io/fileio/match.go +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -51,6 +51,14 @@ type matchOption struct { // matching files. type MatchOptionFn func(*matchOption) +// MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a +// wildcard. +func MatchEmptyAllowIfWildcard() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyAllowIfWildcard + } +} + // MatchEmptyAllow specifies that empty matches are allowed. func MatchEmptyAllow() MatchOptionFn { return func(o *matchOption) { diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go index 7bcd6cf704d2..b06f6a2a6f92 100644 --- a/sdks/go/pkg/beam/io/fileio/read.go +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -47,6 +47,13 @@ type readOption struct { // reading files. type ReadOptionFn func(*readOption) +// ReadAutoCompression specifies that the compression type of files should be auto-detected. +func ReadAutoCompression() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionAuto + } +} + // ReadGzip specifies that files have been compressed using gzip. func ReadGzip() ReadOptionFn { return func(o *readOption) { @@ -61,6 +68,13 @@ func ReadUncompressed() ReadOptionFn { } } +// ReadDirectorySkip specifies that directories are skipped. 
+func ReadDirectorySkip() ReadOptionFn { + return func(o *readOption) { + o.DirectoryTreatment = directorySkip + } +} + // ReadDirectoryDisallow specifies that directories are not allowed. func ReadDirectoryDisallow() ReadOptionFn { return func(o *readOption) {
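
For reference, the transforms and functional options introduced across this series compose as follows. This is a minimal, illustrative sketch, not part of the patches: it assumes the final option names from patches 6-9 (MatchEmptyAllow, ReadGzip, ReadDirectoryDisallow), a hypothetical local glob path, and a hypothetical readContents DoFn.

package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func init() {
	register.Function3x1(readContents)
	register.Emitter1[string]()
}

// readContents is a hypothetical DoFn that reads a matched file into memory and
// emits its contents as a string.
func readContents(ctx context.Context, file fileio.ReadableFile, emit func(string)) error {
	contents, err := file.ReadString(ctx)
	if err != nil {
		return err
	}
	emit(contents)
	return nil
}

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Match gzip-compressed text files under a hypothetical input directory.
	// MatchEmptyAllow is shown only to illustrate the option; by default, empty
	// matches are already allowed for globs that contain a wildcard.
	matches := fileio.MatchFiles(s, "/tmp/input/*.txt.gz", fileio.MatchEmptyAllow())

	// Convert the matched FileMetadata into ReadableFiles. The files are treated
	// as gzip-compressed, and a path to a directory causes the pipeline to fail.
	files := fileio.ReadMatches(s, matches, fileio.ReadGzip(), fileio.ReadDirectoryDisallow())

	// Read each file into memory and print its contents.
	contents := beam.ParDo(s, readContents, files)
	debug.Print(s, contents)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Exitf(context.Background(), "pipeline failed: %v", err)
	}
}

Because the compression and directory-treatment enums are unexported after patch 6, the exported option constructors above are the only way for callers to configure MatchFiles, MatchAll and ReadMatches; callers who prefer to state the defaults explicitly can use MatchEmptyAllowIfWildcard, ReadAutoCompression and ReadDirectorySkip from patch 9.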