Skip to content

Commit

Permalink
[Go SDK]: Add fileio MatchFiles, MatchAll and ReadMatches transforms (#…
Browse files Browse the repository at this point in the history
…25809)

* Create utility file structs

* Create MatchFiles and MatchAll transforms

* Create ReadMatches transform

* Add example doc for transforms

* Move FileMetadata and Compression to fileio

* Provide functional options without exporting enums

* Update CHANGES.md

* Rename read compression options

* Provide default options
  • Loading branch information
johannaojeling authored Mar 29, 2023
1 parent 8778339 commit 9c61455
Show file tree
Hide file tree
Showing 10 changed files with 1,414 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).
* Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)).
* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)).

## New Features / Improvements

Expand Down
74 changes: 74 additions & 0 deletions sdks/go/pkg/beam/io/fileio/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileio_test

import (
"context"
"log"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

// ExampleMatchFiles builds a pipeline that hands a single glob pattern to
// fileio.MatchFiles, prints the resulting collection with debug.Print and
// then runs the pipeline.
func ExampleMatchFiles() {
	beam.Init()
	pipeline, scope := beam.NewPipelineWithRoot()

	// The match result is only needed as input to the printer, so it is not
	// bound to a name.
	debug.Print(scope, fileio.MatchFiles(scope, "gs://path/to/*.gz"))

	err := beamx.Run(context.Background(), pipeline)
	if err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}

// ExampleMatchAll builds a pipeline that creates a collection of two glob
// patterns, passes it to fileio.MatchAll, prints the resulting collection with
// debug.Print and then runs the pipeline.
func ExampleMatchAll() {
	beam.Init()
	pipeline, scope := beam.NewPipelineWithRoot()

	patterns := beam.Create(scope, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz")
	debug.Print(scope, fileio.MatchAll(scope, patterns))

	err := beamx.Run(context.Background(), pipeline)
	if err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}

// ExampleReadMatches builds a pipeline that matches a glob with
// fileio.MatchFiles, converts the matches with fileio.ReadMatches, and then
// applies a function that emits each file's path together with its contents
// read as a string. The emitted pairs are printed with debug.Print.
func ExampleReadMatches() {
	beam.Init()
	pipeline, scope := beam.NewPipelineWithRoot()

	// toPathAndContents reads one file fully and emits (path, contents).
	// A read failure is returned to the caller instead of being emitted.
	toPathAndContents := func(ctx context.Context, file fileio.ReadableFile, emit func(string, string)) error {
		text, readErr := file.ReadString(ctx)
		if readErr != nil {
			return readErr
		}

		emit(file.Metadata.Path, text)
		return nil
	}

	matched := fileio.MatchFiles(scope, "gs://path/to/*.gz")
	readable := fileio.ReadMatches(scope, matched)
	debug.Print(scope, beam.ParDo(scope, toPathAndContents, readable))

	err := beamx.Run(context.Background(), pipeline)
	if err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
142 changes: 142 additions & 0 deletions sdks/go/pkg/beam/io/fileio/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileio

import (
"context"
"errors"
"io"
"path/filepath"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// init registers the package's exported struct types with beam.
// NOTE(review): presumably required so the types can be used as pipeline
// element types; confirm against the beam.RegisterType documentation.
func init() {
	// reflect.TypeOf on a zero struct value yields the same reflect.Type as
	// taking Elem() of the type of a nil pointer to that struct.
	beam.RegisterType(reflect.TypeOf(FileMetadata{}))
	beam.RegisterType(reflect.TypeOf(ReadableFile{}))
}

// FileMetadata contains metadata about a file, namely its path and size in bytes.
type FileMetadata struct {
// Path is the location of the file. ReadableFile.Open passes it to
// filesystem.New and OpenRead, and uses its extension to auto-detect
// compression.
Path string
// Size is the size of the file in bytes.
Size int64
}

// compressionType is the type of compression used to compress a file.
type compressionType int

// The supported compression types. compressionAuto is the first constant in
// the iota sequence and therefore has the value 0, which makes it the zero
// value of compressionType.
const (
// compressionAuto indicates that the compression type should be auto-detected.
compressionAuto compressionType = iota
// compressionGzip indicates that the file is compressed using gzip.
compressionGzip
// compressionUncompressed indicates that the file is not compressed.
compressionUncompressed
)

// ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a
// file descriptor or read the file's contents.
type ReadableFile struct {
// Metadata describes the file to read; its Path is what Open opens.
Metadata FileMetadata
// Compression selects how Open decompresses the file. When it is
// compressionAuto, Open derives the compression from the extension of
// Metadata.Path.
Compression compressionType
}

// Open opens the file for reading. The compression type is determined by the Compression field of
// the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from
// the file extension. It is the caller's responsibility to close the returned reader.
//
// An error is returned if the filesystem for the path cannot be created, if the file cannot be
// opened, or if the decompression reader cannot be constructed. In the last case the underlying
// file reader is closed before returning, so no open reader is leaked, and the returned reader is
// nil.
func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) {
	fs, err := filesystem.New(ctx, f.Metadata.Path)
	if err != nil {
		return nil, err
	}
	defer fs.Close()

	rc, err := fs.OpenRead(ctx, f.Metadata.Path)
	if err != nil {
		return nil, err
	}

	comp := f.Compression
	if comp == compressionAuto {
		comp = compressionFromExt(f.Metadata.Path)
	}

	dr, err := newDecompressionReader(rc, comp)
	if err != nil {
		// The caller never receives rc on this path, so it must be released
		// here. A close failure is logged rather than returned so that the
		// original error is not masked.
		if closeErr := rc.Close(); closeErr != nil {
			log.Errorf(ctx, "error closing reader: %v", closeErr)
		}
		return nil, err
	}

	return dr, nil
}

// compressionFromExt maps the extension of path to a compression type. Only
// the ".gz" extension is recognized and yields compressionGzip; every other
// extension, including none at all, yields compressionUncompressed.
func compressionFromExt(path string) compressionType {
	if filepath.Ext(path) == ".gz" {
		return compressionGzip
	}

	return compressionUncompressed
}

// newDecompressionReader wraps reader so that reads yield uncompressed data
// according to compression. compressionAuto is rejected with a non-nil error
// because it must be resolved to a concrete type by the caller first.
// compressionGzip delegates to newGzipReader, and any other value returns
// reader unchanged. It is the caller's responsibility to close the returned
// reader.
func newDecompressionReader(
	reader io.ReadCloser,
	compression compressionType,
) (io.ReadCloser, error) {
	if compression == compressionAuto {
		return nil, errors.New(
			"compression must be resolved into a concrete type before obtaining a reader",
		)
	}

	if compression == compressionGzip {
		return newGzipReader(reader)
	}

	// No decompression is needed; hand back the reader as is.
	return reader, nil
}

// Read opens the file, reads all of its contents into memory and returns
// them. The reader is always closed before Read returns. If reading succeeded,
// the result of closing becomes the returned error. If reading failed, the
// read error is returned and a close failure is only logged.
func (f ReadableFile) Read(ctx context.Context) (data []byte, err error) {
	reader, openErr := f.Open(ctx)
	if openErr != nil {
		return nil, openErr
	}

	// The deferred function inspects and may overwrite the named result err,
	// which by then holds the outcome of io.ReadAll.
	defer func() {
		closeErr := reader.Close()
		switch {
		case err == nil:
			err = closeErr
		case closeErr != nil:
			log.Errorf(ctx, "error closing reader: %v", closeErr)
		}
	}()

	data, err = io.ReadAll(reader)
	return data, err
}

// ReadString loads the whole file into memory via Read and returns the bytes
// converted to a string. If Read fails, the empty string and Read's error are
// returned.
func (f ReadableFile) ReadString(ctx context.Context) (string, error) {
	contents, readErr := f.Read(ctx)
	if readErr != nil {
		return "", readErr
	}

	return string(contents), nil
}
Loading

0 comments on commit 9c61455

Please sign in to comment.