diff --git a/CHANGES.md b/CHANGES.md index ab8a0b797a7b..0f54d233ad6f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)). * Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)). +* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)). ## New Features / Improvements diff --git a/sdks/go/pkg/beam/io/fileio/example_test.go b/sdks/go/pkg/beam/io/fileio/example_test.go new file mode 100644 index 000000000000..ed27546f9dd5 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/example_test.go @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio_test + +import ( + "context" + "log" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" +) + +func ExampleMatchFiles() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + matches := fileio.MatchFiles(s, "gs://path/to/*.gz") + debug.Print(s, matches) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleMatchAll() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz") + matches := fileio.MatchAll(s, globs) + debug.Print(s, matches) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} + +func ExampleReadMatches() { + beam.Init() + p, s := beam.NewPipelineWithRoot() + + pairFn := func(ctx context.Context, file fileio.ReadableFile, emit func(string, string)) error { + contents, err := file.ReadString(ctx) + if err != nil { + return err + } + emit(file.Metadata.Path, contents) + return nil + } + + matches := fileio.MatchFiles(s, "gs://path/to/*.gz") + files := fileio.ReadMatches(s, matches) + pairs := beam.ParDo(s, pairFn, files) + debug.Print(s, pairs) + + if err := beamx.Run(context.Background(), p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go new file mode 100644 index 000000000000..4ae7b3d3d074 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/file.go @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "context" + "errors" + "io" + "path/filepath" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*FileMetadata)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem()) +} + +// FileMetadata contains metadata about a file, namely its path and size in bytes. +type FileMetadata struct { + Path string + Size int64 +} + +// compressionType is the type of compression used to compress a file. +type compressionType int + +const ( + // compressionAuto indicates that the compression type should be auto-detected. + compressionAuto compressionType = iota + // compressionGzip indicates that the file is compressed using gzip. + compressionGzip + // compressionUncompressed indicates that the file is not compressed. + compressionUncompressed +) + +// ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a +// file descriptor or read the file's contents. +type ReadableFile struct { + Metadata FileMetadata + Compression compressionType +} + +// Open opens the file for reading. The compression type is determined by the Compression field of +// the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from +// the file extension. It is the caller's responsibility to close the returned reader. +func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) { + fs, err := filesystem.New(ctx, f.Metadata.Path) + if err != nil { + return nil, err + } + defer fs.Close() + + rc, err := fs.OpenRead(ctx, f.Metadata.Path) + if err != nil { + return nil, err + } + + comp := f.Compression + if comp == compressionAuto { + comp = compressionFromExt(f.Metadata.Path) + } + + return newDecompressionReader(rc, comp) +} + +// compressionFromExt detects the compression of a file based on its extension. If the extension is +// not recognized, compressionUncompressed is returned. +func compressionFromExt(path string) compressionType { + switch filepath.Ext(path) { + case ".gz": + return compressionGzip + default: + return compressionUncompressed + } +} + +// newDecompressionReader returns an io.ReadCloser that can be used to read uncompressed data from +// reader, based on the specified compression. If the compression is compressionAuto, a non-nil +// error is returned. It is the caller's responsibility to close the returned reader. +func newDecompressionReader( + reader io.ReadCloser, + compression compressionType, +) (io.ReadCloser, error) { + switch compression { + case compressionAuto: + return nil, errors.New( + "compression must be resolved into a concrete type before obtaining a reader", + ) + case compressionGzip: + return newGzipReader(reader) + default: + return reader, nil + } +} + +// Read reads the entire file into memory and returns the contents. +func (f ReadableFile) Read(ctx context.Context) (data []byte, err error) { + rc, err := f.Open(ctx) + if err != nil { + return nil, err + } + + defer func() { + closeErr := rc.Close() + if err != nil { + if closeErr != nil { + log.Errorf(ctx, "error closing reader: %v", closeErr) + } + return + } + err = closeErr + }() + + return io.ReadAll(rc) +} + +// ReadString reads the entire file into memory and returns the contents as a string. +func (f ReadableFile) ReadString(ctx context.Context) (string, error) { + data, err := f.Read(ctx) + if err != nil { + return "", err + } + + return string(data), nil +} diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go b/sdks/go/pkg/beam/io/fileio/file_test.go new file mode 100644 index 000000000000..5f7d292387c3 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/file_test.go @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "bytes" + "context" + "path/filepath" + "testing" + "testing/iotest" +) + +func TestReadableFile_Open(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want []byte + }{ + { + name: "Open uncompressed file", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: compressionUncompressed, + }, + want: []byte("test1"), + }, + { + name: "Open file with auto-detection of compression", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: compressionAuto, + }, + want: []byte("test2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + rc, err := tt.file.Open(ctx) + if err != nil { + t.Fatalf("Open() error = %v, want nil", err) + } + + t.Cleanup(func() { + rc.Close() + }) + + if err := iotest.TestReader(rc, tt.want); err != nil { + t.Errorf("TestReader() error = %v, want nil", err) + } + }) + } +} + +func Test_compressionFromExt(t *testing.T) { + tests := []struct { + name string + path string + want compressionType + }{ + { + name: "compressionGzip for gz extension", + path: "file.gz", + want: compressionGzip, + }, + { + name: "compressionUncompressed for no extension", + path: "file", + want: compressionUncompressed, + }, + { + name: "compressionUncompressed for unrecognized extension", + path: "file.unknown", + want: compressionUncompressed, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := compressionFromExt(tt.path); got != tt.want { + t.Errorf("compressionFromExt() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_newDecompressionReader(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + path string + comp compressionType + want []byte + wantErr bool + }{ + { + name: "Reader for uncompressed file", + path: filepath.Join(dir, "file1.txt"), + comp: compressionUncompressed, + want: []byte("test1"), + }, + { + name: "Reader for gzip compressed file", + path: filepath.Join(dir, "file2.gz"), + comp: compressionGzip, + want: []byte("test2"), + }, + { + name: "Error - reader for auto compression not supported", + path: filepath.Join(dir, "file2.gz"), + comp: compressionAuto, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rc := openFile(t, tt.path) + + dr, err := newDecompressionReader(rc, tt.comp) + if (err != nil) != tt.wantErr { + t.Fatalf("newDecompressionReader() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr { + return + } + + t.Cleanup(func() { + dr.Close() + }) + + if err := iotest.TestReader(dr, tt.want); err != nil { + t.Errorf("TestReader() error = %v, want nil", err) + } + }) + } +} + +func TestReadableFile_Read(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want []byte + }{ + { + name: "Read contents from uncompressed file", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: compressionUncompressed, + }, + want: []byte("test1"), + }, + { + name: "Read contents from gzip compressed file", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: compressionGzip, + }, + want: []byte("test2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + got, err := tt.file.Read(ctx) + if err != nil { + t.Fatalf("Read() error = %v, want nil", err) + } + + if !bytes.Equal(got, tt.want) { + t.Errorf("Read() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestReadableFile_ReadString(t *testing.T) { + dir := t.TempDir() + write(t, filepath.Join(dir, "file1.txt"), []byte("test1")) + writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2")) + + tests := []struct { + name string + file ReadableFile + want string + }{ + { + name: "Read contents from uncompressed file as string", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file1.txt"), + }, + Compression: compressionUncompressed, + }, + want: "test1", + }, + { + name: "Read contents from gzip compressed file as string", + file: ReadableFile{ + Metadata: FileMetadata{ + Path: filepath.Join(dir, "file2.gz"), + }, + Compression: compressionGzip, + }, + want: "test2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + got, err := tt.file.ReadString(ctx) + if err != nil { + t.Fatalf("ReadString() error = %v, want nil", err) + } + + if got != tt.want { + t.Errorf("ReadString() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/fileio/gzip.go b/sdks/go/pkg/beam/io/fileio/gzip.go new file mode 100644 index 000000000000..5a63d9be7d3c --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/gzip.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "compress/gzip" + "context" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" +) + +// gzipReader is a wrapper around a gzip.Reader that also closes the underlying io.ReadCloser. +type gzipReader struct { + rc io.ReadCloser + zr *gzip.Reader +} + +// newGzipReader creates a new gzipReader from an io.ReadCloser. +func newGzipReader(rc io.ReadCloser) (*gzipReader, error) { + zr, err := gzip.NewReader(rc) + if err != nil { + return nil, err + } + return &gzipReader{rc: rc, zr: zr}, nil +} + +// Read reads from the gzip reader. +func (r *gzipReader) Read(p []byte) (int, error) { + return r.zr.Read(p) +} + +// Close closes the gzip reader and the underlying io.ReadCloser. +func (r *gzipReader) Close() (err error) { + defer func() { + rcErr := r.rc.Close() + if err != nil { + if rcErr != nil { + log.Errorf(context.Background(), "error closing reader: %v", rcErr) + } + return + } + err = rcErr + }() + + return r.zr.Close() +} diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go b/sdks/go/pkg/beam/io/fileio/helper_test.go new file mode 100644 index 000000000000..e8b61ef067fd --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/helper_test.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "bufio" + "compress/gzip" + "io" + "os" + "path/filepath" + "testing" +) + +// openFile opens a file for reading. +func openFile(t *testing.T, path string) io.ReadCloser { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + + return f +} + +// createFile creates a file and parent directories if needed. +func createFile(t *testing.T, path string) *os.File { + t.Helper() + + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0750); err != nil && !os.IsExist(err) { + t.Fatal(err) + } + + file, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + + return file +} + +// write writes data to a file. +func write(t *testing.T, path string, data []byte) { + t.Helper() + + f := createFile(t, path) + defer f.Close() + + bw := bufio.NewWriter(f) + if _, err := bw.Write(data); err != nil { + t.Fatal(err) + } + + if err := bw.Flush(); err != nil { + t.Fatal(err) + } +} + +// writeGzip compresses and writes data to a file using gzip. +func writeGzip(t *testing.T, path string, data []byte) { + t.Helper() + + f := createFile(t, path) + defer f.Close() + + zw := gzip.NewWriter(f) + if _, err := zw.Write(data); err != nil { + t.Fatal(err) + } + + if err := zw.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go new file mode 100644 index 000000000000..0c8470c61b64 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package fileio provides transforms for matching and reading files. +package fileio + +import ( + "context" + "fmt" + "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn3x1[context.Context, string, func(FileMetadata), error](&matchFn{}) + register.Emitter1[FileMetadata]() +} + +// emptyTreatment controls how empty matches of a pattern are treated. +type emptyTreatment int + +const ( + // emptyAllow allows empty matches. + emptyAllow emptyTreatment = iota + // emptyDisallow disallows empty matches. + emptyDisallow + // emptyAllowIfWildcard allows empty matches if the pattern contains a wildcard. + emptyAllowIfWildcard +) + +type matchOption struct { + EmptyTreatment emptyTreatment +} + +// MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for +// matching files. +type MatchOptionFn func(*matchOption) + +// MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a +// wildcard. +func MatchEmptyAllowIfWildcard() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyAllowIfWildcard + } +} + +// MatchEmptyAllow specifies that empty matches are allowed. +func MatchEmptyAllow() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyAllow + } +} + +// MatchEmptyDisallow specifies that empty matches are not allowed. +func MatchEmptyDisallow() MatchOptionFn { + return func(o *matchOption) { + o.EmptyTreatment = emptyDisallow + } +} + +// MatchFiles finds all files matching the glob pattern and returns a PCollection of +// the matching files. MatchFiles accepts a variadic number of MatchOptionFn that can be used to +// configure the treatment of empty matches. By default, empty matches are allowed if the pattern +// contains a wildcard. +func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection { + s = s.Scope("fileio.MatchFiles") + + filesystem.ValidateScheme(glob) + return MatchAll(s, beam.Create(s, glob), opts...) +} + +// MatchAll finds all files matching the glob patterns given by the incoming PCollection and +// returns a PCollection of the matching files. MatchAll accepts a variadic number of +// MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty +// matches are allowed if the pattern contains a wildcard. +func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection { + s = s.Scope("fileio.MatchAll") + + option := &matchOption{ + EmptyTreatment: emptyAllowIfWildcard, + } + + for _, opt := range opts { + opt(option) + } + + return beam.ParDo(s, newMatchFn(option), col) +} + +type matchFn struct { + EmptyTreatment emptyTreatment +} + +func newMatchFn(option *matchOption) *matchFn { + return &matchFn{ + EmptyTreatment: option.EmptyTreatment, + } +} + +func (fn *matchFn) ProcessElement( + ctx context.Context, + glob string, + emit func(FileMetadata), +) error { + if strings.TrimSpace(glob) == "" { + return nil + } + + fs, err := filesystem.New(ctx, glob) + if err != nil { + return err + } + defer fs.Close() + + files, err := fs.List(ctx, glob) + if err != nil { + return err + } + + if len(files) == 0 { + if !allowEmptyMatch(glob, fn.EmptyTreatment) { + return fmt.Errorf("no files matching pattern %q", glob) + } + return nil + } + + metadata, err := metadataFromFiles(ctx, fs, files) + if err != nil { + return err + } + + for _, md := range metadata { + emit(md) + } + + return nil +} + +func allowEmptyMatch(glob string, treatment emptyTreatment) bool { + switch treatment { + case emptyDisallow: + return false + case emptyAllowIfWildcard: + return strings.Contains(glob, "*") + default: + return true + } +} + +func metadataFromFiles( + ctx context.Context, + fs filesystem.Interface, + files []string, +) ([]FileMetadata, error) { + if len(files) == 0 { + return nil, nil + } + + metadata := make([]FileMetadata, len(files)) + + for i, path := range files { + size, err := fs.Size(ctx, path) + if err != nil { + return nil, err + } + + metadata[i] = FileMetadata{ + Path: path, + Size: size, + } + } + + return metadata, nil +} diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go new file mode 100644 index 000000000000..57b2d8cfe1c5 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/match_test.go @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "context" + "path/filepath" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/google/go-cmp/cmp" +) + +type testFile struct { + filename string + data []byte +} + +var testFiles = []testFile{ + { + filename: "file1.txt", + data: []byte("test1"), + }, + { + filename: "file2.txt", + data: []byte(""), + }, + { + filename: "file3.csv", + data: []byte("test3"), + }, +} + +func TestMatchFiles(t *testing.T) { + dir := t.TempDir() + testDir := filepath.Join(dir, "testdata") + + for _, tf := range testFiles { + write(t, filepath.Join(testDir, tf.filename), tf.data) + } + + tests := []struct { + name string + glob string + opts []MatchOptionFn + want []any + }{ + { + name: "Match files", + glob: filepath.Join(dir, "*", "file*.txt"), + want: []any{ + FileMetadata{ + Path: filepath.Join(testDir, "file1.txt"), + Size: 5, + }, + FileMetadata{ + Path: filepath.Join(testDir, "file2.txt"), + Size: 0, + }, + }, + }, + { + name: "Read matches with specified empty match treatment", + opts: []MatchOptionFn{ + MatchEmptyAllow(), + }, + glob: filepath.Join(dir, "non-existent.txt"), + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + got := MatchFiles(s, tt.glob, tt.opts...) + + passert.Equals(s, got, tt.want...) + ptest.RunAndValidate(t, p) + }) + } +} + +func TestMatchAll(t *testing.T) { + dir := t.TempDir() + testDir := filepath.Join(dir, "testdata") + + for _, tf := range testFiles { + write(t, filepath.Join(testDir, tf.filename), tf.data) + } + + tests := []struct { + name string + opts []MatchOptionFn + input []any + want []any + wantErr bool + }{ + { + name: "Match all", + input: []any{ + filepath.Join(dir, "*", "file*.txt"), + filepath.Join(dir, "*", "file*.csv"), + }, + want: []any{ + FileMetadata{ + Path: filepath.Join(testDir, "file1.txt"), + Size: 5, + }, + FileMetadata{ + Path: filepath.Join(testDir, "file2.txt"), + Size: 0, + }, + FileMetadata{ + Path: filepath.Join(testDir, "file3.csv"), + Size: 5, + }, + }, + }, + { + name: "No matches", + input: []any{ + filepath.Join(dir, "*", "non-existent.txt"), + }, + want: nil, + }, + { + name: "No matches for empty glob", + input: []any{""}, + want: nil, + }, + { + name: "No matches for glob without wildcard and empty matches allowed", + opts: []MatchOptionFn{ + MatchEmptyAllow(), + }, + input: []any{ + filepath.Join(dir, "non-existent.txt"), + }, + want: nil, + }, + { + name: "Error - no matches for glob without wildcard", + input: []any{ + filepath.Join(dir, "non-existent.txt"), + }, + wantErr: true, + }, + { + name: "Error - no matches and empty matches disallowed", + opts: []MatchOptionFn{ + MatchEmptyDisallow(), + }, + input: []any{ + filepath.Join(dir, "*", "non-existent.txt"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, tt.input...) + got := MatchAll(s, col, tt.opts...) + + passert.Equals(s, got, tt.want...) + if err := ptest.Run(p); (err != nil) != tt.wantErr { + t.Errorf("MatchAll() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_allowEmptyMatch(t *testing.T) { + tests := []struct { + name string + glob string + treatment emptyTreatment + want bool + }{ + { + name: "Allow for emptyAllow", + glob: "path/to/file.txt", + treatment: emptyAllow, + want: true, + }, + { + name: "Disallow for emptyDisallow", + glob: "path/to/file.txt", + treatment: emptyDisallow, + want: false, + }, + { + name: "Allow for glob with wildcard and emptyAllowIfWildcard", + glob: "path/to/*.txt", + treatment: emptyAllowIfWildcard, + want: true, + }, + { + name: "Disallow for glob without wildcard and emptyAllowIfWildcard", + glob: "path/to/file.txt", + treatment: emptyAllowIfWildcard, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := allowEmptyMatch(tt.glob, tt.treatment); got != tt.want { + t.Errorf("allowEmptyMatch() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_metadataFromFiles(t *testing.T) { + dir := t.TempDir() + files := make([]string, len(testFiles)) + + for i, tf := range testFiles { + file := filepath.Join(dir, tf.filename) + write(t, file, tf.data) + files[i] = file + } + + tests := []struct { + name string + files []string + want []FileMetadata + }{ + { + name: "Slice of FileMetadata from file paths", + files: files, + want: []FileMetadata{ + { + Path: filepath.Join(dir, "file1.txt"), + Size: 5, + }, + { + Path: filepath.Join(dir, "file2.txt"), + Size: 0, + }, + { + Path: filepath.Join(dir, "file3.csv"), + Size: 5, + }, + }, + }, + { + name: "Nil when files is empty", + files: nil, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + fs := local.New(ctx) + + got, err := metadataFromFiles(ctx, fs, tt.files) + if err != nil { + t.Fatalf("metadataFromFiles() error = %v, want nil", err) + } + + if !cmp.Equal(got, tt.want) { + t.Errorf("metadataFromFiles() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/io/fileio/read.go b/sdks/go/pkg/beam/io/fileio/read.go new file mode 100644 index 000000000000..b06f6a2a6f92 --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/read.go @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "fmt" + "strings" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn2x1[FileMetadata, func(ReadableFile), error](&readFn{}) + register.Emitter1[ReadableFile]() +} + +// directoryTreatment controls how paths to directories are treated when reading matches. +type directoryTreatment int + +const ( + // directorySkip skips directories. + directorySkip directoryTreatment = iota + // directoryDisallow disallows directories. + directoryDisallow +) + +type readOption struct { + Compression compressionType + DirectoryTreatment directoryTreatment +} + +// ReadOptionFn is a function that can be passed to ReadMatches to configure options for +// reading files. +type ReadOptionFn func(*readOption) + +// ReadAutoCompression specifies that the compression type of files should be auto-detected. +func ReadAutoCompression() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionAuto + } +} + +// ReadGzip specifies that files have been compressed using gzip. +func ReadGzip() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionGzip + } +} + +// ReadUncompressed specifies that files have not been compressed. +func ReadUncompressed() ReadOptionFn { + return func(o *readOption) { + o.Compression = compressionUncompressed + } +} + +// ReadDirectorySkip specifies that directories are skipped. +func ReadDirectorySkip() ReadOptionFn { + return func(o *readOption) { + o.DirectoryTreatment = directorySkip + } +} + +// ReadDirectoryDisallow specifies that directories are not allowed. +func ReadDirectoryDisallow() ReadOptionFn { + return func(o *readOption) { + o.DirectoryTreatment = directoryDisallow + } +} + +// ReadMatches accepts the result of MatchFiles or MatchAll as a PCollection and +// converts it to a PCollection. The ReadableFile can be used to retrieve file +// metadata, open the file for reading or read the entire file into memory. ReadMatches accepts a +// variadic number of ReadOptionFn that can be used to configure the compression type of the files +// and treatment of directories. By default, the compression type is determined by the file +// extension and directories are skipped. +func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection { + s = s.Scope("fileio.ReadMatches") + + option := &readOption{ + Compression: compressionAuto, + DirectoryTreatment: directorySkip, + } + + for _, opt := range opts { + opt(option) + } + + return beam.ParDo(s, newReadFn(option), col) +} + +type readFn struct { + Compression compressionType + DirectoryTreatment directoryTreatment +} + +func newReadFn(option *readOption) *readFn { + return &readFn{ + Compression: option.Compression, + DirectoryTreatment: option.DirectoryTreatment, + } +} + +func (fn *readFn) ProcessElement(metadata FileMetadata, emit func(ReadableFile)) error { + if isDirectory(metadata.Path) { + if fn.DirectoryTreatment == directoryDisallow { + return fmt.Errorf("path to directory not allowed: %q", metadata.Path) + } + return nil + } + + file := ReadableFile{ + Metadata: metadata, + Compression: fn.Compression, + } + + emit(file) + return nil +} + +func isDirectory(path string) bool { + if strings.HasSuffix(path, "/") || strings.HasSuffix(path, "\\") { + return true + } + return false +} diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go b/sdks/go/pkg/beam/io/fileio/read_test.go new file mode 100644 index 000000000000..b961d324f9ac --- /dev/null +++ b/sdks/go/pkg/beam/io/fileio/read_test.go @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileio + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" +) + +func TestReadMatches(t *testing.T) { + tests := []struct { + name string + opts []ReadOptionFn + input []any + want []any + wantErr bool + }{ + { + name: "Read matches", + input: []any{ + FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + FileMetadata{ + Path: "file2.txt", + Size: 0, + }, + }, + want: []any{ + ReadableFile{ + Metadata: FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + Compression: compressionAuto, + }, + ReadableFile{ + Metadata: FileMetadata{ + Path: "file2.txt", + Size: 0, + }, + Compression: compressionAuto, + }, + }, + }, + { + name: "Read matches with specified compression", + opts: []ReadOptionFn{ + ReadGzip(), + }, + input: []any{ + FileMetadata{ + Path: "file1", + Size: 5, + }, + FileMetadata{ + Path: "file2", + Size: 0, + }, + }, + want: []any{ + ReadableFile{ + Metadata: FileMetadata{ + Path: "file1", + Size: 5, + }, + Compression: compressionGzip, + }, + ReadableFile{ + Metadata: FileMetadata{ + Path: "file2", + Size: 0, + }, + Compression: compressionGzip, + }, + }, + }, + { + name: "Read matches and skip directories", + input: []any{ + FileMetadata{ + Path: "dir/", + Size: 0, + }, + FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + }, + want: []any{ + ReadableFile{ + Metadata: FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + Compression: compressionAuto, + }, + }, + }, + { + name: "Error - directories disallowed", + opts: []ReadOptionFn{ + ReadDirectoryDisallow(), + }, + input: []any{ + FileMetadata{ + Path: "dir/", + Size: 0, + }, + FileMetadata{ + Path: "file1.txt", + Size: 5, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, tt.input...) + got := ReadMatches(s, col, tt.opts...) + + passert.Equals(s, got, tt.want...) + if err := ptest.Run(p); (err != nil) != tt.wantErr { + t.Errorf("ReadMatches() error = %v, wantErr %v", err, tt.wantErr) + } + }) + }) + } +} + +func Test_isDirectory(t *testing.T) { + tests := []struct { + name string + path string + want bool + }{ + { + name: "Path to directory with forward slash directory separator", + path: "path/to/", + want: true, + }, + { + name: "Path to directory with backslash directory separator", + path: "path\\to\\", + want: true, + }, + { + name: "Path to file", + path: "path/to/file", + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isDirectory(tt.path); got != tt.want { + t.Errorf("isDirectory() = %v, want %v", got, tt.want) + } + }) + } +}